En este documento, se describe cómo integrar Apache Kafka y Pub/Sub mediante el conector de Kafka de grupos de Pub/Sub.
Acerca del conector de Kafka de grupos de Pub/Sub
Apache Kafka es una plataforma de código abierto para eventos de transmisión. Por lo general, se usa en arquitecturas distribuidas para permitir la comunicación entre componentes con acoplamiento bajo. Pub/Sub es un servicio administrado para enviar y recibir mensajes de forma asíncrona. Al igual que con Kafka, puedes usar Pub/Sub para la comunicación entre los componentes de la arquitectura en la nube.
El conector de Kafka de grupos de Pub/Sub te permite integrar estos dos sistemas. Los siguientes conectores se empaquetan en el archivo JAR del conector:
- El conector de receptor lee los registros de uno o más temas de Kafka y los publica en Pub/Sub.
- El conector de origen lee los mensajes de un tema de Pub/Sub y los publica en Kafka.
A continuación, se muestran algunas situaciones en las que puedes usar el conector de Kafka de grupos de Pub/Sub:
- Estás migrando una arquitectura basada en Kafka a Google Cloud.
- Tienes un sistema de frontend que almacena eventos en Kafka fuera de Google Cloud, pero también usas Google Cloud para ejecutar algunos de tus servicios de backend, que necesitan recibir los eventos de Kafka.
- Recopilas registros de una solución de Kafka local y los envías a Google Cloud para el análisis de datos.
- Tienes un sistema de frontend que usa Google Cloud, pero también almacenas datos de forma local con Kafka.
El conector requiere Kafka Connect, que es un framework para transmitir datos entre Kafka y otros sistemas. Para usar el conector, debes ejecutar Kafka Connect junto con tu clúster de Kafka.
En este documento, se supone que estás familiarizado con Kafka y Pub/Sub. Antes de leer este documento, se recomienda completar una de las guías de inicio rápido de Pub/Sub.
El conector de Pub/Sub no admite ninguna integración entre las LCA de Google Cloud IAM y Kafka Connect.
Comienza a usar el conector
En esta sección, se explican las siguientes tareas:- Configurar el conector de Kafka de grupos de Pub/Sub
- Enviar eventos de Kafka a Pub/Sub
- Enviar mensajes de Pub/Sub a Kafka
Requisitos previos
Instalar Kafka
Sigue la guía de inicio rápido de Apache Kafka para instalar un Kafka de nodo único en tu máquina local. Completa los siguientes pasos en la guía de inicio rápido:
- Descarga la versión más reciente de Kafka y extráela.
- Inicie el entorno de Kafka.
- Crear un tema de Kafka
Autenticar
El conector de Kafka de grupos de Pub/Sub debe autenticarse con Pub/Sub para enviar y recibir mensajes de Pub/Sub. Para configurar la autenticación, sigue estos pasos:
- Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
- Instala Google Cloud CLI.
-
Para inicializar la CLI de gcloud, ejecuta el siguiente comando:
gcloud init
-
Crea o selecciona un proyecto de Google Cloud.
-
Crea un proyecto de Google Cloud:
gcloud projects create PROJECT_ID
Reemplaza
PROJECT_ID
por un nombre para el proyecto de Google Cloud que estás creando. -
Selecciona el proyecto de Google Cloud que creaste:
gcloud config set project PROJECT_ID
Reemplaza
PROJECT_ID
por el nombre del proyecto de Google Cloud.
-
-
Crea credenciales de autenticación locales para tu Cuenta de Google:
gcloud auth application-default login
-
Otorga roles a tu Cuenta de Google. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- Reemplaza
PROJECT_ID
con el ID del proyecto. - Reemplaza
EMAIL_ADDRESS
por tu dirección de correo electrónico. - Reemplaza
ROLE
por cada rol individual.
- Reemplaza
- Instala Google Cloud CLI.
-
Para inicializar la CLI de gcloud, ejecuta el siguiente comando:
gcloud init
-
Crea o selecciona un proyecto de Google Cloud.
-
Crea un proyecto de Google Cloud:
gcloud projects create PROJECT_ID
Reemplaza
PROJECT_ID
por un nombre para el proyecto de Google Cloud que estás creando. -
Selecciona el proyecto de Google Cloud que creaste:
gcloud config set project PROJECT_ID
Reemplaza
PROJECT_ID
por el nombre del proyecto de Google Cloud.
-
-
Crea credenciales de autenticación locales para tu Cuenta de Google:
gcloud auth application-default login
-
Otorga roles a tu Cuenta de Google. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- Reemplaza
PROJECT_ID
con el ID del proyecto. - Reemplaza
EMAIL_ADDRESS
por tu dirección de correo electrónico. - Reemplaza
ROLE
por cada rol individual.
- Reemplaza
Descarga el JAR del conector
Descarga el archivo JAR del conector en tu máquina local. Para obtener más información, consulta Adquiere el conector en el archivo readme de GitHub.
Copia los archivos de configuración del conector
Clona o descarga el repositorio de GitHub para el conector.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
Copia el contenido del directorio
config
en el subdirectorioconfig
de la instalación de Kafka.cp config/* [path to Kafka installation]/config/
Estos archivos contienen los parámetros de configuración del conector.
Actualice su configuración de Kafka Connect
- Navega al directorio que contiene el objeto binario Kafka Connect que descargaste.
- En el directorio binario de Kafka Connect, abre el archivo llamado
config/connect-standalone.properties
en un editor de texto. - Si
plugin.path property
está marcado como comentario, quita el comentario. Actualiza el
plugin.path property
para incluir la ruta de acceso al conector JAR.Ejemplo:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
Configura la propiedad
offset.storage.file.filename
con un nombre de archivo local. En el modo independiente, Kafka usa este archivo para almacenar datos de desplazamiento.Ejemplo:
offset.storage.file.filename=/tmp/connect.offsets
Reenvía eventos de Kafka a Pub/Sub
En esta sección, se describe cómo iniciar el conector de receptores, publicar eventos en Kafka y, luego, leer los mensajes reenviados desde Pub/Sub.
Usa Google Cloud CLI para crear un tema de Pub/Sub con una suscripción.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Reemplaza lo siguiente:
- PUBSUB_TOPIC: Es el nombre de un tema de Pub/Sub para recibir los mensajes de Kafka.
- PUBSUB_SUBSCRIPTION: Es el nombre de una suscripción de Pub/Sub para el tema.
Abre el archivo
/config/cps-sink-connector.properties
en un editor de texto. Agrega valores para las siguientes propiedades, que están marcadas como"TODO"
en los comentarios:topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
Reemplaza lo siguiente:
- KAFKA_TOPICS: Es una lista separada por comas de temas de Kafka desde los que se leerá.
- PROJECT_ID: Es el proyecto de Google Cloud que contiene el tema de Pub/Sub.
- PUBSUB_TOPIC: Es el tema de Pub/Sub para recibir los mensajes de Kafka.
Desde el directorio de Kafka, ejecuta el siguiente comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties
Sigue los pasos de la guía de inicio rápido de Apache Kafka para escribir algunos eventos en tu tema de Kafka.
Usa gcloud CLI para leer los eventos de Pub/Sub.
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
Reenvía mensajes de Pub/Sub a Kafka
En esta sección, se describe cómo iniciar el conector de origen, publicar mensajes en Pub/Sub y leer los mensajes reenviados desde Kafka.
Usa gcloud CLI para crear un tema de Pub/Sub con una suscripción.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Reemplaza lo siguiente:
- PUBSUB_TOPIC: Es el nombre de un tema de Pub/Sub.
- PUBSUB_SUBSCRIPTION: Es el nombre de una suscripción de Pub/Sub.
Abre el archivo llamado
/config/cps-source-connector.properties
en un editor de texto. Agrega valores para las siguientes propiedades, que están marcadas como"TODO"
en los comentarios:kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
Reemplaza lo siguiente:
- KAFKA_TOPIC: Son los temas de Kafka para recibir los mensajes de Pub/Sub.
- PROJECT_ID: Es el proyecto de Google Cloud que contiene el tema de Pub/Sub.
- PUBSUB_TOPIC: Es el tema de Pub/Sub.
Desde el directorio de Kafka, ejecuta el siguiente comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties
Usa gcloud CLI para publicar un mensaje en Pub/Sub.
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
Lea el mensaje de Kafka. Sigue los pasos de la guía de inicio rápido de Apache Kafka para leer los mensajes del tema de Kafka.
Conversión de mensajes
Un registro Kafka contiene una clave y un valor, que son arrays de bytes de longitud variable. De manera opcional, un registro Kafka también puede tener encabezados, que son pares clave-valor. Un mensaje de Pub/Sub tiene dos partes principales: el cuerpo del mensaje y cero o más atributos de par clave-valor.
Kafka Connect utiliza conversores para serializar claves y valores hacia y desde Kafka. Para controlar la serialización, establece las siguientes propiedades en los archivos de configuración del conector:
key.converter
: El conversor que se usa para serializar claves de registrovalue.converter
: el convertidor que se usa para serializar valores de registro.
El cuerpo de un mensaje de Pub/Sub es un objeto ByteString
, por lo que la conversión más eficiente es copiar la carga útil directamente. Por ese motivo, recomendamos usar un convertidor que produzca tipos de datos primitivos (esquema de número entero, número flotante, string o de bytes) siempre que sea posible para evitar la deserialización y la reserialización del mismo cuerpo del mensaje.
Conversión de Kafka a Pub/Sub
El conector de receptores convierte los registros de Kafka en mensajes de Pub/Sub de la siguiente manera:
- La clave de registro de Kafka se almacena como un atributo llamado
"key"
en el mensaje de Pub/Sub. - De forma predeterminada, el conector descarta todos los encabezados del registro Kafka. Sin embargo, si configuras la opción
headers.publish
comotrue
, el conector escribe los encabezados como atributos de Pub/Sub. El conector omite cualquier encabezado que exceda los límites de los atributos de mensajes de Pub/Sub. - En el caso de los esquemas de números enteros, flotantes, de string y bytes, el conector pasa los bytes del valor del registro Kafka directamente al cuerpo del mensaje de Pub/Sub.
- En el caso de los esquemas de struct, el conector escribe cada campo como un atributo del mensaje de Pub/Sub. Por ejemplo, si el campo es
{ "id"=123 }
, el mensaje de Pub/Sub resultante tiene un atributo"id"="123"
. El valor del campo siempre se convierte en una cadena. Los tipos Map y struct no se admiten como tipos de campo dentro de una struct. - En el caso de los esquemas de mapa, el conector escribe cada par clave-valor como un atributo del mensaje de Pub/Sub. Por ejemplo, si el mapa es
{"alice"=1,"bob"=2}
, el mensaje de Pub/Sub resultante tiene dos atributos:"alice"="1"
y"bob"="2"
. Las claves y los valores se convierten en cadenas.
Los esquemas de struct y map tienen algunos comportamientos adicionales:
De manera opcional, puedes establecer la propiedad de configuración
messageBodyName
para especificar un campo de struct o una clave de asignación en particular como el cuerpo del mensaje. El valor del campo o la clave se almacena como unByteString
en el cuerpo del mensaje. Si no configurasmessageBodyName
, el cuerpo del mensaje estará vacío para los esquemas de struct y map.Para los valores de array, el conector solo admite tipos de array primitivos. La secuencia de valores del arreglo se concatena en un solo objeto
ByteString
.
Conversión de Pub/Sub a Kafka
El conector de origen convierte los mensajes de Pub/Sub en registros de Kafka de la siguiente manera:
Clave de registro de Kafka: De forma predeterminada, la clave se establece en
null
. De manera opcional, puedes especificar un atributo de mensaje de Pub/Sub para usar como clave si estableces la opción de configuraciónkafka.key.attribute
. En ese caso, el conector busca un atributo con ese nombre y establece la clave de registro en el valor del atributo. Si el atributo especificado no está presente, la clave de registro se establece ennull
.Valor del registro Kafka. El conector escribe el valor del registro de la siguiente manera:
Si el mensaje de Pub/Sub no tiene atributos personalizados, el conector escribe el cuerpo del mensaje de Pub/Sub directamente en el valor del registro Kafka como un tipo
byte[]
mediante el conversor especificado porvalue.converter
.Si el mensaje de Pub/Sub tiene atributos personalizados y
kafka.record.headers
esfalse
, el conector escribe una struct en el valor del registro. El struct contiene un campo para cada atributo y un campo llamado"message"
cuyo valor es el cuerpo del mensaje de Pub/Sub (almacenado como bytes):{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }
En este caso, debes usar un
value.converter
que sea compatible con esquemasstruct
, comoorg.apache.kafka.connect.json.JsonConverter
.Si el mensaje de Pub/Sub tiene atributos personalizados y
kafka.record.headers
estrue
, el conector escribe los atributos como encabezados del registro de Kafka. Escribe el cuerpo del mensaje de Pub/Sub directamente en el valor del registro Kafka como un tipobyte[]
mediante el convertidor que especificavalue.converter
.
Encabezados de registro de Kafka. De forma predeterminada, los encabezados están vacíos, a menos que establezcas
kafka.record.headers
comotrue
.
Opciones de configuración
Además de la configuración que proporciona la API de Kafka Connect, el conector de Kafka de grupos de Pub/Sub admite las siguientes opciones de configuración.
Opciones de configuración del conector de receptores
El conector del receptor admite las siguientes opciones de configuración.
Parámetro de configuración | Tipo de datos | Descripción |
---|---|---|
connector.class |
String |
Obligatorio. La clase Java para el conector. Para
el conector de receptores de Pub/Sub, el valor debe ser
com.google.pubsub.kafka.sink.CloudPubSubSinkConnector .
|
cps.endpoint |
String |
El extremo de Pub/Sub que se usará. Valor predeterminado: |
cps.project |
String |
Obligatorio. El Google Cloud que contiene el tema de Pub/Sub. |
cps.topic |
String |
Obligatorio. Es el tema de Pub/Sub en el que se publican los registros de Kafka. |
gcp.credentials.file.path |
String |
Opcional. La ruta a un archivo que almacena las credenciales de Google Cloud para autenticar Pub/Sub Lite. |
gcp.credentials.json |
String |
Opcional. Un BLOB JSON que contiene Google Cloud para autenticar Pub/Sub Lite. |
headers.publish |
Boolean |
Cuando sea Valor predeterminado: |
maxBufferBytes |
Long |
La cantidad máxima de bytes que se deben recibir en una partición de Kafka de tema antes de publicarlos en Pub/Sub. Valor predeterminado: 10,000,000. |
maxBufferSize |
Integer |
La cantidad máxima de registros que se reciben en una partición de tema de Kafka antes de publicarlos en Pub/Sub. Predeterminado: 100. |
maxDelayThresholdMs |
Integer |
La cantidad máxima de tiempo que se espera para llegar a Predeterminado: 100. |
maxOutstandingMessages |
Long |
La cantidad máxima de registros que pueden estar pendientes, incluidos los lotes incompletos y pendientes, antes de que el publicador bloquee la publicación adicional. Valor predeterminado: |
maxOutstandingRequestBytes |
Long |
La cantidad máxima de bytes que pueden estar pendientes, incluidos los lotes incompletos y pendientes, antes de que el publicador bloquee la publicación adicional. Valor predeterminado: |
maxRequestTimeoutMs |
Integer |
Es el tiempo de espera para solicitudes de publicación individuales en Pub/Sub, en milisegundos. Valor predeterminado: 10,000. |
maxTotalTimeoutMs |
Integer |
Es el tiempo de espera total, en milisegundos, de una llamada para publicar en Pub/Sub, incluidos los reintentos. Valor predeterminado: 60,000. |
metadata.publish |
Boolean |
Cuando sea Valor predeterminado: |
messageBodyName |
String |
Cuando se usa un struct o un esquema de valor de asignación, especifica el nombre de un campo o clave para usar como cuerpo del mensaje de Pub/Sub. Consulta Conversión de Kafka a Pub/Sub. Valor predeterminado: |
orderingKeySource |
String |
Especifica cómo configurar la clave de ordenamiento en el mensaje de Pub/Sub. Puede ser uno de los siguientes valores:
Valor predeterminado: |
topics |
String |
Obligatorio. Una lista separada por comas de temas de Kafka desde los que se debe leer. |
Opciones de configuración del conector de origen
El conector de origen admite las siguientes opciones de configuración.
Parámetro de configuración | Tipo de datos | Descripción |
---|---|---|
connector.class |
String |
Obligatorio. La clase Java para el conector. Para el conector de origen de Pub/Sub, el valor debe ser com.google.pubsub.kafka.source.CloudPubSubSourceConnector .
|
cps.endpoint |
String |
El extremo de Pub/Sub que se usará. Valor predeterminado: |
cps.makeOrderingKeyAttribute |
Boolean |
Cuando sea Valor predeterminado: |
cps.maxBatchSize |
Integer |
La cantidad máxima de mensajes por lotes por solicitud de extracción a Pub/Sub. Valor predeterminado: 100 |
cps.project |
String |
Obligatorio. El proyecto de Google Cloud que contiene el tema de Pub/Sub. |
cps.subscription |
String |
Obligatorio. El nombre de la suscripción de Pub/Sub desde la que se extraen los mensajes. |
gcp.credentials.file.path |
String |
Opcional. La ruta a un archivo que almacena las credenciales de Google Cloud para autenticar Pub/Sub Lite. |
gcp.credentials.json |
String |
Opcional. Un BLOB JSON que contiene Google Cloud para autenticar Pub/Sub Lite. |
kafka.key.attribute |
String |
El atributo de mensaje de Pub/Sub que se usa como clave para los mensajes publicados en Kafka. Si se configura en Valor predeterminado: |
kafka.partition.count |
Integer |
Es la cantidad de particiones Kafka para el tema de Kafka en el que se publican los mensajes. Este parámetro se ignora si el esquema de partición es Cantidad predeterminada: 1 |
kafka.partition.scheme |
String |
Es el esquema para asignar un mensaje a una partición en Kafka. Puede tener uno de los siguientes valores:
Valor predeterminado: |
kafka.record.headers |
Boolean |
Si es |
kafka.topic |
String |
Obligatorio. El tema de Kafka que recibe mensajes de Pub/Sub. |
Obtén asistencia
Si necesitas ayuda, crea un ticket de asistencia. Si tienes preguntas y debates generales, crea un problema en el repositorio de GitHub.
¿Qué sigue?
- Comprende las diferencias entre Kafka y Pub/Sub.
- Obtén más información sobre el conector de Kafka de grupos de Pub/Sub.
- Consulta el repositorio de GitHub del conector de Kafka de Pub/Sub.