Conecta Pub/Sub a Apache Kafka

En este documento, se describe cómo integrar Apache Kafka y Pub/Sub mediante con el conector de Kafka del grupo de Pub/Sub.

Acerca del conector de Kafka del grupo de Pub/Sub

Apache Kafka es una plataforma de código abierto para la transmisión de eventos. Por lo general, usadas en arquitecturas distribuidas para permitir la comunicación entre componentes acoplados. Pub/Sub es un servicio administrado para enviar y reciben mensajes de forma asíncrona. Al igual que con Kafka, puedes usar Pub/Sub para la comunicación entre componentes de tu nube arquitectura.

El conector de Kafka del grupo de Pub/Sub te permite integrar estos dos sistemas. Los siguientes conectores se empaquetan en el JAR del conector:

  • El conector receptor lee los registros de uno o más temas de Kafka y las publica en Pub/Sub.
  • El conector de origen lee los mensajes de un tema de Pub/Sub. y las publica en Kafka.

Estas son algunas situaciones en las que puedes usar el conector de Kafka del grupo Pub/Sub:

  • Migrarás una arquitectura basada en Kafka a Google Cloud.
  • Tienes un sistema de frontend que almacena eventos en Kafka fuera de Google Cloud, pero también usas Google Cloud para ejecutar parte de tu servicios que necesitan recibir los eventos de Kafka.
  • Recopilas registros de una solución de Kafka local y los envías a Google Cloud para el análisis de datos.
  • Tienes un sistema de frontend que usa Google Cloud, pero también almacenas datos de forma local con Kafka.

El conector requiere Kafka Connect, que es un framework para la transmisión de datos entre Kafka y otros sistemas. Para usar el conector, debes ejecutar Kafka Connect junto con tu clúster de Kafka.

En este documento, se supone que conoces Kafka y Pub/Sub Antes de leer este documento, te recomendamos completa una de las Guías de inicio rápido de Pub/Sub.

El conector de Pub/Sub no admite ninguna integración entre las LCA de Google Cloud IAM y Kafka Connect.

Comienza a usar el conector

En esta sección, se explican las siguientes tareas:

  1. Configura el conector de Kafka del grupo de Pub/Sub.
  2. Enviar eventos de Kafka a Pub/Sub
  3. Enviar mensajes de Pub/Sub a Kafka

Requisitos previos

Instala Kafka

Sigue el Guía de inicio rápido de Apache Kafka instalar un Kafka de nodo único en tu máquina local. Completa estos pasos en la guía de inicio rápido:

  1. Descarga la versión más reciente de Kafka y extráela.
  2. Inicie el entorno de Kafka.
  3. Crear un tema de Kafka

Autenticar

El conector de Kafka del grupo de Pub/Sub debe autenticarse con Pub/Sub para para enviar y recibir mensajes de Pub/Sub. Para configurar la autenticación, realiza los siguientes pasos:

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. Instala Google Cloud CLI.
  3. Para inicializar la CLI de gcloud, ejecuta el siguiente comando:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Crea credenciales de autenticación locales para tu Cuenta de Google:

    gcloud auth application-default login
  6. Otorga roles a tu Cuenta de Google. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Reemplaza PROJECT_ID con el ID del proyecto.
    • Reemplaza EMAIL_ADDRESS por tu dirección de correo electrónico.
    • Reemplaza ROLE por cada rol individual.
  7. Instala Google Cloud CLI.
  8. Para inicializar la CLI de gcloud, ejecuta el siguiente comando:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Crea credenciales de autenticación locales para tu Cuenta de Google:

    gcloud auth application-default login
  11. Otorga roles a tu Cuenta de Google. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Reemplaza PROJECT_ID con el ID del proyecto.
    • Reemplaza EMAIL_ADDRESS por tu dirección de correo electrónico.
    • Reemplaza ROLE por cada rol individual.

Descarga el conector JAR

Descarga el archivo JAR del conector a tu máquina local. Para obtener más información, consulta Adquiere el conector en el archivo readme de GitHub.

Copia los archivos de configuración del conector

  1. Clona o descarga la Repositorio de GitHub para el conector.

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. Copia el contenido del directorio config en el subdirectorio config de la instalación de Kafka.

    cp config/* [path to Kafka installation]/config/
    

Estos archivos contienen los parámetros de configuración del conector.

Actualiza la configuración de Kafka Connect

  1. Navega al directorio que contiene el objeto binario de Kafka Connect que descargado.
  2. En el directorio binario Kafka Connect, abre el archivo llamado config/connect-standalone.properties en un editor de texto.
  3. Si el elemento plugin.path property está marcado como comentario, quítalo.
  4. Actualiza plugin.path property para incluir la ruta de acceso al conector JAR.

    Ejemplo:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. Configura la propiedad offset.storage.file.filename con un nombre de archivo local. En en modo independiente, Kafka usa este archivo para almacenar datos desplazados.

    Ejemplo:

    offset.storage.file.filename=/tmp/connect.offsets
    

Reenvía eventos de Kafka a Pub/Sub

En esta sección, se describe cómo iniciar el conector del receptor, publicar eventos en Kafka, y, luego, leer los mensajes reenviados desde Pub/Sub.

  1. Usa la Google Cloud CLI para crear un tema de Pub/Sub con una suscripción.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    Reemplaza lo siguiente:

    • PUBSUB_TOPIC: Es el nombre de un tema de Pub/Sub que se va a usar. recibir los mensajes de Kafka.
    • PUBSUB_SUBSCRIPTION: Es el nombre de un Pub/Sub. suscripción del tema.
  2. Abre el archivo /config/cps-sink-connector.properties en un editor de texto. Agrega valores para las siguientes propiedades, que están marcadas como "TODO" en el comentarios:

    topics=KAFKA_TOPICS
    cps.project=PROJECT_ID
    cps.topic=PUBSUB_TOPIC
    

    Reemplaza lo siguiente:

    • KAFKA_TOPICS: Una lista separada por comas de temas de Kafka para leer de la imagen de la que se originó.
    • PROJECT_ID: Es el proyecto de Google Cloud que contiene tu Tema de Pub/Sub.
    • PUBSUB_TOPIC: El tema de Pub/Sub para recibir la mensajes de Kafka.
  3. En el directorio de Kafka, ejecuta el siguiente comando:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-sink-connector.properties
    
  4. Sigue los pasos que se indican Guía de inicio rápido de Apache Kafka para escribir algunos eventos en su tema de Kafka.

  5. Usa gcloud CLI para leer los eventos desde Pub/Sub.

    gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
    

Reenviar mensajes de Pub/Sub a Kafka

En esta sección, se describe cómo iniciar el conector de origen y publicar mensajes en Pub/Sub y leer los mensajes reenviados desde Kafka.

  1. Usa gcloud CLI para crear un tema de Pub/Sub con una suscripción.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    Reemplaza lo siguiente:

    • PUBSUB_TOPIC: Es el nombre de un tema de Pub/Sub.
    • PUBSUB_SUBSCRIPTION: Es el nombre de un Pub/Sub. suscripción.
  2. Abre el archivo llamado /config/cps-source-connector.properties en un texto Editor. Agrega valores para las siguientes propiedades, que están marcadas como "TODO" en los comentarios:

    kafka.topic=KAFKA_TOPIC
    cps.project=PROJECT_ID
    cps.subscription=PUBSUB_SUBSCRIPTION
    

    Reemplaza lo siguiente:

    • KAFKA_TOPIC: Son los temas de Kafka que recibirán la mensajes de Pub/Sub.
    • PROJECT_ID: Es el proyecto de Google Cloud que contiene tu Tema de Pub/Sub.
    • PUBSUB_TOPIC: Es el tema de Pub/Sub.
  3. En el directorio de Kafka, ejecuta el siguiente comando:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-source-connector.properties
    
  4. Usa gcloud CLI para publicar un mensaje en Pub/Sub.

    gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
    
  5. Leer el mensaje de Kafka Sigue los pasos que se indican Guía de inicio rápido de Apache Kafka para leer los mensajes del tema de Kafka.

Conversión de mensajes

Un registro Kafka contiene una clave y un valor, que son arrays de bytes de longitud variable. De forma opcional, un El registro Kafka también puede tener encabezados, que son pares clave-valor. R Mensaje de Pub/Sub tiene dos partes principales: el cuerpo del mensaje y cero o más atributos de par clave-valor.

Kafka Connect utiliza convertidores para serializar claves y valores desde y hacia Kafka. Para controlar la serialización, establece las siguientes propiedades en el conector de Terraform:

  • key.converter: El conversor que se usa para serializar claves de registro.
  • value.converter: El conversor que se usa para serializar valores de registro.

El cuerpo de un mensaje de Pub/Sub es un objeto ByteString, por lo que la conversión más eficiente es copiar la carga útil directamente. Por esa razón, se recomienda usar un conversor que produzca tipos de datos primitivos (número entero, número de punto flotante, o esquema de bytes) cuando sea posible para evitar la deserialización y a volver a serializar el mismo cuerpo de mensaje.

Conversión de Kafka a Pub/Sub

El conector del receptor convierte los registros de Kafka en mensajes de Pub/Sub, como sigue:

  • La clave de registro Kafka se almacena como un atributo llamado "key" en el mensaje de Pub/Sub.
  • De forma predeterminada, el conector descarta todos los encabezados del registro de Kafka. Sin embargo, si estableciste la opción de configuración headers.publish en true, el conector escribe los encabezados como atributos de Pub/Sub. El conector omite cualquier encabezado que exceda Pub/Sub límites sobre los atributos de los mensajes.
  • Para los esquemas de número entero, de número de punto flotante, de cadena y de bytes, el conector pasa los bytes del valor del registro Kafka en el mensaje de Pub/Sub cuerpo.
  • Para esquemas de struct, el conector escribe cada campo como un atributo del mensaje de Pub/Sub. Por ejemplo, si el campo es { "id"=123 }, el mensaje de Pub/Sub resultante tiene un atributo "id"="123". El el valor de este campo siempre se convierte en una cadena. Los tipos Map y struct no son se admiten como tipos de campo en un struct.
  • Para esquemas de mapas, el conector escribe cada par clave-valor como un atributo de el mensaje de Pub/Sub. Por ejemplo, si el mapa está {"alice"=1,"bob"=2}, el mensaje de Pub/Sub resultante tiene dos atributos, "alice"="1" y "bob"="2". Las claves y los valores se convierten en cadenas.

Los esquemas struct y map tienen algunos comportamientos adicionales:

  • También puedes especificar un campo de struct o clave de mapa en particular para que sea la del mensaje. Para ello, establece la propiedad de configuración messageBodyName. El del campo o la clave se almacena como un ByteString en el cuerpo del mensaje. Si Si no estableces messageBodyName, el cuerpo del mensaje estará vacío para los structs y esquemas de mapas.

  • Para los valores de array, el conector solo admite tipos primitivos de array. El de valores del array se concatena en un solo elemento ByteString .

Conversión de Pub/Sub a Kafka

El conector de origen convierte los mensajes de Pub/Sub en registros de Kafka de la siguiente manera:

  • Clave de registro Kafka: De forma predeterminada, la clave se establece en null. De manera opcional, puede especificar un atributo de mensaje de Pub/Sub para usar como clave, estableciendo la opción de configuración kafka.key.attribute En ese caso, el el conector busca un atributo con ese nombre y establece la clave de registro en valor del atributo. Si el atributo especificado no está presente, la clave de registro se se establece en null.

  • Valor del registro Kafka. El conector escribe el valor del registro de la siguiente manera:

    • Si el mensaje de Pub/Sub no tiene atributos personalizados, el conector Escribe el cuerpo del mensaje de Pub/Sub directamente en el registro de Kafka. valor como un tipo byte[], con el conversor especificado por value.converter

    • Si el mensaje de Pub/Sub tiene atributos personalizados kafka.record.headers es false, el conector escribe un struct en el valor de registro. El struct contiene un campo para cada atributo y un campo llamado "message", cuyo valor es el cuerpo del mensaje de Pub/Sub (almacenado como bytes):

      {
        "message": "<Pub/Sub message body>",
        "<attribute-1>": "<value-1>",
        "<attribute-2>": "<value-2>",
        ....
      }
      

      En este caso, debes usar un value.converter que sea compatible con Esquemas struct, como org.apache.kafka.connect.json.JsonConverter.

    • Si el mensaje de Pub/Sub tiene atributos personalizados kafka.record.headers es true, el conector escribe los atributos como Encabezados del registro Kafka Escribe el cuerpo del mensaje de Pub/Sub directamente al valor del registro Kafka como un tipo byte[] mediante el conversor especificadas por value.converter.

  • Encabezados del registro Kafka. De forma predeterminada, los encabezados están vacíos, a menos que establezcas De kafka.record.headers a true.

Opciones de configuración

Además de los parámetros de configuración proporcionados por la API de Kafka Connect, el El conector de Kafka del grupo de Pub/Sub admite los siguientes parámetros de configuración.

Opciones de configuración del conector del receptor

El conector del receptor admite las siguientes opciones de configuración.

Configuración Tipo de datos Descripción
connector.class String Obligatorio. La clase Java para el conector. Para el conector del receptor de Pub/Sub, el valor debe ser com.google.pubsub.kafka.sink.CloudPubSubSinkConnector.
cps.endpoint String

El extremo de Pub/Sub que se usará.

Valor predeterminado: "pubsub.googleapis.com:443".

cps.project String Obligatorio. El servicio de Google Cloud que contiene Tema de Pub/Sub.
cps.topic String Obligatorio. El tema de Pub/Sub que se publicará a todos los registros de Kafka.
gcp.credentials.file.path String Opcional. La ruta de acceso a un archivo que almacena credenciales de Google Cloud para autenticar Pub/Sub Lite.
gcp.credentials.json String Opcional. Un BLOB JSON que contiene Google Cloud para la autenticación de Pub/Sub Lite.
headers.publish Boolean

Cuando sea true, incluye cualquier encabezado de registro Kafka como Atributos de mensaje de Pub/Sub.

Valor predeterminado: false.

maxBufferBytes Long

La cantidad máxima de bytes que se recibirán en una partición de Kafka de tema antes de publicarlos en Pub/Sub.

Valor predeterminado: 10,000,000.

maxBufferSize Integer

La cantidad máxima de registros que se reciben en una partición de tema de Kafka antes de publicarlos en Pub/Sub.

Valor predeterminado: 100.

maxDelayThresholdMs Integer

La cantidad máxima de tiempo que se debe esperar para alcanzar maxBufferSize o maxBufferBytes antes de la publicación registros pendientes a Pub/Sub en milisegundos.

Valor predeterminado: 100.

maxOutstandingMessages Long

La cantidad máxima de registros que pueden estar pendientes, incluidos lotes incompletos y pendientes, antes de que el publicador siga bloqueando publicación.

Valor predeterminado: Long.MAX_VALUE.

maxOutstandingRequestBytes Long

La cantidad máxima de bytes totales que pueden estar pendientes, incluidas lotes incompletos y pendientes, antes de que el publicador siga bloqueando publicación.

Valor predeterminado: Long.MAX_VALUE.

maxRequestTimeoutMs Integer

El tiempo de espera para las solicitudes de publicación individuales a Pub/Sub, en milisegundos.

Valor predeterminado: 10,000.

maxTotalTimeoutMs Integer

Es el tiempo de espera total, en milisegundos, para que una llamada publique en Pub/Sub, incluidos los reintentos.

Valor predeterminado: 60,000.

metadata.publish Boolean

Cuando sea true, incluye el tema, la partición, el desplazamiento y la marca de tiempo como atributos de mensajes de Pub/Sub.

Valor predeterminado: false.

messageBodyName String

Cuando se usa un esquema de struct o map value, especifica el nombre de un o la clave que se usará como el cuerpo del mensaje de Pub/Sub. Consulta Conversión de Kafka a Pub/Sub.

Valor predeterminado: "cps_message_body".

orderingKeySource String

Especifica cómo configurar la clave de ordenamiento en Pub/Sub mensaje. Puede ser uno de los siguientes valores:

  • none: No establece la clave de ordenamiento.
  • key: Usa la clave de registro de Kafka como clave de ordenamiento.
  • partition: Usa el número de partición, convertido en un como la clave de ordenamiento. Usa este parámetro de configuración solo para capacidades de capacidad de procesamiento bajas o temas con miles de particiones.

Valor predeterminado: none.

topics String Obligatorio. Lista separada por comas de temas de Kafka leer.

Opciones de configuración del conector de origen

El conector de origen admite las siguientes opciones de configuración.

Configuración Tipo de datos Descripción
connector.class String Obligatorio. La clase Java para el conector. Para el conector de origen de Pub/Sub, el valor debe ser com.google.pubsub.kafka.source.CloudPubSubSourceConnector.
cps.endpoint String

El extremo de Pub/Sub que se usará.

Valor predeterminado: "pubsub.googleapis.com:443".

cps.makeOrderingKeyAttribute Boolean

Cuando sea true, escribe la clave de ordenamiento en el registro Kafka. con el mismo formato que los atributos de mensajes de Pub/Sub. Consulta Conversión de Pub/Sub a registros de Kafka.

Valor predeterminado: false.

cps.maxBatchSize Integer

Cantidad máxima de mensajes por lote por solicitud de extracción a Pub/Sub

Valor predeterminado: 100

cps.project String Obligatorio. El proyecto de Google Cloud que contiene Tema de Pub/Sub.
cps.subscription String Obligatorio. El nombre de Pub/Sub de la que se extraen los mensajes.
gcp.credentials.file.path String Opcional. La ruta de acceso a un archivo que almacena credenciales de Google Cloud para autenticar Pub/Sub Lite.
gcp.credentials.json String Opcional. Un BLOB JSON que contiene Google Cloud para la autenticación de Pub/Sub Lite.
kafka.key.attribute String

El atributo del mensaje de Pub/Sub que se usará como clave para mensajes publicados en Kafka. Si está establecido en "orderingKey", usa la clave de ordenamiento del mensaje. Si es null, los registros de Kafka no tienen un .

Valor predeterminado: null.

kafka.partition.count Integer

La cantidad de particiones de Kafka del tema de Kafka en las que los mensajes de que se publiquen los cambios. Este parámetro se ignora si el esquema de partición se "kafka_partitioner".

Cantidad predeterminada: 1

kafka.partition.scheme String

El esquema para asignar un mensaje a una partición en Kafka. Puede ser uno de los siguientes valores:

  • round_robin: Asigna particiones en un round robin. a la moda.
  • hash_key: Encuentra la partición mediante la generación de un hash del registro. .
  • hash_value: Encuentra la partición mediante la generación de un hash del registro. valor.
  • kafka_partitioner: Delega la lógica de partición al de Kafka de Kafka. De forma predeterminada, el productor de Kafka detecta automáticamente la cantidad de particiones y realiza particiones basadas en hash murmur mapeo o round robin, según si se proporciona una clave de registro.
  • ordering_key: Usa el código hash de un clave de ordenamiento. Si no hay una clave de ordenamiento, usa round_robin.

Valor predeterminado: round_robin.

kafka.record.headers Boolean

Si es true, escribe los atributos del mensaje de Pub/Sub como encabezados de Kafka.

kafka.topic String Obligatorio. El tema de Kafka que recibe mensajes de Pub/Sub

Obtén asistencia

Si necesitas ayuda, crea un ticket de asistencia. Para preguntas y debates generales, crea un problema en el repositorio de GitHub.

¿Qué sigue?