Conecta Pub/Sub Lite a Apache Kafka

En este documento, se describe cómo integrar Apache Kafka y Pub/Sub Lite mediante con el conector de Kafka del grupo de Pub/Sub.

Acerca del conector de Kafka del grupo de Pub/Sub

Apache Kafka es una plataforma de código abierto para la transmisión de eventos. Por lo general, usadas en arquitecturas distribuidas para permitir la comunicación entre componentes acoplados. Pub/Sub Lite es un servicio administrado para enviar y reciben mensajes de forma asíncrona. Al igual que con Kafka, puedes usar Pub/Sub Lite para la comunicación entre componentes de tu nube arquitectura.

El conector de Kafka del grupo de Pub/Sub te permite integrar estos dos sistemas. Los siguientes conectores se empaquetan en el JAR del conector:

  • El conector receptor lee los registros de uno o más temas de Kafka y y las publica en Pub/Sub Lite.
  • El conector de origen lee los mensajes de un tema de Pub/Sub Lite. y las publica en Kafka.

Estas son algunas situaciones en las que puedes usar el conector de Kafka del grupo Pub/Sub:

  • Migrarás una arquitectura basada en Kafka a Google Cloud.
  • Tienes un sistema de frontend que almacena eventos en Kafka fuera de Google Cloud, pero también usas Google Cloud para ejecutar parte de tu servicios que necesitan recibir los eventos de Kafka.
  • Recopilas registros de una solución de Kafka local y los envías a Google Cloud para el análisis de datos.
  • Tienes un sistema de frontend que usa Google Cloud, pero también almacenas datos de forma local con Kafka.

El conector requiere Kafka Connect, que es un framework para la transmisión de datos entre Kafka y otros sistemas. Para usar el conector, debes ejecutar Kafka Connect junto con tu clúster de Kafka.

En este documento, se supone que conoces Kafka y Pub/Sub Lite. Para comenzar a usar Pub/Sub Lite, consulta Publica y recibe mensajes en Pub/Sub Lite con la consola de Google Cloud.

Comienza a usar el conector de Kafka del grupo de Pub/Sub

En esta sección, se explican las siguientes tareas:

  1. Configura el conector de Kafka del grupo de Pub/Sub.
  2. Enviar eventos de Kafka a Pub/Sub Lite
  3. Enviar mensajes de Pub/Sub Lite a Kafka

Requisitos previos

Instala Kafka

Sigue el Guía de inicio rápido de Apache Kafka instalar un Kafka de nodo único en tu máquina local. Completa estos pasos en la guía de inicio rápido:

  1. Descarga la versión más reciente de Kafka y extráela.
  2. Inicie el entorno de Kafka.
  3. Crear un tema de Kafka

Autenticar

El conector de Kafka del grupo de Pub/Sub debe autenticarse con Pub/Sub para para enviar y recibir mensajes de Pub/Sub. Para configurar la autenticación, realiza los siguientes pasos:

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. Instala Google Cloud CLI.
  3. Para inicializar la CLI de gcloud, ejecuta el siguiente comando:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Crea credenciales de autenticación locales para tu Cuenta de Google:

    gcloud auth application-default login
  6. Otorga roles a tu Cuenta de Google. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM: roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Reemplaza PROJECT_ID con el ID del proyecto.
    • Reemplaza EMAIL_ADDRESS por tu dirección de correo electrónico.
    • Reemplaza ROLE por cada rol individual.
  7. Instala Google Cloud CLI.
  8. Para inicializar la CLI de gcloud, ejecuta el siguiente comando:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Crea credenciales de autenticación locales para tu Cuenta de Google:

    gcloud auth application-default login
  11. Otorga roles a tu Cuenta de Google. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM: roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Reemplaza PROJECT_ID con el ID del proyecto.
    • Reemplaza EMAIL_ADDRESS por tu dirección de correo electrónico.
    • Reemplaza ROLE por cada rol individual.

Descarga el conector JAR

Descarga el archivo JAR del conector a tu máquina local. Para obtener más información, consulta Adquiere el conector en el archivo readme de GitHub.

Copia los archivos de configuración del conector

  1. Clona o descarga la Repositorio de GitHub para el conector.

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. Copia el contenido del directorio config en el subdirectorio config de la instalación de Kafka.

    cp config/* [path to Kafka installation]/config/
    

Estos archivos contienen los parámetros de configuración del conector.

Actualiza la configuración de Kafka Connect

  1. Navega al directorio que contiene el objeto binario de Kafka Connect que descargado.
  2. En el directorio binario Kafka Connect, abre el archivo llamado config/connect-standalone.properties en un editor de texto.
  3. Si el elemento plugin.path property está marcado como comentario, quítalo.
  4. Actualiza plugin.path property para incluir la ruta de acceso al conector JAR.

    Ejemplo:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. Configura la propiedad offset.storage.file.filename con un nombre de archivo local. En en modo independiente, Kafka usa este archivo para almacenar datos desplazados.

    Ejemplo:

    offset.storage.file.filename=/tmp/connect.offsets
    

Reenvía eventos de Kafka a Pub/Sub Lite

En esta sección, se describe cómo iniciar el conector del receptor, publicar eventos en Kafka, y, luego, leer los mensajes reenviados desde Pub/Sub Lite.

  1. Usa Google Cloud CLI para crear una reserva de Pub/Sub Lite.

    gcloud pubsub lite-reservations create RESERVATION_NAME \
    --location=LOCATION \
    --throughput-capacity=4
    

    Reemplaza lo siguiente:

    • RESERVATION_NAME: Es el nombre de Pub/Sub Lite. reserva.
    • LOCATION: Es la ubicación de la reserva.
  2. Usa Google Cloud CLI para crear un tema de Pub/Sub Lite con un suscripción.

    gcloud pubsub lite-topics create LITE_TOPIC \
    --location=LOCATION \
    --partitions=2 \
    --per-partition-bytes=30GiB \
    --throughput-reservation=RESERVATION_NAME
    
    gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \
    --location=LOCATION \
    --topic=LITE_TOPIC
    

    Reemplaza lo siguiente:

    • LITE_TOPIC: Es el nombre del tema de Pub/Sub Lite que se usará. recibir mensajes de Kafka.
    • LOCATION: Es la ubicación del tema. El valor debe coincidir con el la ubicación de la reserva.
    • RESERVATION_NAME: Es el nombre de Pub/Sub Lite. reserva.
    • LITE_SUBSCRIPTION: Es el nombre de Pub/Sub Lite. suscripción del tema.
  3. Abre el archivo /config/pubsub-lite-sink-connector.properties en un editor de texto. Agrega valores para las siguientes propiedades, que están marcadas como "TODO" en el comentarios:

    topics=KAFKA_TOPICS
    pubsublite.project=PROJECT_ID
    pubsublite.location=LOCATION
    pubsublite.topic=LITE_TOPIC
    

    Reemplaza lo siguiente:

    • KAFKA_TOPICS: Una lista separada por comas de temas de Kafka para leer de la imagen de la que se originó.
    • PROJECT_ID: Es el proyecto de Google Cloud que contiene tu Tema de Pub/Sub Lite.
    • LOCATION: Es la ubicación del tema de Pub/Sub Lite.
    • LITE_TOPIC: Es el tema de Pub/Sub Lite que se recibirá. mensajes de Kafka.
  4. En el directorio de Kafka, ejecuta el siguiente comando:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/pubsub-lite-sink-connector.properties
    
  5. Sigue los pasos que se indican Guía de inicio rápido de Apache Kafka para escribir algunos eventos en su tema de Kafka.

  6. Suscribirse a la suscripción de Pub/Sub Lite con cualquiera de los métodos que se muestran en Recibir mensajes de suscripciones Lite

Reenvía mensajes de Pub/Sub Lite a Kafka

En esta sección, se describe cómo iniciar el conector de origen y publicar mensajes en Pub/Sub Lite y leer los mensajes reenviados desde Kafka.

  1. Usa Google Cloud CLI para crear una reserva de Pub/Sub Lite.

    gcloud pubsub lite-reservations create RESERVATION_NAME \
    --location=LOCATION \
    --throughput-capacity=4
    

    Reemplaza lo siguiente:

    • RESERVATION_NAME: Es el nombre de Pub/Sub Lite. reserva.
    • LOCATION: Es la ubicación de la reserva.
  2. Usa Google Cloud CLI para crear un tema de Pub/Sub Lite con un suscripción.

    gcloud pubsub lite-topics create LITE_TOPIC \
    --location=LOCATION \
    --partitions=2 \
    --per-partition-bytes=30GiB \
    --throughput-reservation=RESERVATION_NAME
    
    gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \
    --location=LOCATION \
    --topic=LITE_TOPIC
    

    Reemplaza lo siguiente:

    • LITE_TOPIC: Es el nombre del tema de Pub/Sub Lite.
    • LOCATION: Es la ubicación del tema. El valor debe coincidir con el la ubicación de la reserva.
    • RESERVATION_NAME: Es el nombre de Pub/Sub Lite. reserva.
    • LITE_SUBSCRIPTION: Es el nombre de Pub/Sub Lite. suscripción del tema.
  3. Abre el archivo llamado /config/pubsub-lite-source-connector.properties en una editor de texto. Agrega valores para las siguientes propiedades, que están marcadas "TODO" en los comentarios:

    topic=KAFKA_TOPIC
    pubsublite.project=PROJECT_ID
    pubsublite.location=LOCATION
    pubsublite.subscription=LITE_SUBSCRIPTION
    

    Reemplaza lo siguiente:

    • KAFKA_TOPIC: Son los temas de Kafka que recibirán la mensajes de Pub/Sub.
    • PROJECT_ID: Es el proyecto de Google Cloud que contiene tu Tema de Pub/Sub.
    • LOCATION: Es la ubicación del tema de Pub/Sub Lite.
    • LITE_SUBSCRIPTION: Es el tema de Pub/Sub Lite.
  4. En el directorio de Kafka, ejecuta el siguiente comando:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/pubsub-lite-source-connector.properties
    
  5. Publica mensajes en el tema de Pub/Sub Lite con cualquiera de métodos mostrados en Publicación de mensajes en temas de Lite.

  6. Leer el mensaje de Kafka Sigue los pasos que se indican Guía de inicio rápido de Apache Kafka para leer los mensajes del tema de Kafka.

Conversión de mensajes

Un registro Kafka contiene una clave y un valor, que son arrays de bytes de longitud variable. De forma opcional, un El registro Kafka también puede tener encabezados, que son pares clave-valor. R Mensaje de Pub/Sub Lite tiene los siguientes campos:

  • key: Clave de mensaje (bytes)
  • data: Datos de mensajes (bytes)
  • attributes: Cero o más atributos. Cada atributo es un (key,values[]) mapa. Un solo atributo puede tener múltiples valores.
  • event_time: Es una marca de tiempo de evento opcional proporcionada por el usuario.

Kafka Connect utiliza convertidores para serializar claves y valores desde y hacia Kafka. Para controlar la serialización, establece las siguientes propiedades en el conector de Terraform:

  • key.converter: El conversor que se usa para serializar claves de registro.
  • value.converter: El conversor que se usa para serializar valores de registro.

Conversión de Kafka a Pub/Sub Lite

El conector del receptor convierte los registros de Kafka en mensajes de Pub/Sub Lite de la siguiente manera.

Registro Kafka (SinkRecord) Mensaje de Pub/Sub Lite
Clave key
Valor data
Encabezados attributes
Marca de tiempo eventTime
Tipo de marca de tiempo attributes["x-goog-pubsublite-source-kafka-event-time-type"]
Tema attributes["x-goog-pubsublite-source-kafka-topic"]
Partición attributes["x-goog-pubsublite-source-kafka-offset"]
Desplazamiento attributes["x-goog-pubsublite-source-kafka-partition"]

Las claves, los valores y los encabezados se codifican de la siguiente manera:

  • Los esquemas nulos se tratan como esquemas de cadena.
  • Las cargas útiles de bytes se escriben directamente sin conversión.
  • Las cargas útiles de cadena, número entero y punto flotante están codificadas en una secuencia de los bytes UTF-8.
  • Todas las demás cargas útiles están codificadas en un búfer de protocolo Value de entrada y, luego, los convierte en una cadena de bytes.
    • Los campos de cadenas anidados se codifican en un Value de protobuf.
    • Los campos de bytes anidados están codificados en un protobuf Value que contiene el con codificación base64.
    • Los campos numéricos anidados se codifican como un doble en un Value de protobuf.
    • No se admiten mapas con claves de array, de mapa o de struct.

Conversión de Pub/Sub Lite a Kafka

El conector de origen convierte los mensajes de Pub/Sub Lite en registros de Kafka de la siguiente manera:

Mensaje de Pub/Sub Lite Registro Kafka (SourceRecord)
key Clave
data Valor
attributes Encabezados
event_time Marca de tiempo Si event_time no está presente, significa que se usa mucho tiempo.

Opciones de configuración

Además de los parámetros de configuración proporcionados por la API de Kafka Connect, el El conector es compatible con los siguientes parámetros de configuración de Pub/Sub Lite.

Opciones de configuración del conector del receptor

El conector del receptor admite las siguientes opciones de configuración.

Configuración Tipo de datos Descripción
connector.class String Obligatorio. La clase Java para el conector. Para el conector del receptor de Pub/Sub Lite, el valor debe ser com.google.pubsublite.kafka.sink.PubSubLiteSinkConnector.
gcp.credentials.file.path String Opcional. La ruta de acceso a un archivo que almacena credenciales de Google Cloud para autenticar Pub/Sub Lite.
gcp.credentials.json String Opcional. Un BLOB JSON que contiene Google Cloud para la autenticación de Pub/Sub Lite.
pubsublite.location String Obligatorio. La ubicación de la Tema de Pub/Sub Lite.
pubsublite.project String Obligatorio. El servicio de Google Cloud que contiene Tema de Pub/Sub Lite.
pubsublite.topic String Obligatorio. El tema de Pub/Sub Lite que se publicará a todos los registros de Kafka.
topics String Obligatorio. Lista separada por comas de temas de Kafka leer.

Opciones de configuración del conector de origen

El conector de origen admite las siguientes opciones de configuración.

Configuración Tipo de datos Descripción
connector.class String Obligatorio. La clase Java para el conector. Para el conector de origen de Pub/Sub Lite, el valor debe com.google.pubsublite.kafka.source.PubSubLiteSourceConnector.
gcp.credentials.file.path String Opcional. La ruta de acceso a un archivo que almacena credenciales de Google Cloud para autenticar Pub/Sub Lite.
gcp.credentials.json String Opcional. Un BLOB JSON que contiene Google Cloud para la autenticación de Pub/Sub Lite.
kafka.topic String Obligatorio. El tema de Kafka que recibe mensajes de Pub/Sub Lite.
pubsublite.location String Obligatorio. La ubicación de la Tema de Pub/Sub Lite.
pubsublite.partition_flow_control.bytes Long

La cantidad máxima de bytes pendientes por partición de Pub/Sub Lite.

Valor predeterminado: 20,000,000

pubsublite.partition_flow_control.messages Long

La cantidad máxima de mensajes pendientes por partición de Pub/Sub Lite.

Valor predeterminado: Long.MAX_VALUE

pubsublite.project String Obligatorio. El proyecto de Google Cloud que contiene Tema de Pub/Sub Lite.
pubsublite.subscription String Obligatorio. El nombre de Pub/Sub Lite de la que se extraen los mensajes.

¿Qué sigue?