lunes, 15 de febrero de 2021

Cómo crear un productor de streaming de eventos para Kafka en Java



Aviso: Este pequeño tutorial no va a realizar una introducción a Kafka. Se asume que el lector ya tiene unas nociones sobre Kafka y quiere comenzar a desarrollar código en Java para enviar eventos a un topic de Kafka. Esto es habitual realizarlo, principalmente, desde microservicios o desde scripts batch.

Nota: El concepto de estado utilizado en este artículo está contextualizado en una EDA (Arquitectura Orientada a Eventos), y representa un estado originado por un evento. Dentro de Kafka se asume este concepto como record o registro, y se refiere al valor que almacenará Kafka en el bus de eventos.

Requisitos

Se asume que ya se dispone de un entorno de Kafka funcionando, y que en dicho entorno existe, al menos, un topic sobre el cual escribir eventos. Dicho entorno puede estar en la propia máquina de desarrollo (localhost), en un servidor dedicado o en un servidor en cloud.

Más adelante (en otro artículo), veremos cómo desarrollar código para suscribirnos a ese topic y poder responder, en consecuencia, a dichos eventos. Ese código corresponderá a la parte del consumidor o suscriptor. En este artículo nos centraremos exclusivamente en la parte del productor o publicador.

En el entorno de desarrollo se recomienda tener lo siguiente:
  • Java JDK versión 11 (he utilizado la versión 15)
  • Gestión de dependencias con Maven
  • El IDE que utilizado ha sido el de Spring Tools, pero con Eclipse o IntelliJ IDEA no debería haber problemas.
  • Dependencias de Kafka en Maven versión 2.7.0

Configuración de Maven


En el archivo pom.xml del proyecto Java, añadir la dependencia encargada de importar las librerías necesarias para trabajar con streams de eventos en Kafka:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.7.0</version>
    </dependency>
</dependencies>

Configuración de las propiedades

El primer paso a realizar en el código, será definir las propiedades para poder configurar la conexión a Kafka. Las más importantes son las siguientes:

// Propiedades del producer
Properties props = new Properties();

// Lista de servidores Kafka a los que conectarse
props.put("bootstrap.servers", "localhost:9092");

// Serializacion de los datos de la clave (key)
props.put("key.serializer", StringSerializer.class.getName());

// Serializacion de los datos del valor (value)
props.put("value.serializer", StringSerializer.class.getName());

Para el ejemplo, usaremos el tipo String, que es el que viene por defecto. Kafka permite definir estructuras de datos mediante JSON y Avro (este es el preferido), que se definen con el objeto Serdes (SERialize/DESerialize).

A continuación se exponen algunas propiedades no tan relevantes ahora (son opcionales), pero que se podrán usar en un futuro para tunear la configuración de la conexión a Kafka:

props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);


Objeto KafkaProducer

El objeto KafkaProducer permite crear un cliente para una conexión a un topic de Kafka, a partir de la información proporcionada en las propiedades descritas anteriormente.

KafkaProducer<String, String> kp = new KafkaProducer<String, String>(props);

Este objeto permite definir el tipo de datos de la clave (primer parámetro) y del valor o estado (segundo parámetro). En nuestro ejemplo utilizaremos el tipo String (si se desea trabajar con tipos customizados, ver los tipos Serdes, y cómo definir tipos en JSON o Avro).

Nota: Este objeto crea un cliente genérico a un servidor de Kafka, por lo que se puede utilizar posteriormente para enviar estados a streams de eventos a diferentes topics.

Preparación del estado (registro) a enviar

Para enviar un estado o registro a Kafka desde el productor, es necesario preparar éste mediante un objeto de tipo ProducerRecord:

ProducerRecord<String, String> pr = new Producer<String, String>(topic, key, estado);

Este objeto permite definir el tipo de datos de la clave (primer parámetro) y del valor o estado (segundo parámetro). En nuestro ejemplo utilizaremos el tipo String (si se desea trabajar con tipos customizados, ver los tipos Serdes, y cómo definir tipos en JSON o Avro).

En la construcción (entre paréntesis) se pasarán los valores correspondientes a:
  • topic: Valor del nombre del topic a usar en Kafka, donde se enviará el evento.
  • key: Valor de la clave (key) del evento o registro.
  • estado: Valor del estado o registro a enviar.
El valor de la clave puede ser opcional en otros contextos de Kafka (se almacenaría como null). Por ello, también permitiría la siguiente sintaxis:

ProducerRecord<String, String> pr = new Producer<String, String>(topic, estado);


Envío del estado a Kafka


Una vez preparado el estado, éste se envía a Kafka a través del objeto KafkaProducer definido al principio, pasándole el objeto ProducerRecord con la información del estado:

kp.send(pr);

El método send() envía el estado o registro al topic especificado.


Cerrar la conexión a Kafka


Cuando nuestro código no va a enviar más estados a Kafka, debemos cerrar el objeto KafkaProducer, para liberar recursos del stream, así como la conexión. Para ello, utilizaremos el método close():

kp.close();


Aplicación de ejemplo de productor Kafka


A continuación os dejo una aplicación completa que hace de productor Kafka.

En este ejemplo, se ejecuta desde la consola de comandos como un script, pero la base os servirá también para microservicios u otro tipo de aplicaciones.

Lo primero que hará será preguntar por el topic al cual queremos enviar los estados. Después, en un bucle, solicitará el valor del estado a enviar. Dicho estado es un texto libre, por lo que se puede introducir cualquier valor, incluso un JSON en formato String.

Este bucle se repetirá hasta que el usuario introduzca el valor 'quit' (sin comillas). En ese momento se cerrará la conexión y terminará la ejecución.


package com.rhernamperez.kafkastreamsdemo;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Date;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
//import org.apache.kafka.streams.StreamsConfig;

/**
* Demostracion de un productor Kafka que envia eventos a un stream
* @author rafinguer
*
*/
public class KafkaProducerDemo {

    public static void main(String[] args) {

        // Propiedades del producer
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);

        // Creacion del objeto productor
        KafkaProducer<String, String> kp = new KafkaProducer<String, String>(props);
        String estado="", topic="", key = "";

        // Mensaje de bienvenida
        System.out.println("Demostracion de Kafka producer. Introduce datos para cada evento. 'quit' para salir\n");

        // Introduccion del topic por teclado
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));

        System.out.println("Introduce el nombre del topic: ");

        try {
            topic = br.readLine();
        } catch (Exception e) {
            System.out.println("ERROR > " + e.getMessage());
        }

        // Bucle para introducir estados hasta que se escriba 'quit'
        while(!estado.equals("quit")) {

            try {
                // Lectura de estados por teclado
                System.out.print(">>> ");
                estado = br.readLine();

                if (estado.equals("quit")) continue;

                // Envio del estado. La key sera la fecha y hora actuales
                key = new Date().toString();
                ProducerRecord<String, String> pr = new ProducerRecord<String, String>(topic, key, estado);
kp.send(pr);

                System.out.println("Enviado a topic " + topic + " la clave " + key + " con el estado > " + estado + "\n");
            } catch(Exception e) {
                System.out.println("Error > " + e.getMessage());
            }
        }

        // Cerrar el stream
        kp.close();

        System.out.println("**** FIN ****");
    }

}


Enlaces de interés