En este documento, se proporciona una orientación general sobre la creación y la implementación de una canalización de Dataflow que se transmite desde Apache Kafka a BigQuery.
Apache Kafka es una plataforma de código abierto para eventos de transmisión. Kafka se suele usar en arquitecturas distribuidas para permitir la comunicación entre componentes con acoplamiento bajo. Puedes usar Dataflow para leer eventos de Kafka, procesarlos y escribir los resultados en una tabla de BigQuery para un análisis más detallado.
Google proporciona una plantilla de Dataflow que configura una canalización de Kafka a BigQuery. La plantilla usa el conector de BigQueryIO que se proporciona en el SDK de Apache Beam.
Para usar esta plantilla, sigue estos pasos:
- Implementa Kafka, ya sea en Google Cloud o en otro lugar.
- Configura las Herramientas de redes
- Configura los permisos de Identity and Access Management (IAM)
- Escribe una función para transformar los datos del evento.
- Crea la tabla de salida de BigQuery.
- Implementa la plantilla de Dataflow.
Implementa Kafka
En Google Cloud, puedes implementar un clúster de Kafka en instancias de máquina virtual (VM) de Compute Engine o usar un servicio de Kafka administrado por terceros. Para obtener más información sobre las opciones de implementación en Google Cloud, consulta ¿Qué es Apache Kafka? Puedes encontrar soluciones de Kafka de terceros en Google Cloud Marketplace.
Como alternativa, es posible que tengas un clúster de Kafka existente que resida fuera de Google Cloud. Por ejemplo, es posible que tengas una carga de trabajo existente que se implementa de forma local o en otra nube pública.
Configura las Herramientas de redes
De forma predeterminada, Dataflow inicia instancias dentro de la red de nube privada virtual (VPC) predeterminada. Según tu configuración de Kafka, es posible que debas configurar una red y una subred diferentes para Dataflow. Para obtener más información, consulta Especifica una red y una subred. Cuando configures la red, crea reglas de firewall que permitan que las máquinas de trabajador de Dataflow lleguen a los agentes de Kafka.
Si usas los Controles del servicio de VPC, coloca el clúster de Kafka dentro del perímetro de los Controles del servicio de VPC o extiende los perímetros a la VPN autorizada o a Cloud Interconnect.
Si tu clúster de Kafka se implementa fuera de Google Cloud, debes crear una conexión de red entre Dataflow y el clúster de Kafka. Existen varias opciones de herramientas de redes con diferentes compensaciones:
- Conéctate con un espacio de direcciones RFC 1918 compartido mediante una de las siguientes opciones:
- Accede al clúster de Kafka alojado de forma externa a través de direcciones IP públicas con una de las siguientes opciones:
- Internet pública
- Intercambio de tráfico directo
- Intercambio de tráfico con proveedores
La interconexión dedicada es la mejor opción para un rendimiento y una confiabilidad predecibles, pero puede tardar más en configurarse porque los terceros deben aprovisionar los circuitos nuevos. Con una topología según la IP pública, puedes comenzar con rapidez porque hay poco trabajo de herramientas de redes para hacer.
En las siguientes dos secciones, se describen estas opciones con más detalle.
Espacio de direcciones de RFC 1918 compartido
La interconexión dedicada y la VPN de IPsec te brindan acceso directo a las direcciones IP RFC 1918 en la nube privada virtual (VPC), lo que puede simplificar la configuración de Kafka. Si usas una topología basada en VPN, considera configurar una VPN de alta capacidad de procesamiento.
De forma predeterminada, Dataflow inicia instancias en la red de VPC predeterminada. En una topología de red privada con rutas definidas de forma explícita en Cloud Router que conectan subredes en Google Cloud a ese clúster de Kafka, necesitas más control sobre dónde ubicar las instancias de Dataflow. Puedes usar Dataflow para configurar los parámetros de ejecución network
y subnetwork
.
Asegúrate de que la subred correspondiente tenga suficientes direcciones IP disponibles para que Dataflow inicie instancias mientras intenta escalar horizontalmente. Además, cuando crees una red independiente para iniciar las instancias de Dataflow, asegúrate de tener una regla de firewall que habilite el tráfico de TCP entre todas las máquinas virtuales del proyecto. La red predeterminada ya tiene configurada esta regla de firewall.
Espacio de direcciones IP públicas
Esta arquitectura usa la seguridad de la capa de transporte (TLS) para proteger el tráfico entre clientes externos y Kafka, y usa texto simple en la comunicación entre agentes. Cuando el objeto de escucha de Kafka se vincula a una interfaz de red que se usa para la comunicación interna y externa, la configuración del objeto de escucha es sencilla. Sin embargo, en muchas situaciones, las direcciones anunciadas de forma externa de los agentes de Kafka en el clúster difieren de las interfaces de red internas que usa Kafka. En tales casos, puedes usar la propiedad advertised.listeners
:
# Configure protocol map listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
Los clientes externos se conectan mediante el puerto 9093 a través de un canal SSL y los clientes internos se conectan mediante el puerto 9092 a través de un canal de texto simple. Cuando especifiques una dirección en advertised.listeners
, usa nombres de DNS (kafkabroker-n.mydomain.com
, en esta muestra) que se resuelven en la misma instancia para el tráfico interno y externo. Es posible que el uso de direcciones IP públicas no funcione porque las direcciones no pueden resolverse para el tráfico interno.
Configura los permisos de IAM
Los trabajos de Dataflow usan dos cuentas de servicio de IAM:
- El servicio de Dataflow usa una cuenta de servicio de Dataflow para manipular los recursos de Google Cloud, como la creación de VMs.
- Las VMs de trabajador de Dataflow usan una cuenta de servicio de trabajador para acceder a los archivos y a otros recursos de la canalización. Esta cuenta de servicio necesita acceso de escritura a la tabla de salida de BigQuery. También necesita acceso a cualquier otro recurso al que haga referencia el trabajo de canalización.
Asegúrate de que estas dos cuentas de servicio tengan los roles adecuados. Para obtener más información, consulta Seguridad y permisos de Dataflow.
Transforma los datos para BigQuery
La plantilla de Kafka a BigQuery crea una canalización que lee eventos de uno o más temas de Kafka y los escribe en una tabla de BigQuery. De forma opcional, puedes proporcionar una función definida por el usuario (UDF) de JavaScript que transforme los datos del evento antes de que se escriban en BigQuery.
El resultado de la canalización debe ser datos con formato JSON que coincidan con el esquema de la tabla de salida. Si los datos de eventos de Kafka ya están en formato JSON, puedes crear una tabla de BigQuery con un esquema coincidente y pasar los eventos directamente a BigQuery. De lo contrario, crea una UDF que tome los datos del evento como entrada y muestre datos JSON que coincidan con tu tabla de BigQuery.
Por ejemplo, supongamos que los datos del evento contienen dos campos:
name
(string)customer_id
(entero)
El resultado de la canalización de Dataflow podría verse de la siguiente manera:
{ "name": "Alice", "customer_id": 1234 }
Suponiendo que los datos de eventos aún no están en formato JSON, debes escribir una UDF que transforme los datos de la siguiente manera:
// UDF
function process(eventData) {
var name;
var customer_id;
// TODO Parse the event data to extract the name and customer_id fields.
// Return a JSON payload.
return JSON.stringify({ name: name, customer_id: customer_id });
}
La UDF puede realizar un procesamiento adicional en los datos del evento, como filtrar eventos, quitar información de identificación personal (PII) o enriquecer los datos con campos adicionales.
Para obtener más información sobre cómo escribir una UDF para la plantilla, consulta Extiende tu plantilla de Dataflow con UDF. Sube el archivo JavaScript a Cloud Storage.
Crea la tabla de salida de BigQuery
Crea la tabla de salida de BigQuery antes de ejecutar la plantilla. El esquema de la tabla debe ser compatible con el resultado de JSON de la canalización. Para cada propiedad en la carga útil JSON, la canalización escribe el valor en la columna de la tabla de BigQuery del mismo nombre. Las propiedades que falten en el JSON se interpretarán como valores NULL.
En el ejemplo anterior, la tabla de BigQuery tendría las siguientes columnas:
Nombre de la columna | Tipo de datos |
---|---|
name |
STRING |
customer_id |
INTEGER |
Puedes usar la instrucción de SQL CREATE TABLE
para crear la tabla:
CREATE TABLE my_dataset.kafka_events (name STRING, customer_id INTEGER);
De manera alternativa, puedes especificar el esquema de tabla mediante un archivo de definición JSON. Para obtener más información, consulta Especifica un esquema en la documentación de BigQuery.
Ejecuta el trabajo de Dataflow:
Después de crear la tabla de BigQuery, ejecuta la plantilla de Dataflow.
Console
Para crear el trabajo de Dataflow con la consola de Google Cloud, sigue estos pasos:
- Ve a la página de Dataflow en la consola de Google Cloud.
- Haz clic en Crear trabajo a partir de una plantilla.
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- En Extremo regional, selecciona una región.
- Selecciona la plantilla “Kafka a BigQuery”.
- En Parámetros obligatorios, ingresa el nombre de la tabla de salida de BigQuery. La tabla ya debe existir y tener un esquema válido.
Haz clic en Mostrar parámetros opcionales y, luego, ingresa valores para, al menos, los siguientes parámetros:
- El tema de Kafka desde el que se lee la entrada.
- Es la lista de servidores de arranque de Kafka, separados por comas.
- Un correo electrónico de cuenta de servicio.
Ingresa parámetros adicionales según sea necesario. En particular, es posible que debas especificar lo siguiente:
- Herramientas de redes: Para usar una red de VPC que no sea la predeterminada, especifica la red y la subred.
- UDF: Para usar una UDF de JavaScript, especifica la ubicación de Cloud Storage de la secuencia de comandos y el nombre de la función de JavaScript que se invocará.
gcloud
Para crear el trabajo de Dataflow con Google Cloud CLI, ejecuta el siguiente comando:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \ --region LOCATION \ --parameters inputTopics=KAFKA_TOPICS \ --parameters bootstrapServers=BOOTSTRAP_SERVERS \ --parameters outputTableSpec=OUTPUT_TABLE \ --parameters serviceAccount=IAM_SERVICE_ACCOUNT \ --parameters javascriptTextTransformGcsPath=UDF_SCRIPT_PATH \ --parameters javascriptTextTransformFunctionName=UDF_FUNCTION_NAME \ --network VPC_NETWORK_NAME \ --subnetwork SUBNET_NAME
Reemplaza las siguientes variables:
- JOB_NAME. Un nombre del trabajo que elijas
- LOCATION. La región en la que se ejecuta el trabajo. Para obtener más información acerca de las regiones y ubicaciones, consulta Ubicaciones de Dataflow.
- KAFKA_TOPICS. Una lista separada por comas de los temas de Kafka que se deben leer.
- BOOTSTRAP_SERVERS. Una lista separada por comas de servidores de arranque de Kafka. Ejemplo:
127:9092,127.0.0.1:9093
. - OUTPUT_TABLE. La tabla de salida de BigQuery, especificada como PROJECT_ID:DATASET_NAME.TABLE_NAME. Ejemplo:
my_project:dataset1.table1
. - IAM_SERVICE_ACCOUNT. Es opcional. La dirección de correo electrónico de la cuenta de servicio que se usará para ejecutar el trabajo.
- UDF_SCRIPT_PATH. Es opcional. La ruta de Cloud Storage a un archivo JavaScript que contiene una UDF. Ejemplo:
gs://your-bucket/your-function.js
. - UDF_FUNCTION_NAME. Es opcional. El nombre de la función de JavaScript que se llamará como la UDF.
- VPC_NETWORK_NAME. Es opcional. La red a la que se asignarán los trabajadores.
- SUBNET_NAME. Es opcional. La subred a la que se asignarán los trabajadores.
Tipos de datos
En esta sección, se describe cómo controlar varios tipos de datos en el esquema de la tabla de BigQuery.
De forma interna, los mensajes JSON se convierten en objetos TableRow
y los valores de los campos TableRow
se traducen a tipos de BigQuery.
Tipos escalares
En el siguiente ejemplo, se crea una tabla de BigQuery con diferentes tipos de datos escalares, incluidos tipos de cadena, numérico, booleano, fecha y hora, intervalo y ubicación geográfica:
CREATE TABLE my_dataset.kafka_events (
string_col STRING,
integer_col INT64,
float_col FLOAT64,
decimal_col DECIMAL,
bool_col BOOL,
date_col DATE,
dt_col DATETIME,
ts_col TIMESTAMP,
interval_col INTERVAL,
geo_col GEOGRAPHY
);
A continuación, se muestra una carga útil de JSON con campos compatibles:
{
"string_col": "string_val",
"integer_col": 10,
"float_col": 3.142,
"decimal_col": 5.2E11,
"bool_col": true,
"date_col": "2022-07-01",
"dt_col": "2022-07-01 12:00:00.00",
"ts_col": "2022-07-01T12:00:00.00Z",
"interval_col": "0-13 370 48:61:61",
"geo_col": "POINT(1 2)"
}
Notas:
- Para una columna
TIMESTAMP
, puedes usar el métodoDate.toJSON
de JavaScript para dar formato al valor. - Para la columna
GEOGRAPHY
, puedes especificar la ubicación geográfica mediante un texto conocido (WKT) o GeoJSON, con formato como una string. Para obtener más información, consulta Carga datos geoespaciales.
Para obtener más información sobre los tipos de datos en BigQuery, consulta Tipos de datos.
Arrays
Puedes almacenar un array en BigQuery con el tipo de datos ARRAY
. En el siguiente ejemplo, la carga útil de JSON contiene una propiedad llamada scores
cuyo valor es un array JSON:
{"name":"Emily","scores":[10,7,10,9]}
La siguiente instrucción de SQL CREATE TABLE
crea una tabla de BigQuery con un esquema compatible:
CREATE TABLE my_dataset.kafka_events (name STRING, scores ARRAY<INTEGER>);
La tabla resultante se ve de la siguiente manera:
+-------+-------------+
| name | scores |
+-------+-------------+
| Emily | [10,7,10,9] |
+-------+-------------+
Estructuras
El tipo de datos STRUCT
en BigQuery contiene una lista ordenada de campos con nombre. Puedes usar un STRUCT
para conservar objetos JSON que sigan un esquema coherente.
En el siguiente ejemplo, la carga útil de JSON contiene una propiedad llamada val
cuyo valor es un objeto JSON:
{"name":"Emily","val":{"a":"yes","b":"no"}}
La siguiente instrucción de SQL CREATE TABLE
crea una tabla de BigQuery con un esquema compatible:
CREATE TABLE my_dataset.kafka_events (name STRING, val STRUCT<a STRING, b STRING>);
La tabla resultante se ve de la siguiente manera:
+-------+----------------------+
| name | val |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+
Datos de eventos semiestructurados
Si los datos de eventos de Kafka no siguen un esquema estricto, considera almacenarlos en BigQuery como un tipo de datos JSON
(versión preliminar). Si almacenas datos JSON como un tipo de datos JSON
, no necesitas definir el esquema del evento por adelantado. Después de la transferencia de datos, puedes consultar la tabla de salida mediante los operadores de acceso a campos (notación de puntos) y arrays.
En primer lugar, crea una tabla con una columna JSON
-- Create the BigQuery table
CREATE TABLE my_dataset.kafka_events (event_data JSON);
Luego, define una UDF de JavaScript que una la carga útil del evento dentro de un objeto JSON:
// UDF
function process(eventData) {
var json;
// TODO Convert the event data to JSON.
return JSON.stringify({ "event_data": json });
}
Una vez que los datos se escriben en BigQuery, puedes consultar los campos individuales con el operador de acceso a campos. Por ejemplo, la siguiente consulta muestra el valor del campo name
para cada registro:
SELECT event_data.name FROM my_dataset1.kafka_events;
Para obtener más información sobre el uso de JSON en BigQuery, consulta Trabaja con datos JSON en SQL estándar de Google.
Errores y registros
Es posible que experimentes errores cuando se ejecuta la canalización o se controlan eventos individuales de Kafka.
Para obtener más información sobre cómo manejar errores de canalización, consulta Solución de problemas y depuración de canalizaciones.
Si el trabajo se ejecuta correctamente, pero se produce un error cuando se procesa un evento individual de Kafka, el trabajo de canalización escribe un registro de error en una tabla de BigQuery. El trabajo en sí no falla, y el error a nivel del evento no aparece como un error en el registro del trabajo de Dataflow.
El trabajo de canalización crea de forma automática la tabla para contener los registros de errores. De forma predeterminada, el nombre de la tabla es "output_table_registros_de_error", en la que output_table es el nombre de la tabla de salida. Por ejemplo, si la tabla de resultados se llama kafka_events
, la tabla de errores se llama kafka_events_error_records
.
Puedes especificar un nombre diferente si estableces el parámetro de plantilla outputDeadletterTable
:
outputDeadletterTable=my_project:dataset1.errors_table
Los errores posibles incluyen los siguientes:
- Errores de serialización, incluido JSON con formato incorrecto.
- Errores de conversión de tipo, causados por una discrepancia en el esquema de la tabla y los datos JSON
- Campos adicionales en los datos JSON que no están presentes en el esquema de la tabla
Ejemplos de mensajes de error:
Tipo de error | Datos de eventos | errorMessage |
---|---|---|
Error de serialización | "Hello world" | No se pudo serializar JSON a la fila de la tabla: "Hello world" |
Error de conversión de tipos | {"name":"Emily","customer_id":"abc"} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 } |
Campo desconocido | {"name":"Zoe","age":34} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 } |
Próximos pasos
- Más información sobre las plantillas de Dataflow.
- Comienza a usar BigQuery