En este documento, se describe cómo integrar Apache Kafka y Pub/Sub mediante con el conector de Kafka del grupo de Pub/Sub.
Acerca del conector de Kafka del grupo de Pub/Sub
Apache Kafka es una plataforma de código abierto para la transmisión de eventos. Se suele usar en arquitecturas distribuidas para permitir la comunicación entre componentes con acoplamiento bajo. Pub/Sub es un servicio administrado para enviar y reciben mensajes de forma asíncrona. Al igual que con Kafka, puedes usar Pub/Sub para comunicarte entre los componentes de tu arquitectura de nube.
El conector de Kafka de grupos de Pub/Sub te permite integrar estos dos sistemas. Los siguientes conectores se empaquetan en el archivo JAR del conector:
- El conector de receptor lee registros de uno o más temas de Kafka y los publica en Pub/Sub.
- El conector de origen lee los mensajes de un tema de Pub/Sub. y las publica en Kafka.
A continuación, se describen algunas situaciones en las que podrías usar el conector de Kafka de grupos de Pub/Sub:
- Estás migrando una arquitectura basada en Kafka a Google Cloud.
- Tienes un sistema de frontend que almacena eventos en Kafka fuera de Google Cloud, pero también usas Google Cloud para ejecutar algunos de tus servicios de backend, que deben recibir los eventos de Kafka.
- Recopila registros de una solución de Kafka local y envíalos a Google Cloud para el análisis de datos.
- Tienes un sistema de frontend que usa Google Cloud, pero también almacenas datos en las instalaciones con Kafka.
El conector requiere Kafka Connect, que es un framework para la transmisión de datos entre Kafka y otros sistemas. Para usar el conector, debes ejecutar Kafka Connect junto con tu clúster de Kafka.
En este documento, se supone que conoces Kafka y Pub/Sub. Antes de leer este documento, te recomendamos completa una de las Guías de inicio rápido de Pub/Sub.
El conector de Pub/Sub no admite ninguna integración entre las LCA de Google Cloud IAM y Kafka Connect.
Comienza a usar el conector
En esta sección, se explican las siguientes tareas:- Configura el conector de Kafka del grupo de Pub/Sub.
- Enviar eventos de Kafka a Pub/Sub
- Enviar mensajes de Pub/Sub a Kafka
Requisitos previos
Instala Kafka
Sigue la guía de inicio rápido de Apache Kafka para instalar un nodo único de Kafka en tu máquina local. Completa estos pasos en la guía de inicio rápido:
- Descarga la versión más reciente de Kafka y extráela.
- Inicia el entorno de Kafka.
- Crear un tema de Kafka
Autenticar
El conector de Kafka del grupo de Pub/Sub debe autenticarse con Pub/Sub para para enviar y recibir mensajes de Pub/Sub. Para configurar la autenticación, sigue estos pasos:
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
Descarga el conector JAR
Descarga el archivo JAR del conector en tu máquina local. Para obtener más información, consulta Cómo adquirir el conector en el archivo de instrucciones de GitHub.
Copia los archivos de configuración del conector
Clona o descarga la Repositorio de GitHub para el conector.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
Copia el contenido del directorio
config
en el subdirectorioconfig
de la instalación de Kafka.cp config/* [path to Kafka installation]/config/
Estos archivos contienen los parámetros de configuración del conector.
Actualiza la configuración de Kafka Connect
- Navega al directorio que contiene el objeto binario de Kafka Connect que descargado.
- En el directorio binario de Kafka Connect, abre el archivo llamado
config/connect-standalone.properties
en un editor de texto. - Si el
plugin.path property
está comentado, quita el comentario. Actualiza el
plugin.path property
para incluir la ruta de acceso al archivo JAR del conector.Ejemplo:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
Configura la propiedad
offset.storage.file.filename
con un nombre de archivo local. En en modo independiente, Kafka usa este archivo para almacenar datos desplazados.Ejemplo:
offset.storage.file.filename=/tmp/connect.offsets
Reenvía eventos de Kafka a Pub/Sub
En esta sección, se describe cómo iniciar el conector de receptor, publicar eventos en Kafka y, luego, leer los mensajes reenviados desde Pub/Sub.
Usa la Google Cloud CLI para crear un tema de Pub/Sub con una suscripción.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Reemplaza lo siguiente:
- PUBSUB_TOPIC: Es el nombre de un tema de Pub/Sub para recibir los mensajes de Kafka.
- PUBSUB_SUBSCRIPTION: Es el nombre de una suscripción a Pub/Sub para el tema.
Abre el archivo
/config/cps-sink-connector.properties
en un editor de texto. Agrega valores para las siguientes propiedades, que están marcadas como"TODO"
en el comentarios:topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
Reemplaza lo siguiente:
- KAFKA_TOPICS: Una lista separada por comas de temas de Kafka para leer de la imagen de la que se originó.
- PROJECT_ID: Es el proyecto de Google Cloud que contiene tu Tema de Pub/Sub.
- PUBSUB_TOPIC: Es el tema de Pub/Sub que recibirá los mensajes de Kafka.
En el directorio de Kafka, ejecuta el siguiente comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties
Sigue los pasos que se indican Guía de inicio rápido de Apache Kafka para escribir algunos eventos en su tema de Kafka.
Usa gcloud CLI para leer los eventos desde Pub/Sub.
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
Reenviar mensajes de Pub/Sub a Kafka
En esta sección, se describe cómo iniciar el conector de origen, publicar mensajes en Pub/Sub y leer los mensajes reenviados desde Kafka.
Usa gcloud CLI para crear un tema de Pub/Sub con una suscripción.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Reemplaza lo siguiente:
- PUBSUB_TOPIC: Es el nombre de un tema de Pub/Sub.
- PUBSUB_SUBSCRIPTION: Es el nombre de un Pub/Sub. suscripción.
Abre el archivo llamado
/config/cps-source-connector.properties
en un texto Editor. Agrega valores para las siguientes propiedades, que están marcadas como"TODO"
en los comentarios:kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
Reemplaza lo siguiente:
- KAFKA_TOPIC: Son los temas de Kafka que recibirán los mensajes de Pub/Sub.
- PROJECT_ID: Es el proyecto de Google Cloud que contiene tu Tema de Pub/Sub.
- PUBSUB_TOPIC: Es el tema de Pub/Sub.
En el directorio de Kafka, ejecuta el siguiente comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties
Usa gcloud CLI para publicar un mensaje en Pub/Sub.
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
Leer el mensaje de Kafka Sigue los pasos de la Guía de inicio rápido de Apache Kafka para leer los mensajes del tema de Kafka.
Conversión de mensajes
Un registro Kafka contiene una clave y un valor, que son arrays de bytes de longitud variable. De manera opcional, un registro de Kafka también puede tener encabezados, que son pares clave-valor. R Mensaje de Pub/Sub tiene dos partes principales: el cuerpo del mensaje y cero o más atributos de par clave-valor.
Kafka Connect utiliza convertidores para serializar claves y valores desde y hacia Kafka. Para controlar la serialización, establece las siguientes propiedades en el conector de Terraform:
key.converter
: Es el convertidor que se usa para serializar las claves de registro.value.converter
: Es el convertidor que se usa para serializar los valores de registro.
El cuerpo de un mensaje de Pub/Sub es un objeto ByteString
, por lo que
la conversión más eficiente es copiar
la carga útil directamente. Por esa razón,
se recomienda usar un conversor que produzca tipos de datos primitivos (número entero, número de punto flotante,
o esquema de bytes) cuando sea posible para evitar la deserialización y
a volver a serializar el mismo cuerpo de mensaje.
Conversión de Kafka a Pub/Sub
El conector del receptor convierte los registros de Kafka en mensajes de Pub/Sub, como sigue:
- La clave de registro Kafka se almacena como un atributo llamado
"key"
en el mensaje de Pub/Sub. - De forma predeterminada, el conector descarta todos los encabezados del registro de Kafka. Sin embargo, si
estableciste la opción de configuración
headers.publish
entrue
, el conector escribe los encabezados como atributos de Pub/Sub. El conector omite los encabezados que superan los límites de Pub/Sub en los atributos de los mensajes. - Para los esquemas de números enteros, de números de punto flotante, de cadena y de bytes, el conector pasa los bytes del valor del registro Kafka en el mensaje de Pub/Sub cuerpo.
- Para esquemas de struct, el conector escribe cada campo como un atributo del
mensaje de Pub/Sub. Por ejemplo, si el campo es
{ "id"=123 }
, el mensaje de Pub/Sub resultante tiene un atributo"id"="123"
. El el valor de este campo siempre se convierte en una cadena. Los tipos Map y struct no son se admiten como tipos de campo dentro de un struct. - En el caso de los esquemas de mapas, el conector escribe cada par clave-valor como un atributo del mensaje de Pub/Sub. Por ejemplo, si el mapa es
{"alice"=1,"bob"=2}
, el mensaje de Pub/Sub resultante tiene dos atributos,"alice"="1"
y"bob"="2"
. Las claves y los valores se convierten en cadenas.
Los esquemas de struct y mapa tienen algunos comportamientos adicionales:
De manera opcional, puedes especificar un campo de struct o una clave de mapa en particular para que sea el cuerpo del mensaje. Para ello, configura la propiedad de configuración
messageBodyName
. El valor del campo o la clave se almacena como unByteString
en el cuerpo del mensaje. Si no configurasmessageBodyName
, el cuerpo del mensaje estará vacío para los esquemas de struct y map.En el caso de los valores de array, el conector solo admite tipos de array primitivos. El de valores del array se concatena en un solo elemento
ByteString
.
Conversión de Pub/Sub a Kafka
El conector de origen convierte los mensajes de Pub/Sub en registros de Kafka de la siguiente manera:
Clave de registro Kafka: De forma predeterminada, la clave se establece en
null
. De manera opcional, puede especificar un atributo de mensaje de Pub/Sub para usar como clave, estableciendo la opción de configuraciónkafka.key.attribute
En ese caso, el conector busca un atributo con ese nombre y establece la clave de registro en el valor del atributo. Si el atributo especificado no está presente, la clave de registro se se establece ennull
.Valor del registro Kafka. El conector escribe el valor del registro de la siguiente manera:
Si el mensaje de Pub/Sub no tiene atributos personalizados, el conector Escribe el cuerpo del mensaje de Pub/Sub directamente en el registro de Kafka. valor como un tipo
byte[]
, con el conversor especificado porvalue.converter
Si el mensaje de Pub/Sub tiene atributos personalizados
kafka.record.headers
esfalse
, el conector escribe un struct en el valor de registro. La estructura contiene un campo para cada atributo y un campo llamado"message"
cuyo valor es el cuerpo del mensaje de Pub/Sub (almacenado como bytes):{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }
En este caso, debes usar un
value.converter
que sea compatible con los esquemasstruct
, comoorg.apache.kafka.connect.json.JsonConverter
.Si el mensaje de Pub/Sub tiene atributos personalizados y
kafka.record.headers
estrue
, el conector escribe los atributos como encabezados de registro de Kafka. Escribe el cuerpo del mensaje de Pub/Sub directamente en el valor del registro de Kafka como un tipobyte[]
, con el convertidor especificado porvalue.converter
.
Encabezados del registro Kafka. De forma predeterminada, los encabezados están vacíos, a menos que establezcas De
kafka.record.headers
atrue
.
Opciones de configuración
Además de los parámetros de configuración proporcionados por la API de Kafka Connect, el El conector de Kafka del grupo de Pub/Sub admite la configuración del receptor y de la fuente, como descritos en Configuración del conector de Pub/Sub.
Obtén asistencia
Si necesitas ayuda, crea un ticket de asistencia. Para preguntas y debates generales, crea un problema en el repositorio de GitHub.
¿Qué sigue?
- Comprende las diferencias entre Kafka y Pub/Sub.
- Obtén más información sobre el conector de Kafka del grupo de Pub/Sub.
- Consulta el conector de Kafka del grupo de Pub/Sub Repositorio de GitHub.