jueves, 11 de marzo de 2021

Idempotencia de producer y consumer en Kafka (exactly-once semantics)

 Kafka trabaja muy bien procesando colas de eventos de forma asíncrona, y es muy eficiente gracias al uso de clusters y brokers, por lo que la alta disponibilidad está (casi) asegurada.

Sin embargo, existen algunas casuísticas por la cuales el flujo de mensajes podría fallar, y eso es imperdonable en un sistema crítico de alto estrés, en el que cada mensaje cuenta. 

Nota: La primera parte del artículo expone los problemas y el planteamiento desde el punto de vista teórico. La segunda parte se abordará un ejemplo usando Java y la librería kafka-clients


Problemas

El orden de los mensajes

Cuando se envían multitud de mensajes, Kafka va a optimizar la eficiencia de tiempos y el rendimiento. Para ello, balancea la carga de las particiones, enviando cada mensaje a una partición que tenga menos carga de trabajo. Cada partición se ejecuta de forma asíncrona y en paralelo, por lo que el orden natural de los mensajes se diluye, y unos mensajes se ejecutan antes o después que otros, desde el punto de vista del orden.

En un escenario en que el orden en que se procesen los mensajes no sea relevante, esta es una solución óptima. En un flujo de logs se podría incluir el timestamp de origen para poder ordenar posteriormente la secuencia de estos logs.

Pero cuando trabajamos en una arquitectura EDA (Event Driven Architecture), el orden de la secuencia de mensajes es primordial, de cara a mantener correctamente el Event-Sourcing, y registrar correctamente la realidad. Este orden es clave para poder mantener y regenerar el estado de los agregados y de las entidades en caso de pérdida.

Para solucionar este problema, hemos de especificar, de forma explícita, una clave (key o ID) a los mensajes. Esto afecta a los mensajes referidos a una entidad en concreto. Por ejemplo, si enviamos mensajes sobre operaciones bancarias, la clave podría ser el número de cuenta bancaria o/y el número de tarjeta de crédito. Con esta clave, Kafka dirige todos los mensajes con la misma clave, siempre a la misma partición, con lo que la secuencia, en principio se mantendría en orden.

Pero puede ocurrir que, si hay una gran carga de trabajo en la red u ocurre algún problema transitorio, un mensaje no se pueda guardar en ese momento por un timeout. Para no perderlo, la propiedad "retries" permite reintentar especificar el número de reintentos para guardar un mensaje fallido más adelante. Por defecto, Kafka lo establece en un valor de 2147483647, por lo que no hay problema si no lo configuramos.

Pero, por otra parte, Kafka establece un número máximo de peticiones al mismo tiempo, por lo que, si uno de los mensajes falla, aunque luego lo reintente, las otras peticiones procesarán los siguientes mensajes. Cuando en el siguiente reintento se guarde el mensaje que falló, al hacerlo lo hará después de n siguientes mensajes, con lo que el orden se rompe.

Para solucionar esto podemos restringir el número de peticiones concurrentes mediante la propiedad "max.in.flight.requests.per.connection". Por defecto, Kafka establece este valor a 5 conexiones, por lo que, para asegurar el orden en caso de un fallo puntual en algún mensaje, estableceremos este valor a 1. 

Lo anterior asegurará el orden de los mensajes, pero la eficacia tiene el coste del rendimiento, el cual disminuye dependiendo del escenario y de la carga. No obstante, esta bajada de rendimiento no es crítica ni demasiado relevante.


La idempotencia

La idempotencia se refiere a la característica de que un mensaje siempre se entrega, y éste se entrega exactamente una vez. Es decir, la idempotencia garantiza la consistencia de datos sin duplicidad.

Suena obvio, pero en un sistema distribuido esta característica es la más compleja de cumplir. Vamos a tratar de explicarlo en detalle y con claridad.


¿Cómo asegura Kafka la entrega de un mensaje?

El flujo de entrega de un mensaje en Kafka, por parte de un producer (productor o emisor de mensajes), se resume en 3 pasos:

  • El producer envía el mensaje al broker de Kafka
  • El broker guarda el mensaje en el siguiente offset en la partición correspondiente
  • El broker retorna un ack (acknwoledgement o reconocimiento) con valor 1, confirmando al producer de que el mensaje se entregó correctamente.  

 Este flujo se representa fácilmente en el siguiente esquema:

Conociendo este flujo, surgen cuatro casuísticas o semánticas.


Semántica "no-guarantee" (sin garantía)

El mensaje se puede procesar una vez, varias veces o ninguna. Este escenario no viene establecido por defecto en Kafka, pues no es habitual. 


Semántica "at-least-once"  (al menos una vez)

En este escenario, si se produce algún problema en la entrega del mensaje, el productor realizará reintentos hasta conseguir que la entrega esté confirmada. 

Aparentemente, esta solución suena bien y puede ser correcta para la mayoría de aplicaciones. Pero existe una casuística que puede generar un problema para conseguir la idempotencia

Si el broker guarda correctamente el mensaje en el offset, pero en el momento de retornar el ack al producer se produce alguna incidencia transitoria, el producer no tendrá la confirmación de entrega del mensaje, aunque en realidad el broker lo haya guardado. El producer reintentará enviar el mensaje nuevamente tantas veces como sea necesario hasta que obtenga el ack de confirmación.

Este escenario asegura que, al menos una vez, el mensaje haya sido enviado con éxito, pero también puede generar mensajes duplicados en el offset de la partición.

El siguiente diagrama ilustra esta casuística:


Semántica "at-most-once" (como máximo una vez)

El offset es comprometido (committed) tan pronto como el lote de mensajes llegue al broker. Si bien este escenario evita las duplicidades, si algo fue mal en el envío, el mensaje se perderá y no podrá ser leído. 

Suele usarse para un enfoque fire-and-forget (dispara y olvídate), es decir, se envía el mensaje a Kafka sin reintentos, ignorando cualquier respuesta desde el broker.

Es recomendable guardar primero el progreso y los datos antes de enviar el mensaje a Kafka


Semántica "exactly-once" (exactamente una vez)

En este escenario, es necesario asegurar que el mensaje sea enviado a Kafka sin pérdida ni duplicidades. Es recomendable que el envío de mensajes se realice dentro de una transacción, a fin de asegurar la llegada de los mensajes, su secuencia y la unicidad de los mensajes, para que el consumidor de Kafka pueda operar correctamente con los offsets



La idempotencia en el producer

¿Cómo conseguir la idempotencia en el producer?

Para que se cumpla la idempotencia en el producer, será tan simple como configurar las siguientes propiedades:

  • enable.idempotence=true
  • acks=all
  • retries>0
  • max.in.flight.requests.per.connection<=5

Con la configuración anterior, ya tenemos nuestro producer listo para trabajar bajo la semántica de la idempotencia.

Pero si queremos que nuestro consumer también trabaje de forma idempotente, debemos ayudarle desde el producer

Tal y como lo hemos dejado ahora, el consumer debería añadir un esfuerzo extra en trabajar en el control y gestión de los offsets de forma manual para conseguir la idempotencia.


Para evitar este trabajo, desde el producer vamos realizar toda la operativa de envíos atómicos de mensajes dentro de una transacción. Para que esta transacción atómica sea más efectiva, definiremos la siguiente propiedad:

  • transactional.id=<id_transaccion>

Esta propiedad define una parte de la clave que utilizará el algoritmo de transacción para genera el id de la transacción junto a la secuencia.

La transacción asegurará que todos los mensajes de la transacción sean enviados correctamente, definiendo la secuencia y el offset preparado para el consumidor. 

De forma similar a una base de datos, una transacción prepara y ejecuta múltiples operaciones de escritura. En el caso de que ocurra algún problema, no se generan cambios en el offset y se informa del problema al producer, para que pueda decidir si volver a intentar la transacción o realizar alguna otra operación. Por tanto, si todo es correcto, la entrega los mensajes está asegurada sin incoherencias. Además, por parte del consumer podemos especificar que lea del offset solamente los mensajes una vez finalizada (y asegurada) la transacción, evitando leer mensajes aún en tránsito.



Planteamiento del ejemplo

Para el ejemplo, usaremos Java y la librería kafka-clients. Para configurar el pom.xml de Maven, añadiremos las siguientes dependencias:

<dependencies>

   <dependency>

      <groupId><org.apache.kafka/groupId>

      <artifactId>kafka-clients</artifactId>

      <version>2.3.0</version>

   </dependency>


   <dependency>

      <groupId>org.slf4j</groupId>

      <artifactId>slf4j-simple</artifactId>

      <version>1.6.1</version>

   </dependency>

</dependencies>


En este ejemplo vamos a simular el envío masivo de eventos (mensajes) de varios sensores de temperatura de un motor crítico (por ejemplo, el de un avión o el de un coche de Fórmula 1). Esta simulación enviará mil eventos, eligiendo aleatoriamente un determinado sensor (de entre 10 posibles), con una temperatura aleatoria entre 100 y 300 grados centígrados.


Código del producer

El siguiente código pone en práctica los conceptos vistos anteriormente. 

Nota: Antes de ejecutarlo, hemos de ejecutar primero el consumer, para que éste escuche los mensajes que genere el producer.


import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.Properties;

import java.util.Random;

import java.util.TimeZone;


import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;


/**

 * IdempotentProducer example

 * @author: Rafael Hernamperez

 *

 */

public class IdempotentProducer {

   private static final int numberOfSensors = 10;

   private static final int numberOfEvents = 1000;

   private static final String eventTopic = "temperature-changed";

   private static Logger log = LoggerFactory.getLogger(IdempotentProducer.class) {


   public static void main(String[] args) {

      Random rand = new Random();


      // Producer properties

      Properties props = new Properties();

      props.put("bootstrap.servers", "localhost:9092");

      props.put("acks", "all");

      props.put("linger.ms", "10");

      props.put("retries", "50");

      props.put("max.in.flight.requests.per.connection", "1");

      props.put("enable.idempotence", "true");

      props.put("transacional.id", "temp-changed");

      props.put("key.serializer", "org.apache.kafka.common.serialization-StringSerializer");

      props.put("value.serializer", "org.apache.kafka.common.serializztion-StringSerializer");


      try (Producer<Sring, String> producer = new KafkaProducer<>(props)) {

         try {

            // Start transaction

            producer.initTransactions();

            producer.beginTransaction();


            // Generate messages

            for (int event = 0; event < numberOfEvents; event++) {

               int sensor = rand.nextInt(numberOfSensors);


               // Temperature simulation (from 100º to 300º)

               int intTemp = 100 + rand.nextInt(200);

               double decTemp = rand.nextDouble() + intTemp;


               // Current timestamp in ISO 8601 format

               Date date = new Date(System.currentTimeMillis());

               SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");

               sdf.setTimeZone(TimeZone.getTimeZone("CET"));


               // Compose message in CSV format: datetime;sensordId;temperature

               String eventValue = String.format("%s;%d;%.1f", sdf.format(date), sensor, decTemp);


               // Compose the key using the sensorId

               String keyEvent = String.format("tempSensor-%s", sensor);

               

               // Send the event (message)

               producer.send(new ProducerRecord<String, String>(eventTopic, keyEvent, valueEvent));

               logInfo("Event #" + event + " sent: " + valueEvent);

            }  // for

            

            // Commit transaction

            producer.commitTransaction();

         } catch (Exception e) {

            log.error("Error in transaction: " + e.getMessage());


            // Abort Transaction

            producer.abortTransaction();

         }

      } catch (Exception e) {

         log.error("Error sending event: " + e.getMessage());

      }

   }  // main()

}  // class


La idempotencia en el consumer

¿Cómo conseguir la idempotencia en el consumer?

Una vez resuelta la idempotencia en el producer, tal y como hemos desarrollado anteriormente, la parte de la idempotencia en el lado del consumer es mucho más sencilla.

Para ello, vamos a establecer las siguientes propiedades clave:

  • isolation.level="read_committed"

El nivel de aislamiento establecido como "read_committed" indica que el consumer solamente aborde los mensajes cuyo offset esté comprometido, es decir, que tenga la marca committed. Esto solamente ocurrirá cuando la transacción haya terminado exitosamente, por lo que solo aplica a los mensajes de la transacción.

Además, la transacción habrá generado la secuencia correcta de los mensajes para que el consumer los procese en el orden correspondiente.


Planteamiento del código

Para el lado del consumer, vamos a generar una aplicación multihilo que lance en paralelo varios consumers que estén escuchando al mismo tiempo y procesen los mensajes.

Para ello, habrá dos archivos de código:

  • ThreadConsumer.java: Contiene el código de cada hilo consumer.
  • IdempotentConsumer.java: Aplicación principal. Prepara el entorno multihilo, instancia cada hilo y lanza los consumers.


Código del hilo para consumer

El siguiente código corresponde a cada hilo de consumer. En un entorno multihilo, habrá varios procesos ejecutándose al mismo tiempo. Este es el código de cada uno de estos hilos.


import java.time.Duration;

import java.util.Arrays;


import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.commons.errors.WakeupException;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;


/**

 * ThreadConsumer 

 * @author: Rafael Hernamperez

 *

 */

public class ThreadConsumer extends Thread {

   private final KafkaConsumer<String, String> consumer;

   private final String eventTopic = "temperature-changed";

   private boolean closed = false;

   private Logger log = LoggerFactory.getLogger(ThreadConsumer.class);


   // Constructor

   public ThreadConsumer(KafkaConsumer<String, String> consumer) {

      this.consumer = consumer;

   }


   // Main thread execution

   @Override

   public void run() {

      consumer.subscribe(Arrays.asList(eventTopic));


      try {

         while(!closed) {

            ConsumerRecords<String, String> events = consumer.poll(Duration.ofMillis(100));


            for (ConsumerRecord<String, String> event : events) {

               log.info("Partition={}, Offset={}, key={}, value={}",

                  event.partition(),

                  event.offset(),

                  event.key(),

                  event.value());

            }  // for

         }  // while

      } catch(WakeupException Exception we) {

         if (!closed) {

            throw we;

         }

      } finally {

         consumer.close();

      }

   }  // run()

} // class


Código principal para el consumer

El código principal del consumer se encargará de preparar un entorno multihilo, en el que varios consumers trabajarán al mismo tiempo para escuchar y leer los mensajes que genere el producer. Al crear un hilo, se le pasará un objeto de tipo KafkaConsumer, inicializado con las propiedades de ese consumer, las cuales ya están preparadas para su funcionamiento con una semántica exactly-once, lo que facilitará la idempotencia.


import java.util.Properties;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import org.apache.kafka.clients.consumer.KafkaConsumer;


/**

 * IdempotentConsumer example

 * @author: Rafael Hernamperez

 *

 */

public class IdempotentProducer {

   public static final int numberOfConsumers = 5;


   public static void main(String[] args) {

      // Consumer properties

      Properties props = new Properties();

      props.put("bootstrap.servers", "localhost:9092");

      props.put("group.id", "temp-event-group");

      props.put("enable.auto.commit", "false");

      props.put("isolation.level", "read_committed");

      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");


      ExecutorService executyor = Executors.newFixedThreadPool(numberOfConsumers);


      // Generate consumers in a multithread environment

      for (int i=0; i<numberOfConsumer; i++) {

         ThreadConsumer consumer = new ThreadConsumer(new KafkaConsumer<>(props));

         executor.execute(consumer);

      }


      // Execution will alive until last thread is terminated

      while(!executor.isTerminated());


   }  // main()

}  // class



Configuraciones clave

A continuación se resumen las propiedades y valores clave para cada caso.


Consumer

  • No-guarantee:
    • enable.auto.commit = true
    • auto.commit.interval.ms = <frecuencia_milisegundos>
  • At-least-once:
    • enable.auto.commit = false
    • O enable.auto.commit=true con auto.commit.interval.ms con valor muy alto.
    • Recomendado utilizar consumer.commitSync() para controlar los commits en los offset de los mensajes.
  • At-most-once:
    • enable.auto.commit = true
    • auto.commit.interval.ms = <frecuencia_milisegundos>
    • No usar el método consumer.commitSync()
  • Exactly-once:
    • enable.auto.commit = false
    • isolation.level = "read_committed"

Producer

  • retries > 0
  • acks = "all"
  • max.in.flight.requests.per.connection <= 5
  • enable.idempotence = true
  • transacional.id = <clave_unica_transaccion>

Para facilitar la idempotencia real tanto en producer como en consumer, realizar el envío de mensajes dentro de una transacción.


Enlaces de referencia