En una arquitectura orientada a eventos (EDA) lo más importante son los datos en sí y más importante aún es gobernar el formato del dato, es decir, usar esquemas.
Los esquemas se usan cuando se lee y escriben los datos. Los esquemas deben ser compartidos entre los servicios que envían el dato y el que los consumen y estos necesitan un recurso centralizado donde gobernar la información sobre el esquema.
Dentro del mundo de Kafka, existen diferentes protocolos que puede utilizar para la serialización (ejemplo JSON o ProtoBuf) pero Apache Avro es, como mucho, el protocolo de serialización más utilizado. Apache Avro es un sistema de serialización de datos que combinado con Kafka proporciona una serialización binaria basada en esquemas, robusta y rápida.
Apache Avro es agnóstico, es decir, cualquier lenguaje puede enviar y leer mensajes en formato Avro si disponen del esquema.
El esquema se puede versionar y distribuir considerándolo como un fuente más. Que debe ser compartido entre los proyectos. Porque a partir del esquema se debe compilar (generar los fuentes) para el lenguaje con el que se va a programar el consumidor o productor.
Aquí vamos a ver como como puedes usar Apache Avro con un registro de esquemas (APIcurio Registry) dentro una aplicación Quarkus.
Vamos a crear una simple aplicación que reciba peticiones HTTP, escribiendo mensajes en Kafka, y leyéndolas de Kafka. Para simplificar, la misma aplicación escribirá en el broker de Kafka y leerá de él, pero en el mundo real serían aplicaciones diferentes.
Comencemos
$ mvn io.quarkus:quarkus-maven-plugin:1.9.2.Final:create \
-DprojectGroupId=mx.com.quarkus -DprojectArtifactId=apicurio-avro \
-DclassName="mx.com.quarkus.MovieResource" -Dpath="/movies" \
-Dextensions="resteasy-jsonb,quarkus-avro,quarkus-smallrye-reactive-messaging-kafka"
Ahora añadiremos algunas configuraciones a nuestro pom.xml después de <dependency>:
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-utils-serde</artifactId>
<version>1.3.2.Final</version>
<exclusions>
<exclusion>
<groupId>org.jboss.spec.javax.interceptor</groupId>
<artifactId>jboss-interceptors-api_1.2_spec</artifactId>
</exclusion>
</exclusions>
</dependency>
Esta dependencia proporciona el serializador y el deserializador de Avro. Existen diferentes implementaciones como la de Confluent pero en este ejemplo usaremos las proporcionadas por Apicurio.
También necesitamos añadir el plugin avro-plugin-maven bajo <build><plugin> añadimos el siguiente bloque:
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.9.2</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.build.directory}/schemas</sourceDirectory>
<outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
<stringType>String</stringType>
</configuration>
</execution>
</executions>
</plugin>
Este plug-in genera el código a partir de los archivos de esquema de Avro que tengas definido en tu fuente
En el caso de este ejemplo usaremos el plug-in de Maven de Apicurio para descargar artefactos del Registro. esto nos ayudará a generar código a partir de un esquema registrado.
<plugin>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-maven-plugin</artifactId>
<version>1.3.2.Final</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>download</goal>
</goals>
<configuration>
<registryUrl>http://localhost:8081/api</registryUrl>
<ids>
<param1>movies</param1>
</ids>
<artifactExtension>.avsc</artifactExtension>
<outputDirectory>${project.build.directory}/schemas</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
Antes de hacer código levantemos la infraestructura
version: '2'
services:
zookeeper:
image: strimzi/kafka:0.11.3-kafka-2.1.0
command: [
"sh", "-c",
"bin/zookeeper-server-start.sh config/zookeeper.properties"
]
ports:
- "2181:2181"
environment:
LOG_DIR: /tmp/logs
kafka:
image: strimzi/kafka:0.11.3-kafka-2.1.0
command: [
"sh", "-c",
"bin/kafka-server-start.sh config/server.properties --override listeners=PLAINTEXT://0.0.0.0:9092 --override advertised.listeners=PLAINTEXT://localhost:9092 --override zookeeper.connect=zookeeper:2181"
]
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
LOG_DIR: "/tmp/logs"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
schema-registry:
image: apicurio/apicurio-registry-mem:1.2.2.Final
ports:
- 8081:8080
depends_on:
- kafka
environment:
QUARKUS_PROFILE: prod
KAFKA_BOOTSTRAP_SERVERS: localhost:9092
APPLICATION_ID: registry_id
APPLICATION_SERVER: localhost:9000
Ejecutemos la infraestructura en mi caso utilice podman-compose:
$ podman-compose up -d
Para detener y remover la infraestructura
$ podman-compose down
Configuremos nuestro esquema
Primero, tenemos que escribir el esquema que representa el objeto que leeremos y escribiremos en Kafka.
{
"namespace": "mx.com.quarkus.schema",
"type": "record",
"name": "Movie",
"fields": [
{
"name": "title",
"type": "string"
},
{
"name": "year",
"type": "int"
}
]
}
Ahora nuestro esquema lo daremos de alta en Apicurio Registry en un navegador abra el siguiente enlace http://localhost:8081
Damos clic en «Upload aritfact» y configuramos nuestro esquema:
ID: movies
Type: Auto-Detect
Artifact: "Copiamos la definición de nuestro artefacto"
Escribamos un poco de código
El «apicurio-maven-plugin» descargará el esquemas del registro y el «avro-maven-plugin», genera la clase «mx.com.quarkus.schema.Movie» con los atributos de título y año. Para generar la clase, ejecute:
$ mvn generate-sources
La primer clase que vamos a crear es la encargada de recibir las peticiones y escribirá el dato en Kafka. Modificamos el archivo src/main/java/mx/com/quarkus/MovieResource.java
package mx.com.quarkus;
import mx.com.quarkus.schema.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@Path("/movies")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public class MovieResource {
private static final Logger LOGGER =
Logger.getLogger("MovieResource");
@Inject @Channel("movies") Emitter<Movie> emitter;
@POST
public Response enqueueMovie(Movie movie) {
LOGGER.infof("Sending movie %s to Kafka",
movie.getTitle()
);
emitter.send(movie);
return Response.accepted().build();
}
}
Este recurso del JAX-RS es bastante sencillo. Tiene un método de «endpoint», recibiendo la carga útil de JSON en /movies. RESTEasy mapea el documento JSON en los objetos del tipo «Movie» automáticamente. Como se describe en el archivo «avsc», el JSON esperado contiene dos campos: «title» y «year».
Cuando se usa Quarkus con Mensajería Reactiva, no se interactúa con Kafka directamente. Inyectas un «Emitter», que envía un objeto (movie) a un canal. La configuración de la aplicación asigna este canal a un tópico de Kafka.
Hablando de configuración, abre el «src/main/resources/application.properties», y añade:
mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/api
mp.messaging.outgoing.movies.connector=smallrye-kafka
mp.messaging.outgoing.movies.topic=movies
mp.messaging.outgoing.movies.value.serializer=io.apicurio.registry.utils.serde.AvroKafkaSerializer
mp.messaging.outgoing.movies.apicurio.registry.artifact-id=io.apicurio.registry.utils.serde.strategy.SimpleTopicIdStrategy
mp.messaging.outgoing.movies.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
mp.messaging.outgoing.movies.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider
Esta configuración requiere una pequeña explicación. Primero, «mp.messaging.connector.smallrye-kafka.apicurio.registry.url» configura la URL del registro del esquema. Si utiliza el serde Confluent, en lugar del de Apicurio, la propiedad se denomina «mp.messaging.connector.smallrye-kafka.schema.registry.url»
El atributo «mp.messaging.outgoing.movies» configuran el canal donde se pondrá el objeto «Movie». El atributo conector indica que el conector SmallRye Kafka gestiona el canal.El atributo topic (que podríamos omitir en este caso ya que coincide con el nombre del canal) especifica el nombre del tópico value.serializer configura el serializador a utilizar. Aquí usamos «io.apicurio.registry.utils.serde.AvroKafkaSerializer» proporcionado por Apicurio. Las propiedades registry.* configuran cómo el registro maneja el esquema.
El consumidor
La otra parte de nuestra de la aplicación es aún más simple. Sólo registra las el objeto «Movie» que vamos recibiendo en el canal de información.
Crea el src/main/java/mx/com/quarkus/MovieConsumer.java
con el siguiente contenido:
package mx.com.quarkus;
import mx.com.quarkus.schema.Movie;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class MovieConsumer {
private static final Logger LOGGER =
Logger.getLogger("MovieConsumer");
@Incoming("movies-from-kafka")
public void receive(Movie movie) {
LOGGER.infof("Received movie: %s (%d)",
movie.getTitle(), movie.getYear());
}
}
La anotación @Incoming indica que el método se llama para cada objeto de tipo «Movie» que transita por el canal «movies-from-kafka». En este caso, simplemente escribimos un mensaje en el log.
Ya casi terminamos. Tenemos que configurar la recepción de Kafka. Vuelve a abrir el «application.properties» y añade:
mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka
mp.messaging.incoming.movies-from-kafka.topic=movies
mp.messaging.incoming.movies-from-kafka.value.deserializer=io.apicurio.registry.utils.serde.AvroKafkaDeserializer
mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest
mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false
mp.messaging.incoming.movies-from-kafka.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider
Estas propiedades configuran el camino del objeto «Movie» desde Kafka hasta el tópico de Kafka. También configura el deserializador (io.apicurio.registry.utils.serde.AvroKafkaDeserializer). Deshabilitamos la auto-commit de Kafka (enable.auto.commit=false), ya que Mensajería Reactiva se encarga de la commit por ti.
Debido a que el emisor y el receptor están colocados en la misma aplicación, no podemos usar el mismo nombre de canal.
Tiempo de ejecutar.
Iniciemos nuestra aplicación
mvn compile quarkus:dev
Una vez que ejecutó, abra otra terminal y publique algunas peliculas:
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Shawshank Redemption","year":1994}' \
http://localhost:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Godfather","year":1972}' \
http://localhost:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Dark Knight","year":2008}' \
http://localhost:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"12 Angry Men","year":1957}' \
http://localhost:8080/movies
En el log de la aplicación debe visualizar la siguiente salida:
2020-11-11 16:42:22,597 INFO MovieResource Sending movie The Shawshank Redemption to Kafka
2020-11-11 16:42:22,619 INFO MovieResource Sending movie The Godfather to Kafka
2020-11-11 16:42:22,624 INFO MovieConsumer Received movie: The Shawshank Redemption (1994)
2020-11-11 16:42:22,641 INFO MovieConsumer Received movie: The Godfather (1972)
2020-11-11 16:42:22,644 INFO MovieResource Sending movie The Dark Knight to Kafka
2020-11-11 16:42:22,663 INFO MovieConsumer Received movie: The Dark Knight (2008)
2020-11-11 16:42:22,669 INFO MovieResource Sending movie 12 Angry Men to Kafka
2020-11-11 16:42:22,688 INFO MovieConsumer Received movie: 12 Angry Men (1957)
Conclusión
La importancia del gobierno de esquema a tráves de un registro nos permite tener un desacoplamiento entre los consumidores y productores debido a que los servicios no necesitan conocer quien conusme los datos que produce.
Otro punto importante es que al conocer el esquema de información, los datos que se inyectan no serán incorrectos y con eso no se rompan los servicios consumidores.
- Código Fuente: https://github.com/mikeintoch/apicurio-avro.git
- Referencias: