Conectar Pub/Sub a Apache Kafka

Organiza tus páginas con colecciones Guarda y categoriza el contenido según tus preferencias.

En este documento, se describe cómo integrar Apache Kafka y Pub/Sub mediante el conector de Kafka de grupo de Pub/Sub.

Acerca del conector de Kafka para grupos de Pub/Sub

Apache Kafka es una plataforma de código abierto para eventos de transmisión. Por lo general, se usa en arquitecturas distribuidas para permitir la comunicación entre componentes con acoplamiento bajo. Pub/Sub es un servicio administrado para enviar y recibir mensajes de forma asíncrona. Al igual que con Kafka, puedes usar Pub/Sub para comunicarte entre los componentes de la arquitectura de la nube.

El conector de Kafka para grupos de Pub/Sub le permite integrar estos dos sistemas. Los siguientes conectores se empaquetan en el JAR de conectores:

  • El conector receptor lee los registros de uno o más temas de Kafka y los publica en Pub/Sub.
  • El conector de origen lee los mensajes desde un tema de Pub/Sub y los publica en Kafka.

Estas son algunas situaciones en las que puedes usar el conector Kafka de Pub/Sub:

  • Migrará una arquitectura basada en Kafka a Google Cloud.
  • Tienes un sistema de frontend que almacena eventos en Kafka fuera de Google Cloud, pero también usas Google Cloud para ejecutar algunos de los servicios de backend, que deben recibir los eventos de Kafka.
  • Recopilas registros de una solución de Kafka local y los envías a Google Cloud para obtener estadísticas de datos.
  • Tienes un sistema de frontend que usa Google Cloud, pero también almacenas datos locales con Kafka.

El conector requiere Kafka Connect, que es un marco de trabajo para transmitir datos entre Kafka y otros sistemas. Para usar el conector, debes ejecutar Kafka Connect junto con tu clúster de Kafka.

En este documento, se supone que estás familiarizado con Kafka y Pub/Sub. Antes de leer este documento, es una buena idea completar una de las guías de inicio rápido de Pub/Sub.

Comienza a usar el conector

En esta sección, se te guiará por las siguientes tareas:

  1. Configurar el conector de Kafka de grupo de Pub/Sub
  2. Enviar eventos de Kafka a Pub/Sub
  3. Enviar mensajes de Pub/Sub a Kafka

Requisitos previos

Instalar Kafka

Sigue la guía de inicio rápido de Apache Kafka para instalar un Kafka de un solo nodo en tu máquina local. Completa estos pasos en la guía de inicio rápido:

  1. Descargue la versión más reciente de Kafka y extráela.
  2. Inicie el entorno de Kafka.
  3. Cree un tema de Kafka.

Autenticar

El conector de Kafka de Pub/Sub debe autenticarse con Pub/Sub para enviar y recibir mensajes de Pub/Sub. Para configurar la autenticación, sigue estos pasos:

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. Instala Google Cloud CLI.
  3. Para inicializar la CLI de gcloud, ejecuta el siguiente comando:

    gcloud init
  4. Crea o selecciona un proyecto de Google Cloud.

    • Crea un proyecto de Cloud:

      gcloud projects create PROJECT_ID
    • Selecciona el proyecto de Cloud que creaste:

      gcloud config set project PROJECT_ID
  5. Crea credenciales de autenticación para tu Cuenta de Google:

    gcloud auth application-default login
  6. Otorga roles a tu Cuenta de Google. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Reemplaza PROJECT_ID con el ID del proyecto.
    • Reemplaza EMAIL_ADDRESS por tu dirección de correo electrónico.
    • Reemplaza ROLE por cada rol individual.
  7. Instala Google Cloud CLI.
  8. Para inicializar la CLI de gcloud, ejecuta el siguiente comando:

    gcloud init
  9. Crea o selecciona un proyecto de Google Cloud.

    • Crea un proyecto de Cloud:

      gcloud projects create PROJECT_ID
    • Selecciona el proyecto de Cloud que creaste:

      gcloud config set project PROJECT_ID
  10. Crea credenciales de autenticación para tu Cuenta de Google:

    gcloud auth application-default login
  11. Otorga roles a tu Cuenta de Google. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Reemplaza PROJECT_ID con el ID del proyecto.
    • Reemplaza EMAIL_ADDRESS por tu dirección de correo electrónico.
    • Reemplaza ROLE por cada rol individual.

Descarga el conector JAR

Descarga el archivo JAR del conector a tu máquina local. Para obtener más información, consulta Adquiere el conector en el archivo readme de GitHub.

Copia los archivos de configuración del conector

  1. Clona o descarga el repositorio de GitHub para el conector.

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. Copia el contenido del directorio config en el subdirectorio config de la instalación de Kafka.

    cp config/* [path to Kafka installation]/config/
    

Estos archivos contienen configuraciones para el conector.

Actualice su configuración de Kafka Connect

  1. Navegue al directorio de Kafka.
  2. Abre el archivo llamado config/connect-standalone.properties en un editor de texto.
  3. Si se agrega un comentario al plugin.path property, quita el comentario.
  4. Actualiza plugin.path property para incluir la ruta al JAR del conector.

    Ejemplo:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. Configura la propiedad offset.storage.file.filename con un nombre de archivo local. En el modo independiente, Kafka usa este archivo para almacenar datos de desplazamiento.

    Ejemplo:

    offset.storage.file.filename=/tmp/connect.offsets
    

Reenviar eventos de Kafka a Pub/Sub

En esta sección, se describe cómo iniciar el conector del receptor, publicar eventos en Kafka y, luego, leer los mensajes reenviados desde Pub/Sub.

  1. Usa Google Cloud CLI para crear un tema de Pub/Sub con una suscripción.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    Reemplaza lo siguiente:

    • PUBSUB_TOPIC: Es el nombre de un tema de Pub/Sub para recibir los mensajes de Kafka.
    • PUBSUB_SUBSCRIPTION: Es el nombre de una suscripción a Pub/Sub para el tema.
  2. Abre el archivo /config/cps-sink-connector.properties en un editor de texto. Agrega valores para las siguientes propiedades, que están marcadas como "TODO" en los comentarios:

    topics=KAFKA_TOPICS
    cps.project=PROJECT_ID
    cps.topic=PUBSUB_TOPIC
    

    Reemplaza lo siguiente:

    • KAFKA_TOPICS: Una lista separada por comas de temas de Kafka para leer.
    • PROJECT_ID: Es el proyecto de Google Cloud que contiene el tema de Pub/Sub.
    • PUBSUB_TOPIC: Es el tema de Pub/Sub para recibir los mensajes de Kafka.
  3. Desde el directorio de Kafka, ejecute el siguiente comando:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-sink-connector.properties
    
  4. Sigue los pasos de la guía de inicio rápido de Apache Kafka para escribir algunos eventos en tu tema de Kafka.

  5. Usa la CLI de gcloud para leer los eventos de Pub/Sub.

    gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
    

Reenviar mensajes de Pub/Sub a Kafka

En esta sección, se describe cómo iniciar el conector de origen, publicar mensajes en Pub/Sub y leer los mensajes reenviados desde Kafka.

  1. Usa la CLI de gcloud para crear un tema de Pub/Sub con una suscripción.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    Reemplaza lo siguiente:

    • PUBSUB_TOPIC: Es el nombre de un tema de Pub/Sub.
    • PUBSUB_SUBSCRIPTION: Es el nombre de una suscripción a Pub/Sub.
  2. Abre el archivo llamado /config/cps-source-connector.properties en un editor de texto. Agrega valores para las siguientes propiedades, que están marcadas como "TODO" en los comentarios:

    kafka.topic=KAFKA_TOPIC
    cps.project=PROJECT_ID
    cps.subscription=PUBSUB_SUBSCRIPTION
    

    Reemplaza lo siguiente:

    • KAFKA_TOPIC: los temas de Kafka para recibir los mensajes de Pub/Sub
    • PROJECT_ID: Es el proyecto de Google Cloud que contiene el tema de Pub/Sub.
    • PUBSUB_TOPIC: Es el tema de Pub/Sub.
  3. Desde el directorio de Kafka, ejecute el siguiente comando:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-source-connector.properties
    
  4. Usa la CLI de gcloud para publicar un mensaje en Pub/Sub.

    gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
    
  5. Lea el mensaje de Kafka. Sigue los pasos de la guía de inicio rápido de Apache Kafka para leer los mensajes del tema de Kafka.

Conversión de mensajes

Un registro de Kafka contiene una clave y un valor, que son arreglos de bytes de longitud variable. De manera opcional, un registro Kaafka también puede tener encabezados, que son pares clave-valor. Un mensaje de Pub/Sub tiene dos partes principales: el cuerpo del mensaje y cero o más atributos de clave-valor.

Kafka Connect usa convertidores para serializar claves y valores hacia y desde Kafka. Para controlar la serialización, configura las siguientes propiedades en los archivos de configuración del conector:

  • key.converter: El conversor que se usa para serializar claves de registro.
  • value.converter: El conversor que se usa para serializar valores de registro.

El cuerpo de un mensaje de Pub/Sub es un objeto ByteString, por lo que la conversión más eficiente es copiar la carga útil directamente. Por esa razón, recomendamos usar un conversor que produzca tipos de datos primitivos (números enteros, flotantes, de strings o de bytes) siempre que sea posible, para evitar la deserialización y la reserialización del mismo cuerpo del mensaje.

Conversión de Kafka a Pub/Sub

El conector receptor convierte los registros de Kafka en mensajes de Pub/Sub de la siguiente manera:

  • La clave de registro de Kafka se almacena como un atributo llamado "key" en el mensaje de Pub/Sub.
  • De forma predeterminada, el conector descarta los encabezados en el registro de Kafka. Sin embargo, si configuras la opción de configuración headers.publish como true, el conector escribe los encabezados como atributos de Pub/Sub. El conector omite cualquier encabezado que supere los límites de atributos de mensaje de Pub/Sub.
  • En los esquemas de número entero, flotante, de string y de bytes, el conector pasa los bytes del valor del registro de Kafka directamente al cuerpo del mensaje de Pub/Sub.
  • Para los esquemas de struct, el conector escribe cada campo como un atributo del mensaje de Pub/Sub. Por ejemplo, si el campo es { "id"=123 }, el mensaje de Pub/Sub resultante tendrá un atributo "id"="123". El valor del campo siempre se convierte en una string.
  • Para los esquemas de mapa, el conector escribe cada par clave-valor como un atributo del mensaje de Pub/Sub. Por ejemplo, si el mapa es {"alice"=1,"bob"=2}, el mensaje de Pub/Sub resultante tendrá dos atributos: "alice"="1" y "bob"="2". Las claves y los valores se convierten en strings.

Los esquemas de estructuras y mapas tienen algunos comportamientos adicionales:

  • De manera opcional, puedes especificar un campo de struct o una clave de mapa en particular para que sea el cuerpo del mensaje si configuras la propiedad de configuración messageBodyName. El valor del campo o la clave se almacena como un ByteString en el cuerpo del mensaje. Si no configuras messageBodyName, el cuerpo del mensaje estará vacío para los esquemas de struct y mapa.

  • Para los valores de arreglo, el conector solo admite tipos de arreglo primitivos. La secuencia de valores del arreglo se concatena en un solo objeto ByteString.

Conversión de Pub/Sub a Kafka

El conector de origen convierte los mensajes de Pub/Sub en registros de Kafka de la siguiente manera:

  • Clave de registro de Kafka: de forma predeterminada, la clave se establece en null. De manera opcional, puedes especificar un atributo de mensaje de Pub/Sub para usar como clave si configuras la opción de configuración kafka.key.attribute. En ese caso, el conector busca un atributo con ese nombre y establece la clave de registro en el valor del atributo. Si el atributo especificado no está presente, la clave de registro se establece en null.

  • Valor de registro de Kafka. El conector escribe el valor de registro de la siguiente manera:

    • Si el mensaje de Pub/Sub no tiene atributos personalizados, el conector escribe el cuerpo del mensaje de Pub/Sub directamente en el valor de registro de Kafka como un tipo byte[] mediante el conversor especificado por value.converter.

    • Si el mensaje de Pub/Sub tiene atributos personalizados y kafka.record.headers es false, el conector escribe una estructura en el valor del registro. El struct contiene un campo para cada atributo y un campo llamado "message" cuyo valor es el cuerpo del mensaje de Pub/Sub (almacenado como bytes):

      {
        "message": "<Pub/Sub message body>",
        "<attribute-1>": "<value-1>",
        "<attribute-2>": "<value-2>",
        ....
      }
      

      En este caso, debes usar un value.converter que sea compatible con los esquemas de struct, como org.apache.kafka.connect.json.JsonConverter.

    • Si el mensaje de Pub/Sub tiene atributos personalizados y kafka.record.headers es true, el conector escribe los atributos como encabezados de registro de Kafka. Escribe el cuerpo del mensaje de Pub/Sub directamente en el valor de registro de Kafka como un tipo byte[], mediante el conversor especificado por value.converter.

  • Encabezados de registro de Kafka. De forma predeterminada, los encabezados están vacíos, a menos que establezcas kafka.record.headers en true.

Opciones de configuración

Además de las configuraciones que proporciona la API de Kafka Connect, el conector de Kafka de Pub/Sub admite las siguientes configuraciones.

Opciones de configuración del conector del receptor

El conector del receptor admite las siguientes opciones de configuración.

Parámetro de configuración Tipo de datos Descripción
connector.class String Obligatorio. La clase Java para el conector. Para el conector del receptor de Pub/Sub, el valor debe ser com.google.pubsub.kafka.sink.CloudPubSubSinkConnector.
cps.endpoint String

El extremo de Pub/Sub que se usará.

Valor predeterminado: "pubsub.googleapis.com:443".

cps.project String Obligatorio. El Google Cloud que contiene el tema de Pub/Sub.
cps.topic String Obligatorio. El tema de Pub/Sub en el que se publican los registros de Kafka
gcp.credentials.file.path String Opcional. La ruta a un archivo que almacena las credenciales de Google Cloud para autenticar Pub/Sub Lite.
gcp.credentials.json String Opcional. Un BLOB JSON que contiene Google Cloud para autenticar Pub/Sub Lite
headers.publish Boolean

Cuando sea true, incluye cualquier encabezado de registro de Kafka como atributos de mensaje de Pub/Sub.

Valor predeterminado: false.

maxBufferBytes Long

La cantidad máxima de bytes que se recibirán en una partición de tema de Kafka antes de publicarlos en Pub/Sub.

Predeterminado: 10000000.

maxBufferSize Integer

La cantidad máxima de registros que se recibirán en una partición del tema de Kafka antes de publicarlos en Pub/Sub.

Valor predeterminado: 100.

maxDelayThresholdMs Integer

La cantidad máxima de tiempo que se debe esperar para alcanzar maxBufferSize o maxBufferBytes antes de publicar registros pendientes en Pub/Sub, en milisegundos.

Valor predeterminado: 100.

maxOutstandingMessages Long

La cantidad máxima de registros que pueden estar pendientes, incluidos los lotes incompletos y pendientes, antes de que el editor bloquee la publicación.

Valor predeterminado: Long.MAX_VALUE.

maxOutstandingRequestBytes Long

La cantidad máxima de bytes totales que pueden estar pendientes, incluidos los lotes incompletos y pendientes, antes de que el editor bloquee la publicación.

Valor predeterminado: Long.MAX_VALUE.

maxRequestTimeoutMs Integer

El tiempo de espera, en milisegundos, de solicitudes de publicación individuales en Pub/Sub.

Valor predeterminado: 10,000.

maxTotalTimeoutMs Integer

El tiempo de espera total, en milisegundos, para que una llamada se publique en Pub/Sub, incluidos los reintentos.

Valor predeterminado: 60,000.

metadata.publish Boolean

Cuando sea true, incluye el tema, la partición, la compensación y la marca de tiempo de Kafka como atributos de mensaje de Pub/Sub.

Valor predeterminado: false.

messageBodyName String

Cuando se usa un esquema de valor de struct o mapa, se especifica el nombre de un campo o una clave que se usará como cuerpo del mensaje de Pub/Sub. Consulta Conversión de Kafka a Pub/Sub.

Valor predeterminado: "cps_message_body".

orderingKeySource String

Especifica cómo establecer la clave de ordenamiento en el mensaje de Pub/Sub. Puede ser uno de los siguientes valores:

  • none: No establezcas la clave de ordenamiento.
  • key: Usa la clave de registro de Kafka como clave de ordenamiento.
  • partition: Usa el número de partición, convertido en una string, como la clave de ordenamiento. Usa esta configuración solo para temas de capacidad de procesamiento baja o con miles de particiones.

Valor predeterminado: none.

topics String Obligatorio. Una lista separada por comas de temas de Kafka para leer.

Opciones de configuración del conector de origen

El conector de origen admite las siguientes opciones de configuración.

Parámetro de configuración Tipo de datos Descripción
connector.class String Obligatorio. La clase Java para el conector. Para el conector de origen de Pub/Sub, el valor debe ser com.google.pubsub.kafka.source.CloudPubSubSourceConnector.
cps.endpoint String

El extremo de Pub/Sub que se usará.

Valor predeterminado: "pubsub.googleapis.com:443".

cps.makeOrderingKeyAttribute Boolean

Cuando sea true, escribe la clave de ordenamiento en el registro de Kafka con el mismo formato que los atributos de mensaje de Pub/Sub. Consulta Conversión de registros de Pub/Sub a Kafka.

Valor predeterminado: false.

cps.maxBatchSize Integer

La cantidad máxima de mensajes en lotes por solicitud de extracción a Pub/Sub.

Valor predeterminado: 100

cps.project String Obligatorio. El proyecto de Google Cloud que contiene el tema de Pub/Sub.
cps.subscription String Obligatorio. El nombre de la suscripción a Pub/Sub desde la que se extraen los mensajes.
gcp.credentials.file.path String Opcional. La ruta a un archivo que almacena las credenciales de Google Cloud para autenticar Pub/Sub Lite.
gcp.credentials.json String Opcional. Un BLOB JSON que contiene Google Cloud para autenticar Pub/Sub Lite
kafka.key.attribute String

El atributo de mensaje de Pub/Sub que se usará como clave para los mensajes publicados en Kafka. Si se configura como "orderingKey", usa la clave de ordenamiento del mensaje. Si es null, los registros de Kafka no tienen una clave.

Valor predeterminado: null.

kafka.partition.count Integer

La cantidad de particiones de Kafka para el tema de Kafka en el que se publican los mensajes. Este parámetro se ignora si el esquema de partición es "kafka_partitioner".

Valor predeterminado: 1.

kafka.partition.scheme String

El esquema para asignar un mensaje a una partición en Kafka Puede ser uno de los siguientes valores:

  • round_robin: Asigna particiones de manera rotativa.
  • hash_key: Encuentra la partición con la codificación hash de la clave de registro.
  • hash_value: Encuentra la partición con un hash del valor del registro.
  • kafka_partitioner: Delega la lógica de partición al productor de Kafka. De forma predeterminada, el productor Kafka detecta de forma automática la cantidad de particiones y realiza una asignación de partición basada en hash murmullo o un round robin, según se proporcione una clave de registro.
  • ordering_key: Usa el código hash de la clave de ordenamiento de un mensaje. Si no hay una clave de ordenamiento presente, usa round_robin.

Valor predeterminado: round_robin.

kafka.record.headers Boolean

Si es true, escribe los atributos del mensaje de Pub/Sub como encabezados de Kafka.

kafka.topic String Obligatorio. El tema Kafka que recibe mensajes de Pub/Sub

¿Qué sigue?