En este documento se describe cómo integrar Apache Kafka y Pub/Sub mediante el conector de Kafka de grupo de Pub/Sub.
Acerca del conector de Kafka de Pub/Sub Group
Apache Kafka es una plataforma de código abierto para el streaming de eventos. Se suele usar en arquitecturas distribuidas para permitir la comunicación entre componentes con bajo acoplamiento. Pub/Sub es un servicio gestionado para enviar y recibir mensajes de forma asíncrona. Al igual que con Kafka, puedes usar Pub/Sub para comunicarte entre los componentes de tu arquitectura en la nube.
El conector de Kafka de grupos de Pub/Sub te permite integrar estos dos sistemas. Los siguientes conectores se empaquetan en el archivo JAR del conector:
- El conector de sumidero lee registros de uno o varios temas de Kafka y los publica en Pub/Sub.
- El conector de origen lee mensajes de un tema de Pub/Sub y los publica en Kafka.
Estos son algunos casos en los que puedes usar el conector de Kafka de Pub/Sub Group:
- Vas a migrar una arquitectura basada en Kafka a Google Cloud.
- Tienes un sistema frontend que almacena eventos en Kafka fuera deGoogle Cloud, pero también usas Google Cloud para ejecutar algunos de tus servicios backend, que necesitan recibir los eventos de Kafka.
- Recoges registros de una solución de Kafka local y los envías aGoogle Cloud para analizar los datos.
- Tienes un sistema frontend que usa Google Cloud, pero también almacenas datos en las instalaciones con Kafka.
El conector requiere Kafka Connect, que es un framework para transmitir datos entre Kafka y otros sistemas. Para usar el conector, debes ejecutar Kafka Connect junto con tu clúster de Kafka.
En este documento se da por supuesto que conoces Kafka y Pub/Sub. Antes de leer este documento, te recomendamos que completes una de las guías de inicio rápido de Pub/Sub.
El conector Pub/Sub no admite ninguna integración entre las LCAs de Kafka Connect y IAM. Google Cloud
Empezar a usar el conector
En esta sección se explican las siguientes tareas:- Configura el conector de Kafka de Pub/Sub Group.
- Envía eventos de Kafka a Pub/Sub.
- Envía mensajes de Pub/Sub a Kafka.
Requisitos previos
Instalar Kafka
Sigue la guía de inicio rápido de Apache Kafka para instalar un nodo único de Kafka en tu máquina local. Sigue estos pasos en la guía de inicio rápido:
- Descarga la última versión de Kafka y extráela.
- Inicia el entorno de Kafka.
- Crea un tema de Kafka.
Autenticar
El conector de Kafka de Pub/Sub Group debe autenticarse con Pub/Sub para enviar y recibir mensajes de Pub/Sub. Para configurar la autenticación, sigue estos pasos:
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.
-
Para inicializar gcloud CLI, ejecuta el siguiente comando:
gcloud init
-
Create or select a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
-
Install the Google Cloud CLI.
-
Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.
-
Para inicializar gcloud CLI, ejecuta el siguiente comando:
gcloud init
-
Create or select a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
Clona o descarga el repositorio de GitHub del conector.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
Copia el contenido del directorio
config
en el subdirectorioconfig
de tu instalación de Kafka.cp config/* [path to Kafka installation]/config/
- Ve al directorio que contiene el archivo binario de Kafka Connect que has descargado.
- En el directorio binario de Kafka Connect, abre el archivo llamado
config/connect-standalone.properties
en un editor de texto. - Si el
plugin.path property
está comentado, quítale el comentario. Actualiza
plugin.path property
para incluir la ruta al archivo JAR del conector.Ejemplo:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
Asigna a la propiedad
offset.storage.file.filename
el nombre de un archivo local. En el modo independiente, Kafka usa este archivo para almacenar datos de desplazamiento.Ejemplo:
offset.storage.file.filename=/tmp/connect.offsets
Usa la CLI de Google Cloud para crear un tema de Pub/Sub con una suscripción.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Haz los cambios siguientes:
- PUBSUB_TOPIC: nombre de un tema de Pub/Sub para recibir los mensajes de Kafka.
- PUBSUB_SUBSCRIPTION: el nombre de una suscripción de Pub/Sub al tema.
Abre el archivo
/config/cps-sink-connector.properties
en un editor de texto. Añade valores a las siguientes propiedades, que están marcadas con"TODO"
en los comentarios:topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
Haz los cambios siguientes:
- KAFKA_TOPICS: lista de temas de Kafka separados por comas desde los que se leerá.
- PROJECT_ID: el proyecto que contiene tu tema de Pub/Sub. Google Cloud
- PUBSUB_TOPIC: el tema de Pub/Sub que recibirá los mensajes de Kafka.
En el directorio de Kafka, ejecuta el siguiente comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties
Sigue los pasos que se indican en la guía de inicio rápido de Apache Kafka para escribir algunos eventos en tu tema de Kafka.
Usa la CLI de gcloud para leer los eventos de Pub/Sub.
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
Usa la CLI de gcloud para crear un tema de Pub/Sub con una suscripción.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Haz los cambios siguientes:
- PUBSUB_TOPIC: nombre de un tema de Pub/Sub.
- PUBSUB_SUBSCRIPTION: nombre de una suscripción de Pub/Sub.
Abre el archivo llamado
/config/cps-source-connector.properties
en un editor de texto. Añade valores a las siguientes propiedades, que están marcadas con"TODO"
en los comentarios:kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
Haz los cambios siguientes:
- KAFKA_TOPIC: los temas de Kafka en los que se recibirán los mensajes de Pub/Sub.
- PROJECT_ID: el proyecto que contiene tu tema de Pub/Sub. Google Cloud
- PUBSUB_TOPIC: el tema de Pub/Sub.
En el directorio de Kafka, ejecuta el siguiente comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties
Usa la CLI de gcloud para publicar un mensaje en Pub/Sub.
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
Lee el mensaje de Kafka. Sigue los pasos de la guía de inicio rápido de Apache Kafka para leer los mensajes del tema de Kafka.
key.converter
: el convertidor que se usa para serializar las claves de registro.value.converter
: el convertidor que se usa para serializar los valores de los registros.- La clave de registro de Kafka se almacena como un atributo llamado
"key"
en el mensaje de Pub/Sub. - De forma predeterminada, el conector elimina los encabezados del registro de Kafka. Sin embargo, si asignas el valor
true
a la opción de configuraciónheaders.publish
, el conector escribe los encabezados como atributos de Pub/Sub. El conector omite los encabezados que superen los límites de atributos de mensaje de Pub/Sub. - En el caso de los esquemas de números enteros, números de coma flotante, cadenas y bytes, el conector transfiere los bytes del valor del registro de Kafka directamente al cuerpo del mensaje de Pub/Sub.
- En el caso de los esquemas de struct, el conector escribe cada campo como un atributo del mensaje de Pub/Sub. Por ejemplo, si el campo es
{ "id"=123 }
, el mensaje de Pub/Sub resultante tiene un atributo"id"="123"
. El valor del campo siempre se convierte en una cadena. Los tipos de mapa y struct no se admiten como tipos de campo en un struct. - En el caso de los esquemas de mapa, el conector escribe cada par clave-valor como un atributo del mensaje de Pub/Sub. Por ejemplo, si el mapa es
{"alice"=1,"bob"=2}
, el mensaje de Pub/Sub resultante tiene dos atributos:"alice"="1"
y"bob"="2"
. Las claves y los valores se convierten en cadenas. También puedes especificar un campo struct o una clave de mapa concretos para que sean el cuerpo del mensaje. Para ello, define la propiedad de configuración
messageBodyName
. El valor del campo o de la clave se almacena como unByteString
en el cuerpo del mensaje. Si no definesmessageBodyName
, el cuerpo del mensaje estará vacío en los esquemas de struct y map.En el caso de los valores de matriz, el conector solo admite tipos de matriz primitivos. La secuencia de valores de la matriz se concatena en un solo objeto
ByteString
.Clave de registro de Kafka: de forma predeterminada, la clave es
null
. También puedes especificar un atributo de mensaje de Pub/Sub para usarlo como clave. Para ello, define la opción de configuraciónkafka.key.attribute
. En ese caso, el conector busca un atributo con ese nombre y asigna el valor del atributo a la clave del registro. Si el atributo especificado no está presente, la clave de registro se define comonull
.Valor del registro de Kafka. El conector escribe el valor del registro de la siguiente manera:
Si el mensaje de Pub/Sub no tiene atributos personalizados, el conector escribe el cuerpo del mensaje de Pub/Sub directamente en el valor del registro de Kafka como un tipo
byte[]
, mediante el convertidor especificado porvalue.converter
.Si el mensaje de Pub/Sub tiene atributos personalizados y
kafka.record.headers
esfalse
, el conector escribe un struct en el valor del registro. La estructura contiene un campo por cada atributo y un campo llamado"message"
cuyo valor es el cuerpo del mensaje de Pub/Sub (almacenado como bytes):{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }
En este caso, debe usar un
value.converter
que sea compatible con los esquemasstruct
, comoorg.apache.kafka.connect.json.JsonConverter
.Si el mensaje de Pub/Sub tiene atributos personalizados y
kafka.record.headers
estrue
, el conector escribe los atributos como encabezados de registro de Kafka. Escribe el cuerpo del mensaje de Pub/Sub directamente en el valor del registro de Kafka como un tipobyte[]
, mediante el convertidor especificado porvalue.converter
.
Encabezados de registros de Kafka. De forma predeterminada, las cabeceras están vacías, a menos que asignes el valor
true
akafka.record.headers
.- Conocer las diferencias entre Kafka y Pub/Sub.
- Consulta más información sobre el conector de Kafka de grupos de Pub/Sub.
- Consulta el repositorio de GitHub de Pub/Sub Group Kafka Connector.
Descargar el archivo JAR del conector
Descarga el archivo JAR del conector en tu máquina local. Para obtener más información, consulta la sección Adquirir el conector del archivo readme de GitHub.
Copiar los archivos de configuración del conector
Estos archivos contienen ajustes de configuración para el conector.
Actualizar la configuración de Kafka Connect
Reenviar eventos de Kafka a Pub/Sub
En esta sección se describe cómo iniciar el conector de sumidero, publicar eventos en Kafka y, a continuación, leer los mensajes reenviados de Pub/Sub.
Reenviar mensajes de Pub/Sub a Kafka
En esta sección se describe cómo iniciar el conector de origen, publicar mensajes en Pub/Sub y leer los mensajes reenviados de Kafka.
Conversión de mensajes
Un registro de Kafka contiene una clave y un valor, que son matrices de bytes de longitud variable. De forma opcional, un registro de Kafka también puede tener encabezados, que son pares clave-valor. Un mensaje de Pub/Sub tiene dos partes principales: el cuerpo del mensaje y cero o más atributos de clave-valor.
Kafka Connect usa convertidores para serializar claves y valores hacia y desde Kafka. Para controlar la serialización, define las siguientes propiedades en los archivos de configuración del conector:
El cuerpo de un mensaje de Pub/Sub es un objeto ByteString
, por lo que la conversión más eficiente es copiar la carga útil directamente. Por este motivo, le recomendamos que utilice un convertidor que genere tipos de datos primitivos (esquema de números enteros, de números de coma flotante, de cadenas o de bytes) siempre que sea posible para evitar deserializar y volver a serializar el mismo cuerpo del mensaje.
Conversión de Kafka a Pub/Sub
El conector de sumidero convierte los registros de Kafka en mensajes de Pub/Sub de la siguiente manera:
Los esquemas de struct y de mapa tienen algunos comportamientos adicionales:
Conversión de Pub/Sub a Kafka
El conector de origen convierte los mensajes de Pub/Sub en registros de Kafka de la siguiente manera:
Opciones de configuración
Además de las configuraciones proporcionadas por la API Kafka Connect, el conector Kafka de grupo de Pub/Sub admite la configuración de receptor y de origen, tal como se describe en Configuraciones del conector de Pub/Sub.
Obtener asistencia
Si necesitas ayuda, crea una incidencia. Si tienes alguna pregunta o quieres debatir sobre algo, crea una incidencia en el repositorio de GitHub.