Plantilla de Apache Kafka a la planilla de BigQuery

La plantilla de Apache Kafka a BigQuery es una canalización de transmisión que transfiere datos de texto de Apache Kafka, ejecuta una función definida por el usuario (UDF) y envía los registros resultantes a BigQuery. Cualquier error que ocurra en la transformación de los datos, la ejecución de la UDF o la inserción en la tabla de resultados se inserta en una tabla de errores independiente en BigQuery. Si la tabla de errores no existe antes de la ejecución, se creará.

Requisitos de la canalización

  • La tabla de salida de BigQuery debe existir.
  • El servidor del agente de Apache Kafka debe estar en ejecución y se debe poder acceder a él desde las máquinas de trabajador de Dataflow.
  • Los temas de Apache Kafka deben existir, y los mensajes deben estar codificados en un formato JSON válido.

Parámetros de la plantilla

Parámetros obligatorios

  • outputTableSpec: la ubicación de la tabla de BigQuery en la que se escribirá el resultado. El nombre debe tener el formato <project>:<dataset>.<table_name>. El esquema de la tabla debe coincidir con los objetos de entrada.

Parámetros opcionales

  • bootstrapServers: Lista del servidor de arranque de Kafka, separada por comas. (Ejemplo: localhost:9092,127.0.0.1:9093).
  • inputTopics: Temas de Kafka desde los que se lee la entrada. (Ejemplo: topic1,topic2).
  • outputDeadletterTable: la tabla de BigQuery para los mensajes con errores. Los mensajes que no llegaron a la tabla de resultados por diferentes motivos (p. ej., un esquema no coincidente, un archivo JSON con formato incorrecto) se escriben en esta tabla. Si no existe, se creará durante la ejecución de la canalización. Si no se especifica, se usa “outputTableSpec_error_records” en su lugar. (Ejemplo: your-project-id:your-dataset.your-table-name).
  • messageFormat: El formato del mensaje. Puede ser AVRO o JSON. La configuración predeterminada es JSON.
  • avroSchemaPath: La ruta de Cloud Storage al archivo de esquema de Avro. Por ejemplo, gs://MyBucket/file.avsc.
  • useStorageWriteApiAtLeastOnce: Este parámetro solo se aplica si “Usar la API de BigQuery Storage Write” está habilitada. Si se habilita, se usará la semántica de “al menos una vez” para la API de Storage Write; de lo contrario, se usará la semántica de “exactamente una vez”. La configuración predeterminada es "false".
  • readBootstrapServers: La lista del servidor de arranque de Kafka, separada por comas. (Ejemplo: localhost:9092,127.0.0.1:9093).
  • kafkaReadTopics: Temas de Kafka desde los que se leerá la entrada. (Ejemplo: topic1,topic2).
  • javascriptTextTransformGcsPath: el patrón de ruta de acceso de Cloud Storage para el código JavaScript que contiene las funciones definidas por el usuario. (Por ejemplo: gs://your-bucket/your-function.js).
  • javascriptTextTransformFunctionName : El nombre de la función a la que se llamará desde el archivo JavaScript. Usa solo letras, dígitos y guiones bajos. (Ejemplo: “transform” o “transform_udf1”).
  • javascriptTextTransformReloadIntervalMinutes: Define el intervalo que los trabajadores pueden verificar para detectar cambios en la UDF de JavaScript a fin de volver a cargar los archivos. La configuración predeterminada es 0.
  • writeDisposition: BigQuery WriteDisposition. Por ejemplo, WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. La configuración predeterminada es: WRITE_APPEND.
  • createDisposition: CreateDisposition de BigQuery. Por ejemplo, CREATE_IF_NEEDED, CREATE_NEVER. La configuración predeterminada es CREATE_IF_NEEDED.
  • useStorageWriteApi: si es verdadero, la canalización usa la API de Storage Write cuando escribe los datos en BigQuery (consulta https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api). El valor predeterminado es falso. Cuando usas la API de Storage Write en modo “exactamente una vez”, debes establecer los siguientes parámetros: “Cantidad de transmisiones para la API de BigQuery Storage Write” y “Frecuencia de activación en segundos para la API de BigQuery Storage Write”. Si habilitas el modo de al menos una vez de Dataflow o configuras el parámetro useStorageWriteApiAtLeastOnce como verdadero, no es necesario que establezcas la cantidad de transmisiones ni la frecuencia de activación.
  • numStorageWriteApiStreams: la cantidad de transmisiones define el paralelismo de la transformación de escritura de BigQueryIO y corresponde aproximadamente a la cantidad de transmisiones de la API de Storage Write que usará la canalización. Consulta https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api para conocer los valores recomendados. La configuración predeterminada es 0.
  • storageWriteApiTriggeringFrequencySec : la frecuencia de activación determinará qué tan pronto serán visibles los datos para las consultas en BigQuery. Consulta https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api para conocer los valores recomendados.

Función definida por el usuario

Para extender esta plantilla, puedes escribir una función definida por el usuario (UDF). La plantilla llama a la UDF para cada elemento de entrada. Las cargas útiles de elementos se serializan como cadenas JSON. Para obtener más información, consulta Crea funciones definidas por el usuario para plantillas de Dataflow.

Especificación de la función

La UDF tiene la siguiente especificación:

  • Entrada: el valor del registro de Kafka, serializado como una cadena JSON.
  • Resultado: Una cadena JSON que coincide con el esquema de la tabla de destino de BigQuery.

Ejecuta la plantilla

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Kafka to BigQuery template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Opcional: Para cambiar del procesamiento “exactamente una vez” al modo de transmisión al menos una vez, selecciona At Least Once.
  8. Haz clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • BIGQUERY_TABLE: Es el nombre de la tabla de BigQuery.
  • KAFKA_TOPICS: Es la lista de temas de Apache Kkafa. Si se proporcionan varios temas, sigue las instrucciones para escapar las comas.
  • PATH_TO_JAVASCRIPT_UDF_FILE: El URI de Cloud Storage de .js archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.

    Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

  • KAFKA_SERVER_ADDRESSES: La lista de direcciones IP del servidor del agente de Apache Kafka. Cada dirección IP debe tener el número de puerto desde el que se puede acceder al servidor. Por ejemplo: 35.70.252.199:9092. Si se proporcionan varias direcciones, sigue las instrucciones para escapar las comas.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery",
   }
}
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • BIGQUERY_TABLE: Es el nombre de la tabla de BigQuery.
  • KAFKA_TOPICS: Es la lista de temas de Apache Kkafa. Si se proporcionan varios temas, sigue las instrucciones para escapar las comas.
  • PATH_TO_JAVASCRIPT_UDF_FILE: El URI de Cloud Storage de .js archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.

    Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

  • KAFKA_SERVER_ADDRESSES: La lista de direcciones IP del servidor del agente de Apache Kafka. Cada dirección IP debe tener el número de puerto desde el que se puede acceder al servidor. Por ejemplo: 35.70.252.199:9092. Si se proporcionan varias direcciones, sigue las instrucciones para escapar las comas.

Para obtener más información, consulta Escribe datos de Kafka en BigQuery con Dataflow.

¿Qué sigue?