Escribir datos de Kafka en BigQuery con Dataflow

En este documento, se proporciona una orientación general sobre la creación y la implementación de una canalización de Dataflow que se transmite desde Apache Kafka a BigQuery.

Apache Kafka es una plataforma de código abierto para eventos de transmisión. Kafka se usa en general en arquitecturas distribuidas para permitir la comunicación entre componentes con acoplamiento bajo. Puedes usar Dataflow para leer eventos de Kafka, procesarlos y escribir los resultados en una tabla de BigQuery para su análisis posterior.

Lee eventos de Kafka en BigQuery

Google proporciona una plantilla de Dataflow que configura una canalización de Kafka a BigQuery. La plantilla usa el conector de BigQueryIO que se proporciona en el SDK de Apache Beam.

Para usar esta plantilla, sigue estos pasos:

  1. Implementa Kafka, ya sea en Google Cloud o en otro lugar.
  2. Configura las Herramientas de redes
  3. Configura los permisos de administración de identidades y accesos (IAM).
  4. Escribe una función para transformar los datos del evento.
  5. Crea la tabla de salida de BigQuery.
  6. Implementa la plantilla de Dataflow.

Implementa Kafka

Dentro de Google Cloud, puedes implementar un clúster de Kafka en instancias de máquina virtual (VM) de Compute Engine o usar un servicio administrado de Kafka de terceros. Para obtener más información sobre las opciones de implementación en Google Cloud, consulta ¿Qué es Apache Kafka? Puedes encontrar soluciones de Kafka de terceros en Google Cloud Marketplace.

Como alternativa, es posible que tengas un clúster de Kafka existente que resida fuera de Google Cloud. Por ejemplo, puedes tener una carga de trabajo existente que se implementa de forma local o en otra nube pública.

Configura las Herramientas de redes

De forma predeterminada, Dataflow inicia instancias dentro de la red de nube privada virtual (VPC) predeterminada. Según la configuración de Kafka, es posible que debas configurar una red y una subred diferentes para Dataflow. Para obtener más información, consulta Especifica una red y una subred en la documentación de Dataflow. Cuando configures la red, crea reglas de firewall que permitan que las máquinas de trabajador de Dataflow lleguen a los agentes de Kafka.

Si usas los Controles del servicio de VPC, coloca el clúster de Kafka dentro del perímetro de los Controles del servicio de VPC o extiende los perímetros a la VPN autorizada o a Cloud Interconnect.

Conéctate a un clúster externo

Si tu clúster de Kafka se implementa fuera de Google Cloud, debes crear una conexión de red entre Dataflow y el clúster de Kafka. Existen varias opciones de herramientas de redes con diferentes compensaciones:

La interconexión dedicada es la mejor opción para un rendimiento y una confiabilidad predecibles, pero puede tardar más en configurarse porque los terceros deben aprovisionar los circuitos nuevos. Con una topología según la IP pública, puedes comenzar con rapidez porque hay poco trabajo de herramientas de redes para hacer.

En las siguientes dos secciones, se describen estas opciones con más detalle.

Espacio de direcciones de RFC 1918 compartido

La interconexión dedicada y la VPN de IPsec te brindan acceso directo a las direcciones IP RFC 1918 en la nube privada virtual (VPC), lo que puede simplificar la configuración de Kafka. Si usas una topología basada en VPN, considera configurar una VPN de alta capacidad de procesamiento.

De forma predeterminada, Dataflow inicia instancias en la red de VPC predeterminada. En una topología de red privada con rutas definidas de forma explícita en Cloud Router que conectan subredes en Google Cloud a ese clúster de Kafka, necesitas más control sobre dónde ubicar las instancias de Dataflow. Puedes usar Dataflow para configurar los parámetros de ejecución network y subnetwork.

Asegúrate de que la subred correspondiente tenga suficientes direcciones IP disponibles para que Dataflow inicie instancias mientras intenta escalar horizontalmente. Además, cuando crees una red independiente para iniciar las instancias de Dataflow, asegúrate de tener una regla de firewall que habilite el tráfico de TCP entre todas las máquinas virtuales del proyecto. La red predeterminada ya tiene configurada esta regla de firewall.

Espacio de direcciones IP públicas

Esta arquitectura usa la seguridad de la capa de transporte (TLS) para proteger el tráfico entre clientes externos y Kafka, y usa texto simple en la comunicación entre agentes. Cuando el objeto de escucha de Kafka se vincula a una interfaz de red que se usa para la comunicación interna y externa, la configuración del objeto de escucha es sencilla. Sin embargo, en muchas situaciones, las direcciones anunciadas de forma externa de los agentes de Kafka en el clúster difieren de las interfaces de red internas que usa Kafka. En tales casos, puedes usar la propiedad advertised.listeners:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

Los clientes externos se conectan mediante el puerto 9093 a través de un canal SSL y los clientes internos se conectan mediante el puerto 9092 a través de un canal de texto simple. Cuando especifiques una dirección en advertised.listeners, usa nombres de DNS (kafkabroker-n.mydomain.com, en esta muestra) que se resuelven en la misma instancia para el tráfico interno y externo. Es posible que el uso de direcciones IP públicas no funcione porque las direcciones no pueden resolverse para el tráfico interno.

Configura los permisos de IAM

Los trabajos de Dataflow usan dos cuentas de servicio de IAM:

  • El servicio de Dataflow usa una cuenta de servicio de Dataflow para manipular los recursos de Google Cloud, como la creación de VMs.
  • Las VMs de trabajador de Dataflow usan una cuenta de servicio de trabajador para acceder a los archivos y a otros recursos de la canalización. Esta cuenta de servicio necesita acceso de escritura a la tabla de salida de BigQuery. También necesita acceder a cualquier otro recurso al que haga referencia el trabajo de canalización.

Asegúrate de que estas dos cuentas de servicio tengan los roles adecuados. Para obtener más información, consulta Seguridad y permisos de Dataflow.

Transforma los datos para BigQuery

La plantilla de Kafka a BigQuery crea una canalización que lee eventos de uno o más temas de Kafka, y los escribe en una tabla de BigQuery. De forma opcional, puedes proporcionar una función definida por el usuario (UDF) de JavaScript que transforme los datos del evento antes de que se escriban en BigQuery.

El resultado de la canalización debe ser un formato con formato JSON que coincida con el esquema de la tabla de salida. Si los datos del evento de Kafka ya están en formato JSON, puedes crear una tabla de BigQuery con un esquema coincidente y pasar los eventos directamente a BigQuery. De lo contrario, crea una UDF que tome los datos del evento como entrada y muestre los datos JSON que coincidan con tu tabla de BigQuery.

Por ejemplo, supongamos que los datos del evento contienen dos campos:

  • name (string)
  • customer_id (entero)

El resultado de la canalización de Dataflow podría tener el siguiente aspecto:

{ "name": "Alice", "customer_id": 1234 }

Si suponemos que los datos del evento aún no están en formato JSON, deberías escribir una UDF que transforme los datos, de la siguiente manera:

// UDF
function process(eventData) {
  var name;
  var customer_id;

  // TODO Parse the event data to extract the name and customer_id fields.

  // Return a JSON payload.
  return JSON.stringify({ name: name, customer_id: customer_id });
}

La UDF puede realizar un procesamiento adicional en los datos de eventos, como filtrar eventos, quitar información de identificación personal (PII) o enriquecer los datos con campos adicionales.

Si deseas obtener más información sobre cómo escribir una UDF para la plantilla, consulta Extiende tu plantilla de Dataflow con UDF. Sube el archivo JavaScript a Cloud Storage.

Crea la tabla de salida de BigQuery

Crea la tabla de salida de BigQuery antes de ejecutar la plantilla. El esquema de la tabla debe ser compatible con el resultado de JSON de la canalización. Para cada propiedad en la carga útil de JSON, la canalización escribe el valor en la columna de la tabla de BigQuery del mismo nombre. Cualquier propiedad que falta en el JSON se interpreta como valores NULL.

Según el ejemplo anterior, la tabla de BigQuery tendría las siguientes columnas:

Nombre de la columna Tipo de dato
name STRING
customer_id INTEGER

Puedes usar la instrucción de SQL CREATE TABLE para crear la tabla:

CREATE TABLE my_dataset.kafka_events (name STRING, customer_id INTEGER);

Como alternativa, puedes especificar el esquema de la tabla mediante un archivo de definición JSON. Si deseas obtener más información, consulta Especifica un esquema en la documentación de BigQuery.

Ejecuta el trabajo de Dataflow:

Después de crear la tabla de BigQuery, ejecuta la plantilla de Dataflow.

Consola

Para crear el trabajo de Dataflow con la consola de Google Cloud, sigue estos pasos:

  1. Ve a la página de Dataflow en la consola de Google Cloud.
  2. Haz clic en Crear trabajo a partir de una plantilla.
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. En Extremo regional, selecciona una región.
  5. Selecciona la plantilla “Kafka a BigQuery”.
  6. En Parámetros obligatorios, ingresa el nombre de la tabla de resultados de BigQuery. La tabla ya debe existir y tener un esquema válido.
  7. Haz clic en Mostrar parámetros opcionales y, luego, ingresa valores para al menos los siguientes parámetros:

    • El tema de Kafka del que se leerá la entrada.
    • La lista de servidores de arranque de Kafka, separados por comas.
    • El correo electrónico de una cuenta de servicio.

    Ingresa parámetros adicionales según sea necesario. En particular, es posible que debas especificar lo siguiente:

    • Herramientas de redes: Para usar una red de VPC que no sea la predeterminada, especifica la red y la subred.
    • UDF: Para usar una UDF de JavaScript, especifica la ubicación de Cloud Storage de la secuencia de comandos y el nombre de la función de JavaScript que se invocará.

gcloud

Para crear el trabajo de Dataflow mediante Google Cloud CLI, ejecuta el siguiente comando:

gcloud dataflow flex-template run JOB_NAME \
--template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
--region LOCATION \
--parameters inputTopics=KAFKA_TOPICS \
--parameters bootstrapServers=BOOTSTRAP_SERVERS \
--parameters outputTableSpec=OUTPUT_TABLE \
--parameters serviceAccount=IAM_SERVICE_ACCOUNT \
--parameters javascriptTextTransformGcsPath=UDF_SCRIPT_PATH \
--parameters javascriptTextTransformFunctionName=UDF_FUNCTION_NAME \
--network VPC_NETWORK_NAME \
--subnetwork SUBNET_NAME

Reemplaza las siguientes variables:

  • JOB_NAME. Un nombre del trabajo que elijas
  • LOCATION. La región en la que se ejecuta el trabajo. Para obtener más información acerca de las regiones y ubicaciones, consulta Ubicaciones de Dataflow.
  • KAFKA_TOPICS. Una lista separada por comas de los temas de Kafka que se deben leer.
  • BOOTSTRAP_SERVERS. Una lista separada por comas de servidores de arranque de Kafka. Ejemplo: 127:9092,127.0.0.1:9093.
  • OUTPUT_TABLE. La tabla de salida de BigQuery, especificada como PROJECT_ID:DATASET_NAME.TABLE_NAME. Ejemplo: my_project:dataset1.table1.
  • IAM_SERVICE_ACCOUNT. Es opcional. La dirección de correo electrónico de la cuenta de servicio que se usará para ejecutar el trabajo.
  • UDF_SCRIPT_PATH. Es opcional. Es la ruta de acceso de Cloud Storage a un archivo JavaScript que contiene una UDF. Ejemplo: gs://your-bucket/your-function.js.
  • UDF_FUNCTION_NAME. Es opcional. El nombre de la función de JavaScript que se llamará como la UDF.
  • VPC_NETWORK_NAME. Es opcional. La red a la que se asignarán los trabajadores.
  • SUBNET_NAME. Es opcional. La subred a la que se asignarán los trabajadores.

Tipos de datos

En esta sección, se describe cómo manejar varios tipos de datos en el esquema de la tabla de BigQuery.

De forma interna, los mensajes JSON se convierten en objetos TableRow y los valores de campo TableRow se traducen a tipos de BigQuery.

Tipos escalares

En el siguiente ejemplo, se crea una tabla de BigQuery con diferentes tipos de datos escalares, que incluyen los tipos de string, numérico, booleano, de fecha y hora, de intervalo y de geografía:

CREATE TABLE  my_dataset.kafka_events (
    string_col STRING,
    integer_col INT64,
    float_col FLOAT64,
    decimal_col DECIMAL,
    bool_col BOOL,
    date_col DATE,
    dt_col DATETIME,
    ts_col TIMESTAMP,
    interval_col INTERVAL,
    geo_col GEOGRAPHY
);

Esta es una carga útil de JSON con campos compatibles:

{
  "string_col": "string_val",
  "integer_col": 10,
  "float_col": 3.142,
  "decimal_col": 5.2E11,
  "bool_col": true,
  "date_col": "2022-07-01",
  "dt_col": "2022-07-01 12:00:00.00",
  "ts_col": "2022-07-01T12:00:00.00Z",
  "interval_col": "0-13 370 48:61:61",
  "geo_col": "POINT(1 2)"
}

Notas:

  • Para una columna TIMESTAMP, puedes usar el método Date.toJSON de JavaScript para dar formato al valor.
  • Para la columna GEOGRAPHY, puedes especificar la geografía mediante el texto conocido (WKT) o GeoJSON, con formato de string. Para obtener más información, consulta Carga datos geoespaciales.

Para obtener más información sobre los tipos de datos de BigQuery, consulta Tipos de datos.

Arrays

Puedes almacenar un array en BigQuery con el tipo de datos ARRAY. En el siguiente ejemplo, la carga útil de JSON contiene una propiedad llamada scores cuyo valor es un arreglo JSON:

{"name":"Emily","scores":[10,7,10,9]}

La siguiente instrucción de SQL CREATE TABLE crea una tabla de BigQuery con un esquema compatible:

CREATE TABLE my_dataset.kafka_events (name STRING, scores ARRAY<INTEGER>);

La tabla resultante se ve de la siguiente manera:

+-------+-------------+
| name  |   scores    |
+-------+-------------+
| Emily | [10,7,10,9] |
+-------+-------------+

Estructuras

El tipo de datos STRUCT en BigQuery contiene una lista ordenada de campos con nombre. Puedes usar un STRUCT para contener objetos JSON que siguen un esquema coherente.

En el siguiente ejemplo, la carga útil de JSON contiene una propiedad llamada val cuyo valor es un objeto JSON:

{"name":"Emily","val":{"a":"yes","b":"no"}}

La siguiente instrucción de SQL CREATE TABLE crea una tabla de BigQuery con un esquema compatible:

CREATE TABLE my_dataset.kafka_events (name STRING, val STRUCT<a STRING, b STRING>);

La tabla resultante se ve de la siguiente manera:

+-------+----------------------+
| name  |         val          |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+

Datos de eventos semiestructurados

Si los datos del evento de Kafka no siguen un esquema estricto, considera almacenarlos en BigQuery como un tipo de datos JSON (vista previa). Cuando almacenas datos JSON como un tipo de datos JSON, no es necesario definir el esquema del evento por adelantado. Después de la transferencia de datos, puedes consultar la tabla de resultados con el acceso de campo (notación de puntos) y los operadores de acceso al arreglo.

En primer lugar, crea una tabla con una columna JSON

-- Create the BigQuery table
CREATE TABLE my_dataset.kafka_events (event_data JSON);

Luego, define una UDF de JavaScript que una la carga útil del evento dentro de un objeto JSON:

// UDF
function process(eventData) {
  var json;

  // TODO Convert the event data to JSON.

  return JSON.stringify({ "event_data": json });
}

Después de escribir los datos en BigQuery, puedes consultar los campos individuales mediante el operador de acceso a campos. Por ejemplo, mediante la siguiente consulta, se muestra el valor del campo name para cada registro:

SELECT event_data.name FROM my_dataset1.kafka_events;

Para obtener más información sobre el uso de JSON en BigQuery, consulta Trabaja con datos JSON en SQL estándar de Google.

Errores y registro

Es posible que experimentes errores cuando ejecutas la canalización o errores mientras manejas eventos individuales de Kafka.

Para obtener más información sobre cómo manejar los errores de canalización, consulta Solución de problemas y depuración de canalizaciones.

Si el trabajo se ejecuta de forma correcta, pero se produce un error cuando se procesa un evento individual de Kafka, el trabajo de canalización escribe un registro de error en una tabla en BigQuery. El trabajo en sí no falla y el error a nivel de evento no aparece como un error en el registro de trabajos de Dataflow.

El trabajo de canalización crea la tabla de forma automática para contener los registros de errores. De forma predeterminada, el nombre de la tabla es “output_table_error_records”, en el que output_table es el nombre de la tabla de salida. Por ejemplo, si la tabla de salida se llama kafka_events, la tabla de error se llama kafka_events_error_records. Puedes especificar un nombre diferente si configuras el parámetro de plantilla outputDeadletterTable:

outputDeadletterTable=my_project:dataset1.errors_table

Los errores posibles incluyen los siguientes:

  • Errores de serialización, incluidos JSON que tienen un formato incorrecto.
  • Errores de conversión de tipo, causados por una discrepancia en el esquema de la tabla y los datos JSON.
  • Campos adicionales en los datos JSON que no están presentes en el esquema de la tabla.

Ejemplos de mensajes de error:

Tipo de error Datos de eventos errorMessage
Error de serialización “Hello world” No se pudo serializar json en la fila de la tabla: “Hello world”
Tipo de error de conversión {"name":"Emily","customer_id":"abc"} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 }
Campo desconocido {"name":"Zoe","age":34} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 }

Próximos pasos