jueves, 11 de marzo de 2021

Idempotencia de producer y consumer en Kafka (exactly-once semantics)

 Kafka trabaja muy bien procesando colas de eventos de forma asíncrona, y es muy eficiente gracias al uso de clusters y brokers, por lo que la alta disponibilidad está (casi) asegurada.

Sin embargo, existen algunas casuísticas por la cuales el flujo de mensajes podría fallar, y eso es imperdonable en un sistema crítico de alto estrés, en el que cada mensaje cuenta. 

Nota: La primera parte del artículo expone los problemas y el planteamiento desde el punto de vista teórico. La segunda parte se abordará un ejemplo usando Java y la librería kafka-clients


Problemas

El orden de los mensajes

Cuando se envían multitud de mensajes, Kafka va a optimizar la eficiencia de tiempos y el rendimiento. Para ello, balancea la carga de las particiones, enviando cada mensaje a una partición que tenga menos carga de trabajo. Cada partición se ejecuta de forma asíncrona y en paralelo, por lo que el orden natural de los mensajes se diluye, y unos mensajes se ejecutan antes o después que otros, desde el punto de vista del orden.

En un escenario en que el orden en que se procesen los mensajes no sea relevante, esta es una solución óptima. En un flujo de logs se podría incluir el timestamp de origen para poder ordenar posteriormente la secuencia de estos logs.

Pero cuando trabajamos en una arquitectura EDA (Event Driven Architecture), el orden de la secuencia de mensajes es primordial, de cara a mantener correctamente el Event-Sourcing, y registrar correctamente la realidad. Este orden es clave para poder mantener y regenerar el estado de los agregados y de las entidades en caso de pérdida.

Para solucionar este problema, hemos de especificar, de forma explícita, una clave (key o ID) a los mensajes. Esto afecta a los mensajes referidos a una entidad en concreto. Por ejemplo, si enviamos mensajes sobre operaciones bancarias, la clave podría ser el número de cuenta bancaria o/y el número de tarjeta de crédito. Con esta clave, Kafka dirige todos los mensajes con la misma clave, siempre a la misma partición, con lo que la secuencia, en principio se mantendría en orden.

Pero puede ocurrir que, si hay una gran carga de trabajo en la red u ocurre algún problema transitorio, un mensaje no se pueda guardar en ese momento por un timeout. Para no perderlo, la propiedad "retries" permite reintentar especificar el número de reintentos para guardar un mensaje fallido más adelante. Por defecto, Kafka lo establece en un valor de 2147483647, por lo que no hay problema si no lo configuramos.

Pero, por otra parte, Kafka establece un número máximo de peticiones al mismo tiempo, por lo que, si uno de los mensajes falla, aunque luego lo reintente, las otras peticiones procesarán los siguientes mensajes. Cuando en el siguiente reintento se guarde el mensaje que falló, al hacerlo lo hará después de n siguientes mensajes, con lo que el orden se rompe.

Para solucionar esto podemos restringir el número de peticiones concurrentes mediante la propiedad "max.in.flight.requests.per.connection". Por defecto, Kafka establece este valor a 5 conexiones, por lo que, para asegurar el orden en caso de un fallo puntual en algún mensaje, estableceremos este valor a 1. 

Lo anterior asegurará el orden de los mensajes, pero la eficacia tiene el coste del rendimiento, el cual disminuye dependiendo del escenario y de la carga. No obstante, esta bajada de rendimiento no es crítica ni demasiado relevante.


La idempotencia

La idempotencia se refiere a la característica de que un mensaje siempre se entrega, y éste se entrega exactamente una vez. Es decir, la idempotencia garantiza la consistencia de datos sin duplicidad.

Suena obvio, pero en un sistema distribuido esta característica es la más compleja de cumplir. Vamos a tratar de explicarlo en detalle y con claridad.


¿Cómo asegura Kafka la entrega de un mensaje?

El flujo de entrega de un mensaje en Kafka, por parte de un producer (productor o emisor de mensajes), se resume en 3 pasos:

  • El producer envía el mensaje al broker de Kafka
  • El broker guarda el mensaje en el siguiente offset en la partición correspondiente
  • El broker retorna un ack (acknwoledgement o reconocimiento) con valor 1, confirmando al producer de que el mensaje se entregó correctamente.  

 Este flujo se representa fácilmente en el siguiente esquema:

Conociendo este flujo, surgen cuatro casuísticas o semánticas.


Semántica "no-guarantee" (sin garantía)

El mensaje se puede procesar una vez, varias veces o ninguna. Este escenario no viene establecido por defecto en Kafka, pues no es habitual. 


Semántica "at-least-once"  (al menos una vez)

En este escenario, si se produce algún problema en la entrega del mensaje, el productor realizará reintentos hasta conseguir que la entrega esté confirmada. 

Aparentemente, esta solución suena bien y puede ser correcta para la mayoría de aplicaciones. Pero existe una casuística que puede generar un problema para conseguir la idempotencia

Si el broker guarda correctamente el mensaje en el offset, pero en el momento de retornar el ack al producer se produce alguna incidencia transitoria, el producer no tendrá la confirmación de entrega del mensaje, aunque en realidad el broker lo haya guardado. El producer reintentará enviar el mensaje nuevamente tantas veces como sea necesario hasta que obtenga el ack de confirmación.

Este escenario asegura que, al menos una vez, el mensaje haya sido enviado con éxito, pero también puede generar mensajes duplicados en el offset de la partición.

El siguiente diagrama ilustra esta casuística:


Semántica "at-most-once" (como máximo una vez)

El offset es comprometido (committed) tan pronto como el lote de mensajes llegue al broker. Si bien este escenario evita las duplicidades, si algo fue mal en el envío, el mensaje se perderá y no podrá ser leído. 

Suele usarse para un enfoque fire-and-forget (dispara y olvídate), es decir, se envía el mensaje a Kafka sin reintentos, ignorando cualquier respuesta desde el broker.

Es recomendable guardar primero el progreso y los datos antes de enviar el mensaje a Kafka


Semántica "exactly-once" (exactamente una vez)

En este escenario, es necesario asegurar que el mensaje sea enviado a Kafka sin pérdida ni duplicidades. Es recomendable que el envío de mensajes se realice dentro de una transacción, a fin de asegurar la llegada de los mensajes, su secuencia y la unicidad de los mensajes, para que el consumidor de Kafka pueda operar correctamente con los offsets



La idempotencia en el producer

¿Cómo conseguir la idempotencia en el producer?

Para que se cumpla la idempotencia en el producer, será tan simple como configurar las siguientes propiedades:

  • enable.idempotence=true
  • acks=all
  • retries>0
  • max.in.flight.requests.per.connection<=5

Con la configuración anterior, ya tenemos nuestro producer listo para trabajar bajo la semántica de la idempotencia.

Pero si queremos que nuestro consumer también trabaje de forma idempotente, debemos ayudarle desde el producer

Tal y como lo hemos dejado ahora, el consumer debería añadir un esfuerzo extra en trabajar en el control y gestión de los offsets de forma manual para conseguir la idempotencia.


Para evitar este trabajo, desde el producer vamos realizar toda la operativa de envíos atómicos de mensajes dentro de una transacción. Para que esta transacción atómica sea más efectiva, definiremos la siguiente propiedad:

  • transactional.id=<id_transaccion>

Esta propiedad define una parte de la clave que utilizará el algoritmo de transacción para genera el id de la transacción junto a la secuencia.

La transacción asegurará que todos los mensajes de la transacción sean enviados correctamente, definiendo la secuencia y el offset preparado para el consumidor. 

De forma similar a una base de datos, una transacción prepara y ejecuta múltiples operaciones de escritura. En el caso de que ocurra algún problema, no se generan cambios en el offset y se informa del problema al producer, para que pueda decidir si volver a intentar la transacción o realizar alguna otra operación. Por tanto, si todo es correcto, la entrega los mensajes está asegurada sin incoherencias. Además, por parte del consumer podemos especificar que lea del offset solamente los mensajes una vez finalizada (y asegurada) la transacción, evitando leer mensajes aún en tránsito.



Planteamiento del ejemplo

Para el ejemplo, usaremos Java y la librería kafka-clients. Para configurar el pom.xml de Maven, añadiremos las siguientes dependencias:

<dependencies>

   <dependency>

      <groupId><org.apache.kafka/groupId>

      <artifactId>kafka-clients</artifactId>

      <version>2.3.0</version>

   </dependency>


   <dependency>

      <groupId>org.slf4j</groupId>

      <artifactId>slf4j-simple</artifactId>

      <version>1.6.1</version>

   </dependency>

</dependencies>


En este ejemplo vamos a simular el envío masivo de eventos (mensajes) de varios sensores de temperatura de un motor crítico (por ejemplo, el de un avión o el de un coche de Fórmula 1). Esta simulación enviará mil eventos, eligiendo aleatoriamente un determinado sensor (de entre 10 posibles), con una temperatura aleatoria entre 100 y 300 grados centígrados.


Código del producer

El siguiente código pone en práctica los conceptos vistos anteriormente. 

Nota: Antes de ejecutarlo, hemos de ejecutar primero el consumer, para que éste escuche los mensajes que genere el producer.


import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.Properties;

import java.util.Random;

import java.util.TimeZone;


import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;


/**

 * IdempotentProducer example

 * @author: Rafael Hernamperez

 *

 */

public class IdempotentProducer {

   private static final int numberOfSensors = 10;

   private static final int numberOfEvents = 1000;

   private static final String eventTopic = "temperature-changed";

   private static Logger log = LoggerFactory.getLogger(IdempotentProducer.class) {


   public static void main(String[] args) {

      Random rand = new Random();


      // Producer properties

      Properties props = new Properties();

      props.put("bootstrap.servers", "localhost:9092");

      props.put("acks", "all");

      props.put("linger.ms", "10");

      props.put("retries", "50");

      props.put("max.in.flight.requests.per.connection", "1");

      props.put("enable.idempotence", "true");

      props.put("transacional.id", "temp-changed");

      props.put("key.serializer", "org.apache.kafka.common.serialization-StringSerializer");

      props.put("value.serializer", "org.apache.kafka.common.serializztion-StringSerializer");


      try (Producer<Sring, String> producer = new KafkaProducer<>(props)) {

         try {

            // Start transaction

            producer.initTransactions();

            producer.beginTransaction();


            // Generate messages

            for (int event = 0; event < numberOfEvents; event++) {

               int sensor = rand.nextInt(numberOfSensors);


               // Temperature simulation (from 100º to 300º)

               int intTemp = 100 + rand.nextInt(200);

               double decTemp = rand.nextDouble() + intTemp;


               // Current timestamp in ISO 8601 format

               Date date = new Date(System.currentTimeMillis());

               SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");

               sdf.setTimeZone(TimeZone.getTimeZone("CET"));


               // Compose message in CSV format: datetime;sensordId;temperature

               String eventValue = String.format("%s;%d;%.1f", sdf.format(date), sensor, decTemp);


               // Compose the key using the sensorId

               String keyEvent = String.format("tempSensor-%s", sensor);

               

               // Send the event (message)

               producer.send(new ProducerRecord<String, String>(eventTopic, keyEvent, valueEvent));

               logInfo("Event #" + event + " sent: " + valueEvent);

            }  // for

            

            // Commit transaction

            producer.commitTransaction();

         } catch (Exception e) {

            log.error("Error in transaction: " + e.getMessage());


            // Abort Transaction

            producer.abortTransaction();

         }

      } catch (Exception e) {

         log.error("Error sending event: " + e.getMessage());

      }

   }  // main()

}  // class


La idempotencia en el consumer

¿Cómo conseguir la idempotencia en el consumer?

Una vez resuelta la idempotencia en el producer, tal y como hemos desarrollado anteriormente, la parte de la idempotencia en el lado del consumer es mucho más sencilla.

Para ello, vamos a establecer las siguientes propiedades clave:

  • isolation.level="read_committed"

El nivel de aislamiento establecido como "read_committed" indica que el consumer solamente aborde los mensajes cuyo offset esté comprometido, es decir, que tenga la marca committed. Esto solamente ocurrirá cuando la transacción haya terminado exitosamente, por lo que solo aplica a los mensajes de la transacción.

Además, la transacción habrá generado la secuencia correcta de los mensajes para que el consumer los procese en el orden correspondiente.


Planteamiento del código

Para el lado del consumer, vamos a generar una aplicación multihilo que lance en paralelo varios consumers que estén escuchando al mismo tiempo y procesen los mensajes.

Para ello, habrá dos archivos de código:

  • ThreadConsumer.java: Contiene el código de cada hilo consumer.
  • IdempotentConsumer.java: Aplicación principal. Prepara el entorno multihilo, instancia cada hilo y lanza los consumers.


Código del hilo para consumer

El siguiente código corresponde a cada hilo de consumer. En un entorno multihilo, habrá varios procesos ejecutándose al mismo tiempo. Este es el código de cada uno de estos hilos.


import java.time.Duration;

import java.util.Arrays;


import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.commons.errors.WakeupException;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;


/**

 * ThreadConsumer 

 * @author: Rafael Hernamperez

 *

 */

public class ThreadConsumer extends Thread {

   private final KafkaConsumer<String, String> consumer;

   private final String eventTopic = "temperature-changed";

   private boolean closed = false;

   private Logger log = LoggerFactory.getLogger(ThreadConsumer.class);


   // Constructor

   public ThreadConsumer(KafkaConsumer<String, String> consumer) {

      this.consumer = consumer;

   }


   // Main thread execution

   @Override

   public void run() {

      consumer.subscribe(Arrays.asList(eventTopic));


      try {

         while(!closed) {

            ConsumerRecords<String, String> events = consumer.poll(Duration.ofMillis(100));


            for (ConsumerRecord<String, String> event : events) {

               log.info("Partition={}, Offset={}, key={}, value={}",

                  event.partition(),

                  event.offset(),

                  event.key(),

                  event.value());

            }  // for

         }  // while

      } catch(WakeupException Exception we) {

         if (!closed) {

            throw we;

         }

      } finally {

         consumer.close();

      }

   }  // run()

} // class


Código principal para el consumer

El código principal del consumer se encargará de preparar un entorno multihilo, en el que varios consumers trabajarán al mismo tiempo para escuchar y leer los mensajes que genere el producer. Al crear un hilo, se le pasará un objeto de tipo KafkaConsumer, inicializado con las propiedades de ese consumer, las cuales ya están preparadas para su funcionamiento con una semántica exactly-once, lo que facilitará la idempotencia.


import java.util.Properties;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import org.apache.kafka.clients.consumer.KafkaConsumer;


/**

 * IdempotentConsumer example

 * @author: Rafael Hernamperez

 *

 */

public class IdempotentProducer {

   public static final int numberOfConsumers = 5;


   public static void main(String[] args) {

      // Consumer properties

      Properties props = new Properties();

      props.put("bootstrap.servers", "localhost:9092");

      props.put("group.id", "temp-event-group");

      props.put("enable.auto.commit", "false");

      props.put("isolation.level", "read_committed");

      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");


      ExecutorService executyor = Executors.newFixedThreadPool(numberOfConsumers);


      // Generate consumers in a multithread environment

      for (int i=0; i<numberOfConsumer; i++) {

         ThreadConsumer consumer = new ThreadConsumer(new KafkaConsumer<>(props));

         executor.execute(consumer);

      }


      // Execution will alive until last thread is terminated

      while(!executor.isTerminated());


   }  // main()

}  // class



Configuraciones clave

A continuación se resumen las propiedades y valores clave para cada caso.


Consumer

  • No-guarantee:
    • enable.auto.commit = true
    • auto.commit.interval.ms = <frecuencia_milisegundos>
  • At-least-once:
    • enable.auto.commit = false
    • O enable.auto.commit=true con auto.commit.interval.ms con valor muy alto.
    • Recomendado utilizar consumer.commitSync() para controlar los commits en los offset de los mensajes.
  • At-most-once:
    • enable.auto.commit = true
    • auto.commit.interval.ms = <frecuencia_milisegundos>
    • No usar el método consumer.commitSync()
  • Exactly-once:
    • enable.auto.commit = false
    • isolation.level = "read_committed"

Producer

  • retries > 0
  • acks = "all"
  • max.in.flight.requests.per.connection <= 5
  • enable.idempotence = true
  • transacional.id = <clave_unica_transaccion>

Para facilitar la idempotencia real tanto en producer como en consumer, realizar el envío de mensajes dentro de una transacción.


Enlaces de referencia






















lunes, 8 de marzo de 2021

Simple Event Processing (Procesamiento Simple de Eventos) y Kafka

 En una Arquitectura Orientada a Eventos (EDA ó Event Driven Architecture) existen varios enfoques de eventos, dependiendo del tipo de procesamiento.

En este post analizaremos el tipo de procesamiento más simple: SEP (Simple Event Processing)


¿Qué es un Procesamiento Simple de Eventos?

En SEP, cuando se produce un evento, se enruta inmediatamente para realizar una acción posterior, sin aplicar restricciones ni consideraciones. El flujo de trabajo se ejecuta en tiempo real, reduciendo así la latencia entre el momento en que se produce el evento y la acción resultante.

Un escenario habitual de un patrón SEP es en escenarios donde se producen cambios medibles, tales como un cambio de temperatura, de presión, de humedad, de CO2, etc, en un sensor.

Pero no debemos limitar un SEP a un ambiente IoT o de sensores, pues también se puede aplicar a diferentes entornos. Por ejemplo, un evento simple se podría producir en un complejo sistema, en donde, al realizar un pedido, cuando se accede al inventario para realizar la reserva del pedido, si supera cierto umbral, se puede lanzar un evento de inventario bajo para el producto que se ha adquirido. De esta forma, dicho evento se propaga como alerta o/y a un buzón para que el responsable de compras adquiera una nueva remesa de productos.

La clave para entender un SEP, es que procesa un único evento a la vez, sin tener en cuenta otros eventos.  En este aspecto, SEP funciona como un "flujo de trabajo humano" o "una cosa a la vez".

Este tipo de procesamiento se ha tratado típicamente mediante un servicio de mensajería, como AMQ, RabbitMQ o JMS (Java Messaging Service). Los servicios de mensajería publican y consumen una única vez el mensaje, es decir, que cuando el mensaje es leído y procesado por un consumidor, el mensaje desaparece.

Con Apache Kafka o con Apache Pulsar elevan el concepto de mensajería a otro nivel, utilizando un broker en lugar de una cola de mensajes. Esto significa que un mensaje puede estar publicado durante un tiempo largo y ser consumido por uno o varios consumidores, sin necesidad de eliminarlo del pool de mensajes.




domingo, 7 de marzo de 2021

Las 21 mejores tipografías para programar

Cuando programas, ¿te aburre la tipografía por defecto de tu IDE o de tu editor de textos?

A continuación comparto una selección de 21 tipografías que te sacarán de la monotonía y te harán disfrutar de la programación.

Esta selección es particular, no un ranking universalmente aceptado. Te invito a dejar un comentario con tu opinión y a aportar otras tipografías que te gusten para esta selección.


CP Mono




Cutive Mono






Exo






Exo 2





Fantasque Sans Mono




JetBrains Mono




Jura



Lekton





Menlo


Monaco



Monolisa




Monospace Typewriter




Overpass Mono



PT Mono


Roboto Mono




SaxMono



SF Mono




Skyhook Mono




Tipografías extra


La lista anterior se quedó pequeña, por lo que se añaden nuevas tipografías.

Anonymous Pro




Courier Prime Code


Everson Mono





Space Mono




Enlaces de interés



lunes, 15 de febrero de 2021

Cómo crear un productor de streaming de eventos para Kafka en Java



Aviso: Este pequeño tutorial no va a realizar una introducción a Kafka. Se asume que el lector ya tiene unas nociones sobre Kafka y quiere comenzar a desarrollar código en Java para enviar eventos a un topic de Kafka. Esto es habitual realizarlo, principalmente, desde microservicios o desde scripts batch.

Nota: El concepto de estado utilizado en este artículo está contextualizado en una EDA (Arquitectura Orientada a Eventos), y representa un estado originado por un evento. Dentro de Kafka se asume este concepto como record o registro, y se refiere al valor que almacenará Kafka en el bus de eventos.

Requisitos

Se asume que ya se dispone de un entorno de Kafka funcionando, y que en dicho entorno existe, al menos, un topic sobre el cual escribir eventos. Dicho entorno puede estar en la propia máquina de desarrollo (localhost), en un servidor dedicado o en un servidor en cloud.

Más adelante (en otro artículo), veremos cómo desarrollar código para suscribirnos a ese topic y poder responder, en consecuencia, a dichos eventos. Ese código corresponderá a la parte del consumidor o suscriptor. En este artículo nos centraremos exclusivamente en la parte del productor o publicador.

En el entorno de desarrollo se recomienda tener lo siguiente:
  • Java JDK versión 11 (he utilizado la versión 15)
  • Gestión de dependencias con Maven
  • El IDE que utilizado ha sido el de Spring Tools, pero con Eclipse o IntelliJ IDEA no debería haber problemas.
  • Dependencias de Kafka en Maven versión 2.7.0

Configuración de Maven


En el archivo pom.xml del proyecto Java, añadir la dependencia encargada de importar las librerías necesarias para trabajar con streams de eventos en Kafka:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.7.0</version>
    </dependency>
</dependencies>

Configuración de las propiedades

El primer paso a realizar en el código, será definir las propiedades para poder configurar la conexión a Kafka. Las más importantes son las siguientes:

// Propiedades del producer
Properties props = new Properties();

// Lista de servidores Kafka a los que conectarse
props.put("bootstrap.servers", "localhost:9092");

// Serializacion de los datos de la clave (key)
props.put("key.serializer", StringSerializer.class.getName());

// Serializacion de los datos del valor (value)
props.put("value.serializer", StringSerializer.class.getName());

Para el ejemplo, usaremos el tipo String, que es el que viene por defecto. Kafka permite definir estructuras de datos mediante JSON y Avro (este es el preferido), que se definen con el objeto Serdes (SERialize/DESerialize).

A continuación se exponen algunas propiedades no tan relevantes ahora (son opcionales), pero que se podrán usar en un futuro para tunear la configuración de la conexión a Kafka:

props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);


Objeto KafkaProducer

El objeto KafkaProducer permite crear un cliente para una conexión a un topic de Kafka, a partir de la información proporcionada en las propiedades descritas anteriormente.

KafkaProducer<String, String> kp = new KafkaProducer<String, String>(props);

Este objeto permite definir el tipo de datos de la clave (primer parámetro) y del valor o estado (segundo parámetro). En nuestro ejemplo utilizaremos el tipo String (si se desea trabajar con tipos customizados, ver los tipos Serdes, y cómo definir tipos en JSON o Avro).

Nota: Este objeto crea un cliente genérico a un servidor de Kafka, por lo que se puede utilizar posteriormente para enviar estados a streams de eventos a diferentes topics.

Preparación del estado (registro) a enviar

Para enviar un estado o registro a Kafka desde el productor, es necesario preparar éste mediante un objeto de tipo ProducerRecord:

ProducerRecord<String, String> pr = new Producer<String, String>(topic, key, estado);

Este objeto permite definir el tipo de datos de la clave (primer parámetro) y del valor o estado (segundo parámetro). En nuestro ejemplo utilizaremos el tipo String (si se desea trabajar con tipos customizados, ver los tipos Serdes, y cómo definir tipos en JSON o Avro).

En la construcción (entre paréntesis) se pasarán los valores correspondientes a:
  • topic: Valor del nombre del topic a usar en Kafka, donde se enviará el evento.
  • key: Valor de la clave (key) del evento o registro.
  • estado: Valor del estado o registro a enviar.
El valor de la clave puede ser opcional en otros contextos de Kafka (se almacenaría como null). Por ello, también permitiría la siguiente sintaxis:

ProducerRecord<String, String> pr = new Producer<String, String>(topic, estado);


Envío del estado a Kafka


Una vez preparado el estado, éste se envía a Kafka a través del objeto KafkaProducer definido al principio, pasándole el objeto ProducerRecord con la información del estado:

kp.send(pr);

El método send() envía el estado o registro al topic especificado.


Cerrar la conexión a Kafka


Cuando nuestro código no va a enviar más estados a Kafka, debemos cerrar el objeto KafkaProducer, para liberar recursos del stream, así como la conexión. Para ello, utilizaremos el método close():

kp.close();


Aplicación de ejemplo de productor Kafka


A continuación os dejo una aplicación completa que hace de productor Kafka.

En este ejemplo, se ejecuta desde la consola de comandos como un script, pero la base os servirá también para microservicios u otro tipo de aplicaciones.

Lo primero que hará será preguntar por el topic al cual queremos enviar los estados. Después, en un bucle, solicitará el valor del estado a enviar. Dicho estado es un texto libre, por lo que se puede introducir cualquier valor, incluso un JSON en formato String.

Este bucle se repetirá hasta que el usuario introduzca el valor 'quit' (sin comillas). En ese momento se cerrará la conexión y terminará la ejecución.


package com.rhernamperez.kafkastreamsdemo;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Date;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
//import org.apache.kafka.streams.StreamsConfig;

/**
* Demostracion de un productor Kafka que envia eventos a un stream
* @author rafinguer
*
*/
public class KafkaProducerDemo {

    public static void main(String[] args) {

        // Propiedades del producer
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);

        // Creacion del objeto productor
        KafkaProducer<String, String> kp = new KafkaProducer<String, String>(props);
        String estado="", topic="", key = "";

        // Mensaje de bienvenida
        System.out.println("Demostracion de Kafka producer. Introduce datos para cada evento. 'quit' para salir\n");

        // Introduccion del topic por teclado
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));

        System.out.println("Introduce el nombre del topic: ");

        try {
            topic = br.readLine();
        } catch (Exception e) {
            System.out.println("ERROR > " + e.getMessage());
        }

        // Bucle para introducir estados hasta que se escriba 'quit'
        while(!estado.equals("quit")) {

            try {
                // Lectura de estados por teclado
                System.out.print(">>> ");
                estado = br.readLine();

                if (estado.equals("quit")) continue;

                // Envio del estado. La key sera la fecha y hora actuales
                key = new Date().toString();
                ProducerRecord<String, String> pr = new ProducerRecord<String, String>(topic, key, estado);
kp.send(pr);

                System.out.println("Enviado a topic " + topic + " la clave " + key + " con el estado > " + estado + "\n");
            } catch(Exception e) {
                System.out.println("Error > " + e.getMessage());
            }
        }

        // Cerrar el stream
        kp.close();

        System.out.println("**** FIN ****");
    }

}


Enlaces de interés







domingo, 31 de enero de 2021

Introducción a las Arquitecturas Orientadas a Eventos (AOE / EDA)

 


La Arquitectura Orientada a Eventos (AOE / EDA) está tomando más protagonismo en los últimos años, ante la demanda creciente de escenarios de baja latencia, escalado rápido, histórico de cambios, asincronía, alta disponibilidad y resiliencia.

El concepto de AOE/EDA no es algo nuevo, pero con el rebufo de los microservicios, ha sido una solución eficiente al desacoplamiento y al efecto de caída en racimo o en cascada.

En esta presentación aprenderemos los conocimientos básicos para conocer sus conceptos:

  • Conceptos básicos de Arquitectura Orientada a Eventos
    • Paradigma de AOE / EDA
    • Eventos
      • Tips para eventos
      • Protocolos de eventos
      • Streams
    • Ventajas de AOE / EDA
    • Desventajas de AOE / EDA
    • Cuándo usar AOE / EDA
    • Eventos Vs Comandos Vs Consultas
  • Componentes de una Arquitectura Orientada a Eventos
    • Generadores
    • Mensajes
    • Componentes de Mensajería
    • Canales
    • Patrones de Mensajería
    • Procesadores
    • Coordinación mediante un bus de eventos
    • Ejemplo de un sistema complejo
  • Patrones
    • Notificación de eventos
    • Transferencia del estado de los eventos
    • Abastecimiento de eventos (Event Sourcing)
    • CQRS (Command Query Responsability Segregation)
    • Sagas
    • Microservicios orientados a eventos
    • Microservicios en Event Backbone
    • Base de datos por servicio
  • Apache Kafka
  • Amazon EventBridge

La presentación completa se puede ver y descargar en el siguiente enlace: Introducción a Arquitecturas Orientadas a Objetos