Como usar Kafka, Apicurio Registry y Avro con Quarkus

En una arquitectura orientada a eventos (EDA) lo más importante son los datos en sí y más importante aún es gobernar el formato del dato, es decir, usar esquemas.

Los esquemas se usan cuando se lee y escriben los datos. Los esquemas deben ser compartidos entre los servicios que envían el dato y el que los consumen y estos necesitan un recurso centralizado donde gobernar la información sobre el esquema.

Dentro del mundo de Kafka, existen diferentes protocolos que puede utilizar para la serialización (ejemplo JSON o ProtoBuf) pero Apache Avro es, como mucho, el protocolo de serialización más utilizado. Apache Avro es un sistema de serialización de datos que combinado con Kafka proporciona una serialización binaria basada en esquemas, robusta y rápida.

Apache Avro es agnóstico, es decir, cualquier lenguaje puede enviar y leer mensajes en formato Avro si disponen del esquema.

El esquema se puede versionar y distribuir considerándolo como un fuente más. Que debe ser compartido entre los proyectos. Porque a partir del esquema se debe compilar (generar los fuentes) para el lenguaje con el que se va a programar el consumidor o productor.

Aquí vamos a ver como como puedes usar Apache Avro con un registro de esquemas (APIcurio Registry) dentro una aplicación Quarkus.

Vamos a crear una simple aplicación que reciba peticiones HTTP, escribiendo mensajes en Kafka, y leyéndolas de Kafka. Para simplificar, la misma aplicación escribirá en el broker de Kafka y leerá de él, pero en el mundo real serían aplicaciones diferentes.

Comencemos

$ mvn io.quarkus:quarkus-maven-plugin:1.9.2.Final:create \
-DprojectGroupId=mx.com.quarkus -DprojectArtifactId=apicurio-avro \ 
-DclassName="mx.com.quarkus.MovieResource" -Dpath="/movies" \
-Dextensions="resteasy-jsonb,quarkus-avro,quarkus-smallrye-reactive-messaging-kafka"

Ahora añadiremos algunas configuraciones a nuestro pom.xml después de <dependency>:

<dependency>
  <groupId>io.apicurio</groupId>
  <artifactId>apicurio-registry-utils-serde</artifactId>
  <version>1.3.2.Final</version>
  <exclusions>
    <exclusion>
      <groupId>org.jboss.spec.javax.interceptor</groupId>
      <artifactId>jboss-interceptors-api_1.2_spec</artifactId>
    </exclusion>
  </exclusions>
</dependency>

Esta dependencia proporciona el serializador y el deserializador de Avro. Existen diferentes implementaciones como la de Confluent pero en este ejemplo usaremos las proporcionadas por Apicurio.

También necesitamos añadir el plugin avro-plugin-maven bajo <build><plugin> añadimos el siguiente bloque:

<plugin>
   <groupId>org.apache.avro</groupId>
   <artifactId>avro-maven-plugin</artifactId>
   <version>1.9.2</version>
   <executions>
      <execution>
         <phase>generate-sources</phase>
         <goals>
            <goal>schema</goal>
         </goals>
         <configuration>
             <sourceDirectory>${project.build.directory}/schemas</sourceDirectory>
             <outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
           <stringType>String</stringType>
         </configuration>
       </execution>
    </executions>
</plugin>

Este plug-in genera el código a partir de los archivos de esquema de Avro que tengas definido en tu fuente

En el caso de este ejemplo usaremos el plug-in de Maven de Apicurio para descargar artefactos del Registro. esto nos ayudará a generar código a partir de un esquema registrado.

<plugin>
        <groupId>io.apicurio</groupId>
        <artifactId>apicurio-registry-maven-plugin</artifactId>
        <version>1.3.2.Final</version>
        <executions>
          <execution>
            <phase>generate-sources</phase>
            <goals>
              <goal>download</goal>
            </goals>
            <configuration>
              <registryUrl>http://localhost:8081/api</registryUrl>
              <ids>
                <param1>movies</param1>
              </ids>
              <artifactExtension>.avsc</artifactExtension>
              <outputDirectory>${project.build.directory}/schemas</outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>

Antes de hacer código levantemos la infraestructura

version: '2'

services:

  zookeeper:
    image: strimzi/kafka:0.11.3-kafka-2.1.0
    command: [
      "sh", "-c",
      "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs

  kafka:
    image: strimzi/kafka:0.11.3-kafka-2.1.0
    command: [
      "sh", "-c",
      "bin/kafka-server-start.sh config/server.properties --override listeners=PLAINTEXT://0.0.0.0:9092 --override advertised.listeners=PLAINTEXT://localhost:9092 --override zookeeper.connect=zookeeper:2181"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

  schema-registry:
    image: apicurio/apicurio-registry-mem:1.2.2.Final
    ports:
      - 8081:8080
    depends_on:
      - kafka
    environment:
      QUARKUS_PROFILE: prod
      KAFKA_BOOTSTRAP_SERVERS: localhost:9092
      APPLICATION_ID: registry_id
      APPLICATION_SERVER: localhost:9000

Ejecutemos la infraestructura en mi caso utilice podman-compose:

$ podman-compose up -d

Para detener y remover la infraestructura

$ podman-compose down

Configuremos nuestro esquema

Primero, tenemos que escribir el esquema que representa el objeto que leeremos y escribiremos en Kafka.

{
   "namespace": "mx.com.quarkus.schema",
   "type": "record",
   "name": "Movie",
   "fields": [
     {
       "name": "title",
       "type": "string"
     },
     {
       "name": "year",
       "type": "int"
     }
   ]
 }

Ahora nuestro esquema lo daremos de alta en Apicurio Registry en un navegador abra el siguiente enlace http://localhost:8081

Damos clic en «Upload aritfact» y configuramos nuestro esquema:

ID: movies
Type: Auto-Detect 
Artifact: "Copiamos la definición de nuestro artefacto"

Escribamos un poco de código

El «apicurio-maven-plugin» descargará el esquemas del registro y el «avro-maven-plugin», genera la clase «mx.com.quarkus.schema.Movie» con los atributos de título y año. Para generar la clase, ejecute:

$ mvn generate-sources

La primer clase que vamos a crear es la encargada de recibir las peticiones y escribirá el dato en Kafka. Modificamos el archivo src/main/java/mx/com/quarkus/MovieResource.java

package mx.com.quarkus;

import mx.com.quarkus.schema.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;

import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

@Path("/movies")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public class MovieResource {

    private static final Logger LOGGER =
        Logger.getLogger("MovieResource");

    @Inject @Channel("movies") Emitter<Movie> emitter;


    @POST
    public Response enqueueMovie(Movie movie) {
        LOGGER.infof("Sending movie %s to Kafka",
            movie.getTitle()
        );
        emitter.send(movie);
        return Response.accepted().build();
    }

}

Este recurso del JAX-RS es bastante sencillo. Tiene un método de «endpoint», recibiendo la carga útil de JSON en /movies. RESTEasy mapea el documento JSON en los objetos del tipo «Movie» automáticamente. Como se describe en el archivo «avsc», el JSON esperado contiene dos campos: «title» y «year».

Cuando se usa Quarkus con Mensajería Reactiva, no se interactúa con Kafka directamente. Inyectas un «Emitter», que envía un objeto (movie) a un canal. La configuración de la aplicación asigna este canal a un tópico de Kafka.

Hablando de configuración, abre el «src/main/resources/application.properties», y añade:

mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/api

mp.messaging.outgoing.movies.connector=smallrye-kafka
mp.messaging.outgoing.movies.topic=movies
mp.messaging.outgoing.movies.value.serializer=io.apicurio.registry.utils.serde.AvroKafkaSerializer
mp.messaging.outgoing.movies.apicurio.registry.artifact-id=io.apicurio.registry.utils.serde.strategy.SimpleTopicIdStrategy
mp.messaging.outgoing.movies.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
mp.messaging.outgoing.movies.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider

Esta configuración requiere una pequeña explicación. Primero, «mp.messaging.connector.smallrye-kafka.apicurio.registry.url» configura la URL del registro del esquema. Si utiliza el serde Confluent, en lugar del de Apicurio, la propiedad se denomina «mp.messaging.connector.smallrye-kafka.schema.registry.url»

El atributo «mp.messaging.outgoing.movies» configuran el canal donde se pondrá el objeto «Movie». El atributo conector indica que el conector SmallRye Kafka gestiona el canal.El atributo topic (que podríamos omitir en este caso ya que coincide con el nombre del canal) especifica el nombre del tópico value.serializer configura el serializador a utilizar. Aquí usamos «io.apicurio.registry.utils.serde.AvroKafkaSerializer» proporcionado por Apicurio. Las propiedades registry.* configuran cómo el registro maneja el esquema.

El consumidor

La otra parte de nuestra de la aplicación es aún más simple. Sólo registra las el objeto «Movie» que vamos recibiendo en el canal de información.

Crea el src/main/java/mx/com/quarkus/MovieConsumer.java con el siguiente contenido:

package mx.com.quarkus;

import mx.com.quarkus.schema.Movie;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MovieConsumer {

    private static final Logger LOGGER =
        Logger.getLogger("MovieConsumer");

    @Incoming("movies-from-kafka")
    public void receive(Movie movie) {
        LOGGER.infof("Received movie: %s (%d)",
            movie.getTitle(), movie.getYear());
    }

}

La anotación @Incoming indica que el método se llama para cada objeto de tipo «Movie» que transita por el canal «movies-from-kafka». En este caso, simplemente escribimos un mensaje en el log.

Ya casi terminamos. Tenemos que configurar la recepción de Kafka. Vuelve a abrir el «application.properties» y añade:

mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka 
mp.messaging.incoming.movies-from-kafka.topic=movies
mp.messaging.incoming.movies-from-kafka.value.deserializer=io.apicurio.registry.utils.serde.AvroKafkaDeserializer
mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest
mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false
mp.messaging.incoming.movies-from-kafka.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider

Estas propiedades configuran el camino del objeto «Movie» desde Kafka hasta el tópico de Kafka. También configura el deserializador (io.apicurio.registry.utils.serde.AvroKafkaDeserializer). Deshabilitamos la auto-commit de Kafka (enable.auto.commit=false), ya que Mensajería Reactiva se encarga de la commit por ti.

Debido a que el emisor y el receptor están colocados en la misma aplicación, no podemos usar el mismo nombre de canal.

Tiempo de ejecutar.

Iniciemos nuestra aplicación

mvn compile quarkus:dev

Una vez que ejecutó, abra otra terminal y publique algunas peliculas:

curl --header "Content-Type: application/json" \
   --request POST \
   --data '{"title":"The Shawshank Redemption","year":1994}' \
   http://localhost:8080/movies
 curl --header "Content-Type: application/json" \
   --request POST \
   --data '{"title":"The Godfather","year":1972}' \
   http://localhost:8080/movies
 curl --header "Content-Type: application/json" \
   --request POST \
   --data '{"title":"The Dark Knight","year":2008}' \
   http://localhost:8080/movies
 curl --header "Content-Type: application/json" \
   --request POST \
   --data '{"title":"12 Angry Men","year":1957}' \
   http://localhost:8080/movies

En el log de la aplicación debe visualizar la siguiente salida:

2020-11-11 16:42:22,597 INFO  MovieResource Sending movie The Shawshank Redemption to Kafka
 2020-11-11 16:42:22,619 INFO  MovieResource Sending movie The Godfather to Kafka
 2020-11-11 16:42:22,624 INFO  MovieConsumer Received movie: The Shawshank Redemption (1994)
 2020-11-11 16:42:22,641 INFO  MovieConsumer Received movie: The Godfather (1972)
 2020-11-11 16:42:22,644 INFO  MovieResource Sending movie The Dark Knight to Kafka
 2020-11-11 16:42:22,663 INFO  MovieConsumer Received movie: The Dark Knight (2008)
 2020-11-11 16:42:22,669 INFO  MovieResource Sending movie 12 Angry Men to Kafka
 2020-11-11 16:42:22,688 INFO  MovieConsumer Received movie: 12 Angry Men (1957)

Conclusión

La importancia del gobierno de esquema a tráves de un registro nos permite tener un desacoplamiento entre los consumidores y productores debido a que los servicios no necesitan conocer quien conusme los datos que produce.

Otro punto importante es que al conocer el esquema de información, los datos que se inyectan no serán incorrectos y con eso no se rompan los servicios consumidores.

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *