En esta página se muestra cómo usar Dataflow para leer datos del servicio gestionado de Google Cloud para Apache Kafka y escribir los registros en una tabla de BigQuery. En este tutorial se usa la plantilla de Apache Kafka a BigQuery para crear la tarea de Dataflow.
Información general
Apache Kafka es una plataforma de código abierto para el streaming de eventos. Kafka se usa habitualmente en arquitecturas distribuidas para permitir la comunicación entre componentes con bajo acoplamiento. Puedes usar Dataflow para leer eventos de Kafka, procesarlos y escribir los resultados en una tabla de BigQuery para analizarlos más a fondo.
Managed Service para Apache Kafka es un servicio de Google Cloud Platform que te ayuda a ejecutar clústeres de Kafka seguros y escalables.

Permisos obligatorios
La cuenta de servicio de los trabajadores de Dataflow debe tener los siguientes roles de Gestión de Identidades y Accesos (IAM):
- Cliente de Kafka gestionado (
roles/managedkafka.client
) - Editor de datos de BigQuery (
roles/bigquery.dataEditor
)
Para obtener más información, consulta Seguridad y permisos de los flujos de datos.
Crear un clúster de Kafka
En este paso, crearás un clúster de Managed Service para Apache Kafka. Para obtener más información, consulta Crear un clúster de Managed Service para Apache Kafka.
Consola
Ve a la página Clústeres de Managed Service para Apache Kafka >.
Haz clic en Crear.
En el cuadro Nombre del clúster, introduce un nombre para el clúster.
En la lista Región, selecciona una ubicación para el clúster.
Haz clic en Crear.
gcloud
Usa el comando
managed-kafka clusters create
.
gcloud managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
Haz los cambios siguientes:
CLUSTER
: nombre del clústerREGION
: la región en la que has creado la subredPROJECT_ID
: tu ID de proyectoSUBNET_NAME
: la subred en la que quieres implementar el clúster
La creación de un clúster suele tardar entre 20 y 30 minutos.
Crear un tema de Kafka
Una vez creado el clúster de Managed Service para Apache Kafka, crea un tema.
Consola
Ve a la página Clústeres de Managed Service para Apache Kafka >.
Haz clic en el nombre del clúster.
En la página de detalles del clúster, haz clic en Crear tema.
En el cuadro Nombre del tema, escribe el nombre del tema.
Haz clic en Crear.
gcloud
Usa el comando
managed-kafka topics create
.
gcloud managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3
Haz los cambios siguientes:
TOPIC_NAME
: el nombre del tema que se va a crear
Crear una tabla de BigQuery
En este paso, creará una tabla de BigQuery con el siguiente esquema:
Nombre de la columna | Tipo de datos |
---|---|
name |
STRING |
customer_id |
INTEGER |
Si aún no tienes un conjunto de datos de BigQuery, crea uno. Para obtener más información, consulta Crear conjuntos de datos. A continuación, crea una tabla vacía:
Consola
Ve a la página BigQuery.
En el panel Explorador, expande tu proyecto y selecciona un conjunto de datos.
En la sección de información Conjunto de datos, haga clic en
Crear tabla.En la lista Crear tabla a partir de, selecciona Tabla vacía.
En el cuadro Tabla, introduce el nombre de la tabla.
En la sección Esquema, haz clic en Editar como texto.
Pega la siguiente definición de esquema:
name:STRING, customer_id:INTEGER
Haz clic en Crear tabla.
gcloud
Usa el comando bq mk
.
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
name:STRING,customer_id:INTEGER
Haz los cambios siguientes:
PROJECT_ID
: tu ID de proyectoDATASET_NAME
: el nombre del conjunto de datosTABLE_NAME
: el nombre de la tabla que se va a crear
Ejecutar la tarea de Dataflow
Después de crear el clúster de Kafka y la tabla de BigQuery, ejecuta la plantilla de Dataflow.
.Consola
Primero, obtén la dirección del servidor de arranque del clúster:
En la Google Cloud consola, ve a la página Clusters.
Haz clic en el nombre del clúster.
Haz clic en la pestaña Configuraciones.
Copia la dirección del servidor de arranque de URL de arranque.
A continuación, ejecuta la plantilla para crear la tarea de Dataflow:
Ve a la página Dataflow > Tareas.
Haz clic en Crear tarea a partir de plantilla.
En el campo Nombre del trabajo, introduce
kafka-to-bq
.En Endpoint regional, selecciona la región en la que se encuentra tu clúster de Managed Service para Apache Kafka.
Selecciona la plantilla "De Kafka a BigQuery".
Introduce los siguientes parámetros de plantilla:
- Servidor de arranque de Kafka: la dirección del servidor de arranque.
- Tema de Kafka de origen: el nombre del tema que se va a leer.
- Modo de autenticación de la fuente de Kafka:
APPLICATION_DEFAULT_CREDENTIALS
- Formato de mensaje de Kafka:
JSON
- Estrategia de nombres de tabla:
SINGLE_TABLE_NAME
- Tabla de salida de BigQuery: tabla de BigQuery con el siguiente formato:
PROJECT_ID
:DATASET_NAME
.TABLE_NAME
En Cola de mensajes fallidos, marca Escribir errores en BigQuery.
Introduce el nombre de una tabla de BigQuery para la cola de mensajes fallidos con el siguiente formato:
PROJECT_ID
:DATASET_NAME
.ERROR_TABLE_NAME
No crees esta tabla con antelación. La canalización lo crea.
Haz clic en Ejecutar trabajo.
gcloud
Usa el comando
dataflow flex-template run
.
gcloud dataflow flex-template run kafka-to-bq \ --template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \ --region LOCATION \ --parameters \ readBootstrapServerAndTopic=projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\ persistKafkaKey=false,\ writeMode=SINGLE_TABLE_NAME,\ kafkaReadOffset=earliest,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ messageFormat=JSON,\ outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME\ useBigQueryDLQ=true,\ outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME
Sustituye las siguientes variables:
LOCATION
: la región en la que se encuentra tu Managed Service para Apache KafkaPROJECT_ID
: el nombre de tu proyecto de Google Cloud PlatformCLUSTER_ID
: el del clústerTOPIC
: el nombre del tema de KafkaDATASET_NAME
: el nombre del conjunto de datosTABLE_NAME
: el nombre de la tablaERROR_TABLE_NAME
: nombre de la tabla de BigQuery de la cola de mensajes fallidos
No crees la tabla de la cola de mensajes fallidos con antelación. El flujo de procesamiento lo crea.
Enviar mensajes a Kafka
Una vez que se inicie la tarea de Dataflow, puedes enviar mensajes a Kafka y la canalización los escribirá en BigQuery.
Crea una VM en la misma subred que el clúster de Kafka e instala las herramientas de línea de comandos de Kafka. Para obtener instrucciones detalladas, consulta el artículo Configurar una máquina cliente en Publicar y consumir mensajes con la CLI.
Ejecuta el siguiente comando para escribir mensajes en el tema de Kafka:
kafka-console-producer.sh \ --topic TOPIC \ --bootstrap-server bootstrap.CLUSTER_ID.LOCATION.managedkafka.PROJECT_ID.cloud.goog:9092 \ --producer.config client.properties
Sustituye las siguientes variables:
TOPIC
: el nombre del tema de KafkaCLUSTER_ID
: el nombre del clústerLOCATION
: la región en la que se encuentra tu clústerPROJECT_ID
: el nombre de tu proyecto de Google Cloud Platform
En la petición, introduce las siguientes líneas de texto para enviar mensajes a Kafka:
{"name": "Alice", "customer_id": 1} {"name": "Bob", "customer_id": 2} {"name": "Charles", "customer_id": 3}
Usar una cola de mensajes fallidos
Mientras se ejecuta la tarea, es posible que la canalización no pueda escribir mensajes individuales en BigQuery. Entre los posibles errores se incluyen los siguientes:
- Errores de serialización, incluido JSON con formato incorrecto.
- Errores de conversión de tipos, causados por una discrepancia entre el esquema de la tabla y los datos JSON.
- Campos adicionales en los datos JSON que no están presentes en el esquema de la tabla.
Estos errores no provocan que el trabajo falle y no aparecen como errores en el registro de trabajos de Dataflow. En su lugar, la canalización usa una cola de mensajes fallidos para gestionar este tipo de errores.
Para habilitar la cola de mensajes fallidos al ejecutar la plantilla, define los siguientes parámetros de plantilla:
useBigQueryDLQ
:true
outputDeadletterTable
: nombre completo de una tabla de BigQuery. Por ejemplo,my-project:dataset1.errors
.
La canalización crea la tabla automáticamente. Si se produce un error al procesar un mensaje de Kafka, la canalización escribe una entrada de error en la tabla.
Ejemplos de mensajes de error:
Tipo de error | Datos del evento | errorMessage |
---|---|---|
Error de serialización | "Hola mundo" | No se ha podido serializar el JSON en una fila de tabla: "Hello world" |
Error de conversión de tipo | {"name":"Emily","customer_id":"abc"} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 } |
Campo desconocido | {"name":"Zoe","age":34} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 } |
Trabajar con tipos de datos de BigQuery
Internamente, el conector de E/S de Kafka convierte las cargas útiles de los mensajes JSON en objetos TableRow
de Apache Beam y traduce los valores de los campos TableRow
a tipos de BigQuery.
En la siguiente tabla se muestran las representaciones JSON de los tipos de datos de BigQuery.
Tipo de BigQuery | Representación JSON |
---|---|
ARRAY |
[1.2,3] |
BOOL |
true |
DATE |
"2022-07-01" |
DATETIME |
"2022-07-01 12:00:00.00" |
DECIMAL |
5.2E11 |
FLOAT64 |
3.142 |
GEOGRAPHY |
"POINT(1 2)" Especifica la geografía mediante texto conocido (WKT) o GeoJSON, con formato de cadena. Para obtener más información, consulta Cargar datos geoespaciales. |
INT64 |
10 |
INTERVAL |
"0-13 370 48:61:61" |
STRING |
"string_val" |
TIMESTAMP |
"2022-07-01T12:00:00.00Z" Usa el método |
Datos estructurados
Si tus mensajes JSON siguen un esquema coherente, puedes representar objetos JSON con el tipo de datos STRUCT
en BigQuery.
En el siguiente ejemplo, el campo answers
es un objeto JSON con dos subcampos, a
y b
:
{"name":"Emily","answers":{"a":"yes","b":"no"}}
La siguiente instrucción SQL crea una tabla de BigQuery con un esquema compatible:
CREATE TABLE my_dataset.kafka_events (name STRING, answers STRUCT<a STRING, b STRING>);
La tabla resultante tendrá este aspecto:
+-------+----------------------+
| name | answers |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+
y semiestructurados
Si tus mensajes JSON no siguen un esquema estricto, plantéate almacenarlos en BigQuery como un tipo de datos JSON
.
Si almacenas datos JSON como un tipo JSON
, no tienes que definir el esquema por adelantado. Después de la ingestión de datos, puedes consultar los datos mediante los operadores de acceso a campos (notación de puntos) y de acceso a arrays en GoogleSQL. Para obtener más información, consulta el artículo Trabajar con datos JSON en GoogleSQL.
Usar una función definida por el usuario para transformar los datos
En este tutorial se da por hecho que los mensajes de Kafka tienen formato JSON y que el esquema de la tabla de BigQuery coincide con los datos JSON, sin que se haya aplicado ninguna transformación a los datos.
También puedes proporcionar una función definida por el usuario (UDF) de JavaScript que transforme los datos antes de escribirlos en BigQuery. La función definida por el usuario también puede realizar un procesamiento adicional, como filtrar, eliminar información personal identificable (IPI) o enriquecer los datos con campos adicionales.
Para obtener más información, consulta el artículo sobre cómo crear funciones definidas por el usuario para plantillas de Dataflow.