Escribir datos de Kafka en BigQuery con Dataflow

En esta página se muestra cómo usar Dataflow para leer datos del servicio gestionado de Google Cloud para Apache Kafka y escribir los registros en una tabla de BigQuery. En este tutorial se usa la plantilla de Apache Kafka a BigQuery para crear la tarea de Dataflow.

Información general

Apache Kafka es una plataforma de código abierto para el streaming de eventos. Kafka se usa habitualmente en arquitecturas distribuidas para permitir la comunicación entre componentes con bajo acoplamiento. Puedes usar Dataflow para leer eventos de Kafka, procesarlos y escribir los resultados en una tabla de BigQuery para analizarlos más a fondo.

Managed Service para Apache Kafka es un servicio de Google Cloud Platform que te ayuda a ejecutar clústeres de Kafka seguros y escalables.

Lectura de eventos de Kafka en BigQuery
Arquitectura basada en eventos con Apache Kafka

Permisos obligatorios

La cuenta de servicio de los trabajadores de Dataflow debe tener los siguientes roles de Gestión de Identidades y Accesos (IAM):

  • Cliente de Kafka gestionado (roles/managedkafka.client)
  • Editor de datos de BigQuery (roles/bigquery.dataEditor)

Para obtener más información, consulta Seguridad y permisos de los flujos de datos.

Crear un clúster de Kafka

En este paso, crearás un clúster de Managed Service para Apache Kafka. Para obtener más información, consulta Crear un clúster de Managed Service para Apache Kafka.

Consola

  1. Ve a la página Clústeres de Managed Service para Apache Kafka >.

    Ir a Clústeres

  2. Haz clic en Crear.

  3. En el cuadro Nombre del clúster, introduce un nombre para el clúster.

  4. En la lista Región, selecciona una ubicación para el clúster.

  5. Haz clic en Crear.

gcloud

Usa el comando managed-kafka clusters create.

gcloud managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME

Haz los cambios siguientes:

  • CLUSTER: nombre del clúster
  • REGION: la región en la que has creado la subred
  • PROJECT_ID: tu ID de proyecto
  • SUBNET_NAME: la subred en la que quieres implementar el clúster

La creación de un clúster suele tardar entre 20 y 30 minutos.

Crear un tema de Kafka

Una vez creado el clúster de Managed Service para Apache Kafka, crea un tema.

Consola

  1. Ve a la página Clústeres de Managed Service para Apache Kafka >.

    Ir a Clústeres

  2. Haz clic en el nombre del clúster.

  3. En la página de detalles del clúster, haz clic en Crear tema.

  4. En el cuadro Nombre del tema, escribe el nombre del tema.

  5. Haz clic en Crear.

gcloud

Usa el comando managed-kafka topics create.

gcloud managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3

Haz los cambios siguientes:

  • TOPIC_NAME: el nombre del tema que se va a crear

Crear una tabla de BigQuery

En este paso, creará una tabla de BigQuery con el siguiente esquema:

Nombre de la columna Tipo de datos
name STRING
customer_id INTEGER

Si aún no tienes un conjunto de datos de BigQuery, crea uno. Para obtener más información, consulta Crear conjuntos de datos. A continuación, crea una tabla vacía:

Consola

  1. Ve a la página BigQuery.

    Ir a BigQuery

  2. En el panel Explorador, expande tu proyecto y selecciona un conjunto de datos.

  3. En la sección de información Conjunto de datos, haga clic en Crear tabla.

  4. En la lista Crear tabla a partir de, selecciona Tabla vacía.

  5. En el cuadro Tabla, introduce el nombre de la tabla.

  6. En la sección Esquema, haz clic en Editar como texto.

  7. Pega la siguiente definición de esquema:

    name:STRING,
    customer_id:INTEGER
    
  8. Haz clic en Crear tabla.

gcloud

Usa el comando bq mk.

bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  name:STRING,customer_id:INTEGER

Haz los cambios siguientes:

  • PROJECT_ID: tu ID de proyecto
  • DATASET_NAME: el nombre del conjunto de datos
  • TABLE_NAME: el nombre de la tabla que se va a crear

Ejecutar la tarea de Dataflow

Después de crear el clúster de Kafka y la tabla de BigQuery, ejecuta la plantilla de Dataflow.

.

Consola

Primero, obtén la dirección del servidor de arranque del clúster:

  1. En la Google Cloud consola, ve a la página Clusters.

    Ir a Clústeres

  2. Haz clic en el nombre del clúster.

  3. Haz clic en la pestaña Configuraciones.

  4. Copia la dirección del servidor de arranque de URL de arranque.

A continuación, ejecuta la plantilla para crear la tarea de Dataflow:

  1. Ve a la página Dataflow > Tareas.

    Ir a Tareas

  2. Haz clic en Crear tarea a partir de plantilla.

  3. En el campo Nombre del trabajo, introduce kafka-to-bq.

  4. En Endpoint regional, selecciona la región en la que se encuentra tu clúster de Managed Service para Apache Kafka.

  5. Selecciona la plantilla "De Kafka a BigQuery".

  6. Introduce los siguientes parámetros de plantilla:

    • Servidor de arranque de Kafka: la dirección del servidor de arranque.
    • Tema de Kafka de origen: el nombre del tema que se va a leer.
    • Modo de autenticación de la fuente de Kafka: APPLICATION_DEFAULT_CREDENTIALS
    • Formato de mensaje de Kafka: JSON
    • Estrategia de nombres de tabla: SINGLE_TABLE_NAME
    • Tabla de salida de BigQuery: tabla de BigQuery con el siguiente formato: PROJECT_ID:DATASET_NAME.TABLE_NAME
  7. En Cola de mensajes fallidos, marca Escribir errores en BigQuery.

  8. Introduce el nombre de una tabla de BigQuery para la cola de mensajes fallidos con el siguiente formato: PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME

    No crees esta tabla con antelación. La canalización lo crea.

  9. Haz clic en Ejecutar trabajo.

gcloud

Usa el comando dataflow flex-template run.

gcloud dataflow flex-template run kafka-to-bq \
--template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
--region LOCATION \
--parameters \
readBootstrapServerAndTopic=projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
persistKafkaKey=false,\
writeMode=SINGLE_TABLE_NAME,\
kafkaReadOffset=earliest,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME\
useBigQueryDLQ=true,\
outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME

Sustituye las siguientes variables:

  • LOCATION: la región en la que se encuentra tu Managed Service para Apache Kafka
  • PROJECT_ID: el nombre de tu proyecto de Google Cloud Platform
  • CLUSTER_ID: el del clúster
  • TOPIC: el nombre del tema de Kafka
  • DATASET_NAME: el nombre del conjunto de datos
  • TABLE_NAME: el nombre de la tabla
  • ERROR_TABLE_NAME: nombre de la tabla de BigQuery de la cola de mensajes fallidos

No crees la tabla de la cola de mensajes fallidos con antelación. El flujo de procesamiento lo crea.

Enviar mensajes a Kafka

Una vez que se inicie la tarea de Dataflow, puedes enviar mensajes a Kafka y la canalización los escribirá en BigQuery.

  1. Crea una VM en la misma subred que el clúster de Kafka e instala las herramientas de línea de comandos de Kafka. Para obtener instrucciones detalladas, consulta el artículo Configurar una máquina cliente en Publicar y consumir mensajes con la CLI.

  2. Ejecuta el siguiente comando para escribir mensajes en el tema de Kafka:

    kafka-console-producer.sh \
     --topic TOPIC \
     --bootstrap-server bootstrap.CLUSTER_ID.LOCATION.managedkafka.PROJECT_ID.cloud.goog:9092 \
     --producer.config client.properties

    Sustituye las siguientes variables:

    • TOPIC: el nombre del tema de Kafka
    • CLUSTER_ID: el nombre del clúster
    • LOCATION: la región en la que se encuentra tu clúster
    • PROJECT_ID: el nombre de tu proyecto de Google Cloud Platform
  3. En la petición, introduce las siguientes líneas de texto para enviar mensajes a Kafka:

    {"name": "Alice", "customer_id": 1}
    {"name": "Bob", "customer_id": 2}
    {"name": "Charles", "customer_id": 3}
    

Usar una cola de mensajes fallidos

Mientras se ejecuta la tarea, es posible que la canalización no pueda escribir mensajes individuales en BigQuery. Entre los posibles errores se incluyen los siguientes:

  • Errores de serialización, incluido JSON con formato incorrecto.
  • Errores de conversión de tipos, causados por una discrepancia entre el esquema de la tabla y los datos JSON.
  • Campos adicionales en los datos JSON que no están presentes en el esquema de la tabla.

Estos errores no provocan que el trabajo falle y no aparecen como errores en el registro de trabajos de Dataflow. En su lugar, la canalización usa una cola de mensajes fallidos para gestionar este tipo de errores.

Para habilitar la cola de mensajes fallidos al ejecutar la plantilla, define los siguientes parámetros de plantilla:

  • useBigQueryDLQ: true
  • outputDeadletterTable: nombre completo de una tabla de BigQuery. Por ejemplo, my-project:dataset1.errors.

La canalización crea la tabla automáticamente. Si se produce un error al procesar un mensaje de Kafka, la canalización escribe una entrada de error en la tabla.

Ejemplos de mensajes de error:

Tipo de error Datos del evento errorMessage
Error de serialización "Hola mundo" No se ha podido serializar el JSON en una fila de tabla: "Hello world"
Error de conversión de tipo {"name":"Emily","customer_id":"abc"} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 }
Campo desconocido {"name":"Zoe","age":34} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 }

Trabajar con tipos de datos de BigQuery

Internamente, el conector de E/S de Kafka convierte las cargas útiles de los mensajes JSON en objetos TableRow de Apache Beam y traduce los valores de los campos TableRow a tipos de BigQuery.

En la siguiente tabla se muestran las representaciones JSON de los tipos de datos de BigQuery.

Tipo de BigQuery Representación JSON
ARRAY [1.2,3]
BOOL true
DATE "2022-07-01"
DATETIME "2022-07-01 12:00:00.00"
DECIMAL 5.2E11
FLOAT64 3.142
GEOGRAPHY "POINT(1 2)"

Especifica la geografía mediante texto conocido (WKT) o GeoJSON, con formato de cadena. Para obtener más información, consulta Cargar datos geoespaciales.

INT64 10
INTERVAL "0-13 370 48:61:61"
STRING "string_val"
TIMESTAMP "2022-07-01T12:00:00.00Z"

Usa el método Date.toJSON de JavaScript para dar formato al valor.

Datos estructurados

Si tus mensajes JSON siguen un esquema coherente, puedes representar objetos JSON con el tipo de datos STRUCT en BigQuery.

En el siguiente ejemplo, el campo answers es un objeto JSON con dos subcampos, a y b:

{"name":"Emily","answers":{"a":"yes","b":"no"}}

La siguiente instrucción SQL crea una tabla de BigQuery con un esquema compatible:

CREATE TABLE my_dataset.kafka_events (name STRING, answers STRUCT<a STRING, b STRING>);

La tabla resultante tendrá este aspecto:

+-------+----------------------+
| name  |       answers        |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+

y semiestructurados

Si tus mensajes JSON no siguen un esquema estricto, plantéate almacenarlos en BigQuery como un tipo de datos JSON. Si almacenas datos JSON como un tipo JSON, no tienes que definir el esquema por adelantado. Después de la ingestión de datos, puedes consultar los datos mediante los operadores de acceso a campos (notación de puntos) y de acceso a arrays en GoogleSQL. Para obtener más información, consulta el artículo Trabajar con datos JSON en GoogleSQL.

Usar una función definida por el usuario para transformar los datos

En este tutorial se da por hecho que los mensajes de Kafka tienen formato JSON y que el esquema de la tabla de BigQuery coincide con los datos JSON, sin que se haya aplicado ninguna transformación a los datos.

También puedes proporcionar una función definida por el usuario (UDF) de JavaScript que transforme los datos antes de escribirlos en BigQuery. La función definida por el usuario también puede realizar un procesamiento adicional, como filtrar, eliminar información personal identificable (IPI) o enriquecer los datos con campos adicionales.

Para obtener más información, consulta el artículo sobre cómo crear funciones definidas por el usuario para plantillas de Dataflow.

Siguientes pasos