Capturar cambios en bases de datos con Debezium y Apache Camel

Imagen de SkaFRex  •  So long, and thanks for all the likes! en Pixabay

En este artículo, exploraremos cómo utilizar Debezium y Apache Camel en conjunto con Quarkus y una base de datos PostgreSQL para construir una aplicación reactiva.

¿Qué es Debezium?

Debezium es una plataforma de cambio de datos distribuida y basada en eventos que se utiliza para capturar los cambios de datos en sistemas de bases de datos y enviarlos a otros sistemas en tiempo real con esto las aplicaciones podrán responder a todas las inserciones, actualizaciones y eliminaciones que otras aplicaciones realicen en la bases de datos. Debezium puede conectarse a una variedad de sistemas de bases de datos, incluyendo MySQL, PostgreSQL, MongoDB, SQL Server y Oracle.

¿Qué es Apache Camel?

Apache Camel es un framework de integración de código abierto que proporciona una amplia variedad de componentes y patrones de integración para ayudar a los desarrolladores a integrar aplicaciones de manera eficiente. Camel se integra fácilmente con otros sistemas y tecnologías, lo que lo convierte en una herramienta muy valiosa para los desarrolladores que necesitan integrar múltiples sistemas de manera eficiente.

¿Qué es Quarkus?

Quarkus es un framework Java nativo de Kubernetes y full-stack hecho para máquinas virtuales Java (JVM) y compilación nativa (GraalVM), que optimiza Java para contenedores y le permite convertirse en una plataforma eficaz para entornos serverless, en la nube y Kubernetes. Quarkus también es compatible con varios frameworks y librerías de código abierto populares, lo que facilita la integración de tecnologías como Debezium y Apache Camel.

Debezium con Apache Camel

El uso de Debezium con Apache Camel puede proporcionar varias ventajas, entre las que se incluyen:

  • Integración completa: Camel proporciona una amplia gama de conectores y componentes que se pueden utilizar para integrarse con diferentes fuentes y destinos de datos. Debezium puede integrarse fácilmente con Camel, permitiéndole capturar y transmitir cambios en la base de datos a varios destinos/sistemas.
  • Transformaciones de datos simplificadas: Camel proporciona capacidades de transformación de datos que se pueden utilizar para convertir formatos y estructuras de datos. Cuando se usa junto con Debezium, Camel se puede utilizar para transformar y filtrar los cambios de la base de datos, lo que facilita el procesamiento de los datos.
  • Enrutamiento flexible: Camel proporciona un motor de enrutamiento flexible que puede utilizarse para enrutar datos basándose en varios criterios, como el origen o el destino de los datos. Cuando se usa junto con Debezium, Camel se puede utilizar para enrutar los cambios de la base de datos a diferentes destinos en función del contenido de los datos.
  • Escalabilidad: Debezium proporciona una arquitectura distribuida que puede utilizarse para escalar horizontalmente. Cuando se utiliza con Apache Camel, puede aprovechar la escalabilidad de Debezium y Camel para gestionar grandes volúmenes de cambios de datos y procesarlos en tiempo real.

Además el uso de Debezium con Camel permite que no se dependa de un clúster externo de Kafka o que se configuren los «Service Connectors de Kafka». En su lugar, Debezium se ejecuta directamente en el contexto de la aplicación. Capturando los mismos eventos de cambio dentro de la base de datos.

Esto es útil para los procesos ETL (Extract, Transform, Load), ya que permite mover datos de un sistema a otro en tiempo real, como en el siguiente ejemplo:

Camel incluye componentes para Debezium en su implementación lo que permite usarlo dentro de Apache Camel como un endpoint de consumo lo que permite ejecutar el motor de Debezium de forma embebida en la instancia de ejecución de Camel.

Algunos de los conectores actualmente disponibles son:

Ahora que hemos presentado las diferentes tecnologías que vamos a utilizar, comencemos a construir nuestra aplicación.

Puede encontrar el código fuente completo del ejemplo en GitHub.

Paso 1. Crear una aplicación de quarkus usando maven, con las dependencias necesarias para construir nuestra aplicación

mvn io.quarkus:quarkus-maven-plugin:2.15.3.Final:create \                                                                                          
    -DprojectGroupId=dev.mikeintoch \
    -DprojectArtifactId=quarkus-debezium-camel \
    -Dextensions="camel-quarkus-core,camel-quarkus-debezium-postgres,camel-quarkus-direct,camel-quarkus-rest,camel-quarkus-jsonpath,camel-quarkus-mongodb,camel-quarkus-jpa,quarkus-hibernate-orm,quarkus-agroal"

Paso 2. Necesitamos configurar Debezium para conectarse a la base de datos PostgreSQL y capturar los cambios. Cree un nuevo archivo llamado application.properties y añada las siguientes propiedades:

#Camel Debezium Properties
camel.component.debezium-postgres.offset-storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
camel.component.debezium-postgres.offset-storage-file-name=/tmp/offset.dat
camel.component.debezium-postgres.database-server-name=my_db_server
camel.component.debezium-postgres.database-hostname=localhost
camel.component.debezium-postgres.database-dbname=my_db
camel.component.debezium-postgres.database-user=my_username
camel.component.debezium-postgres.database-password=my_password
camel.component.debezium-postgres.database-port=5432
camel.component.debezium-postgres.plugin-name=pgoutput
camel.component.debezium-postgres.schema-include-list=public
camel.component.debezium-postgres.table-include-list=public.customer
camel.component.debezium-postgres.offset-flush-interval-ms=10000

Paso 3. Ahora, podemos crear una ruta Camel para procesar los cambios capturados por Debezium. Cree un nuevo archivo llamado DebeziumRoute.java y añada el siguiente código:

import java.util.Map;

import org.apache.camel.builder.RouteBuilder;

public class DebeziumRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        from("debezium-postgres:my_connector")
        .log("${body}")
        .choice()
          .when(header("CamelDebeziumOperation").isEqualTo("c"))
              .convertBodyTo(Map.class)
              .to("mongodb:mydb?database={{mongodb.database}}&collection={{mongodb.collection}}&operation=insert");
    }
}

Esta ruta escuchará los cambios en la tabla customer de la base de datos my_db y los enviara a una base de datos mongodb. También estamos procesando los datos para solo procesar los eventos de creación de registros.

Paso 4. Configurar servicios para probar localmente

Vamos a hacer uso de Quarkus DevServices una potente característica del framework Quarkus que simplifica el desarrollo y las pruebas de aplicaciones que dependen de servicios externos como bases de datos, sistemas de mensajería y otros recursos. Permite a los desarrolladores iniciar y gestionar automáticamente estos servicios externos durante el desarrollo y las pruebas, eliminando la necesidad de configurarlos manualmente o de utilizar un entorno independiente para las pruebas. Esta característica reduce la complejidad del proceso de desarrollo, aumenta la eficiencia y ayuda a garantizar que su aplicación funcione correctamente en entornos de producción. Con Quarkus DevServices, los desarrolladores pueden centrarse en escribir código y crear aplicaciones de alta calidad en lugar de gestionar dependencias complejas utilizando contenedores para levantar estos servicios.

En este caso usaremos dos servicios de base de datos Postgresql y MongoDB, entonces configuremos los servicios para estos componentes, agregue las siguientes propiedades en el archivo application.properties

#Quarkus postgresql database devservices
quarkus.datasource.devservices.enabled=true
quarkus.datasource.devservices.port=5432
quarkus.datasource.devservices.db-name=my_db
quarkus.datasource.devservices.username=my_username
quarkus.datasource.devservices.password=my_password
quarkus.datasource.devservices.command=postgres -c wal_level=logical

# MongoDB dev services
quarkus.mongodb.devservices.enabled=true
quarkus.mongodb.devservices.port=27017

Paso 5. Ejecuta la aplicación.

mvn quarkus:dev

Esto levantará dos contenedores uno de Postgresql y otro de MongoDB para probar el ejercicio.

Mientras tanto en otra terminal ejecute la creación de algunos registros dentro de la base de datos para capturarlos.

curl http://localhost:8080/customer -XPOST -i -H 'Content-Type: application/json' -d '{"name": "Mike Appleseed","email": "appleseed@example.com","phone":"+1-455-9123","country": "United States"}'

curl http://localhost:8080/customer -XPOST -i -H 'Content-Type: application/json' -d '{"name": "Carmen Rojas","email": "carmen@example.com","phone":"+355-945-894","country": "Albania"}'

curl http://localhost:8080/customer -XPOST -i -H 'Content-Type: application/json' -d '{"name": "Joe Robinson","email": "joerobinson@example.com","phone":"+1-746-8796","country": "United States"}'

Paso 6. Verifica que los datos se encuentren en su destino final.

Conclusión

Apache Camel y Debezium proporcionan una excelente combinación para construir arquitecturas basadas en eventos que pueden capturar y procesar cambios en sus fuentes de datos.

Al usar estas herramientas juntas, puedes crear microservicios y aplicaciones que pueden reaccionar en tiempo real a los cambios en sus fuentes de datos, sin la necesidad de procesos costosos y complejos de sincronización de datos. Esto puede llevar a mejoras significativas en la consistencia y confiabilidad de los datos, así como a un procesamiento de datos más rápido y eficiente.

Puede encontrar el código fuente completo del ejemplo en GitHub.

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *