Plantilla de Apache Kafka a BigQuery

La plantilla de Apache Kafka a BigQuery es una canalización de streaming que ingiere datos de texto de clústeres del servicio gestionado de Google Cloud para Apache Kafka y, a continuación, envía los registros resultantes a tablas de BigQuery. Los errores que se produzcan al insertar datos en la tabla de salida se insertarán en una tabla de errores independiente en BigQuery.

También puedes usar la plantilla de Apache Kafka a BigQuery con Kafka autogestionado o externo.

Requisitos del flujo de procesamiento

  • El servidor de broker de Apache Kafka debe estar en funcionamiento y se debe poder acceder a él desde las máquinas de trabajador de Dataflow.
  • Los temas de Apache Kafka deben existir.
  • Debes habilitar las APIs Dataflow, BigQuery y Cloud Storage. Si se requiere autenticación, también debes habilitar la API Secret Manager.
  • Crea un conjunto de datos y una tabla de BigQuery con el esquema adecuado para tu tema de entrada de Kafka. Si usas varios esquemas en el mismo tema y quieres escribir en varias tablas, no es necesario que crees la tabla antes de configurar la canalización.
  • Cuando se habilita la cola de mensajes fallidos (mensajes no procesados) de la plantilla, crea una tabla vacía que no tenga un esquema para la cola de mensajes fallidos.
  • Si te conectas a un clúster de Managed Service para Apache Kafka, la canalización también debe cumplir los requisitos que se indican en Usar Dataflow con Managed Service para Apache Kafka.

Formato de mensaje de Kafka

Esta plantilla admite la lectura de mensajes de Kafka en los siguientes formatos:

Formato JSON

Para leer mensajes JSON, asigna el valor "JSON" al parámetro de plantilla messageFormat.

Codificación binaria Avro

Para leer mensajes Avro binarios, define los siguientes parámetros de plantilla:

  • messageFormat: "AVRO_BINARY_ENCODING".
  • binaryAvroSchemaPath: la ubicación de un archivo de esquema Avro en Cloud Storage. Ejemplo: gs://BUCKET_NAME/message-schema.avsc

Para obtener más información sobre el formato binario Avro, consulta Codificación binaria en la documentación de Apache Avro.

Avro codificado con Confluent Schema Registry

Para leer mensajes en Avro codificados en Confluent Schema Registry, define los siguientes parámetros de plantilla:

  • messageFormat: "AVRO_CONFLUENT_WIRE_FORMAT".

  • schemaFormat: puede ser uno de los siguientes valores:
    • "SINGLE_SCHEMA_FILE": el esquema del mensaje se define en un archivo de esquema Avro. Especifica la ubicación del archivo de esquema en Cloud Storage en el parámetro confluentAvroSchemaPath.
    • "SCHEMA_REGISTRY": los mensajes se codifican mediante Confluent Schema Registry. Especifique la URL de la instancia de Confluent Schema Registry en el parámetro schemaRegistryConnectionUrl y el modo de autenticación en el parámetro schemaRegistryAuthenticationMode.

Para obtener más información sobre este formato, consulta Wire format (Formato de cable) en la documentación de Confluent.

Autenticación

La plantilla de Apache Kafka a BigQuery admite la autenticación SASL/PLAIN en los brokers de Kafka.

Parámetros de plantilla

Parámetros obligatorios

  • readBootstrapServerAndTopic tema de Kafka del que se leerán los datos de entrada.
  • writeMode escribe registros en una o varias tablas (en función del esquema). El modo DYNAMIC_TABLE_NAMES solo se admite en los formatos AVRO_CONFLUENT_WIRE_FORMAT Mensaje de origen y SCHEMA_REGISTRY Esquema de origen. El nombre de la tabla de destino se genera automáticamente en función del nombre del esquema Avro de cada mensaje. Puede ser un solo esquema (que crea una sola tabla) o varios esquemas (que crean varias tablas). El modo SINGLE_TABLE_NAME escribe en una sola tabla (un solo esquema) especificada por el usuario. El valor predeterminado es SINGLE_TABLE_NAME.
  • kafkaReadAuthenticationMode el modo de autenticación que se va a usar con el clúster de Kafka. Usa KafkaAuthenticationMethod.NONE para no usar autenticación, KafkaAuthenticationMethod.SASL_PLAIN para usar el nombre de usuario y la contraseña de SASL/PLAIN, KafkaAuthenticationMethod.SASL_SCRAM_512 para usar la autenticación SASL_SCRAM_512 y KafkaAuthenticationMethod.TLS para usar la autenticación basada en certificados. KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS solo se debe usar en el clúster de Google Cloud Apache Kafka para BigQuery. Permite autenticarte mediante las credenciales predeterminadas de la aplicación.
  • messageFormat: formato de los mensajes de Kafka que se van a leer. Los valores admitidos son AVRO_CONFLUENT_WIRE_FORMAT (Avro codificado en Confluent Schema Registry), AVRO_BINARY_ENCODING (Avro binario sin formato) y JSON. Valor predeterminado: AVRO_CONFLUENT_WIRE_FORMAT.
  • useBigQueryDLQ si es true, los mensajes fallidos se escribirán en BigQuery con información adicional sobre el error. Valor predeterminado: false.

Parámetros opcionales

  • outputTableSpec ubicación de la tabla de BigQuery en la que se escribirán los resultados. El nombre debe tener el formato <project>:<dataset>.<table_name>. El esquema de la tabla debe coincidir con los objetos de entrada.
  • persistKafkaKey si es true, la canalización conservará la clave del mensaje de Kafka en la tabla de BigQuery, en un campo _key de tipo BYTES. El valor predeterminado es false (se ignora la clave).
  • outputProject proyecto de BigQuery de salida en el que reside el conjunto de datos. Las tablas se crearán de forma dinámica en el conjunto de datos. El valor predeterminado es una cadena vacía.
  • outputDataset conjunto de datos de BigQuery de salida en el que se escribirán los resultados. Las tablas se crearán de forma dinámica en el conjunto de datos. Si las tablas se crean de antemano, sus nombres deben seguir la convención de nomenclatura especificada. El nombre debe ser bqTableNamePrefix + Avro Schema FullName y cada palabra debe estar separada por un guion -. El valor predeterminado es una cadena vacía.
  • bqTableNamePrefix prefijo de nomenclatura que se usará al crear tablas de salida de BigQuery. Solo se aplica cuando se usa el registro de esquemas. El valor predeterminado es una cadena vacía.
  • createDisposition CreateDisposition de BigQuery. Por ejemplo: CREATE_IF_NEEDED, CREATE_NEVER. El valor predeterminado es CREATE_IF_NEEDED.
  • writeDisposition WriteDisposition de BigQuery. Por ejemplo: WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. El valor predeterminado es WRITE_APPEND.
  • useAutoSharding si es true, la canalización usa la fragmentación automática al escribir en BigQuery. El valor predeterminado es true.
  • numStorageWriteApiStreams especifica el número de flujos de escritura. Este parámetro es obligatorio. El valor predeterminado es 0.
  • storageWriteApiTriggeringFrequencySec especifica la frecuencia de activación en segundos. Este parámetro debe definirse. El valor predeterminado es 5 segundos.
  • useStorageWriteApiAtLeastOnce este parámetro solo tiene efecto si la opción "Usar la API Storage Write de BigQuery" está habilitada. Si está habilitada, se usará la semántica "al menos una vez" para la API Storage Write. De lo contrario, se usará la semántica "exactamente una vez". Valor predeterminado: false.
  • enableCommitOffsets confirma los desplazamientos de los mensajes procesados en Kafka. Si se habilita, se minimizarán los huecos o el procesamiento duplicado de mensajes al reiniciar la canalización. Requiere que se especifique el ID de grupo de consumidores. Valor predeterminado: false.
  • consumerGroupId identificador único del grupo de consumidores al que pertenece esta canalización. Obligatorio si la opción Confirmar desfases en Kafka está habilitada. El valor predeterminado es una cadena vacía.
  • kafkaReadOffset el punto de partida para leer mensajes cuando no hay offsets confirmados. La más antigua empieza desde el principio y la más reciente, desde el mensaje más reciente. El valor predeterminado es latest.
  • kafkaReadUsernameSecretId el ID del secreto de Google Cloud Secret Manager que contiene el nombre de usuario de Kafka que se va a usar con la autenticación SASL_PLAIN. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. El valor predeterminado es una cadena vacía.
  • kafkaReadPasswordSecretId el ID del secreto de Google Cloud Secret Manager que contiene la contraseña de Kafka que se va a usar con la autenticación SASL_PLAIN. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. El valor predeterminado es una cadena vacía.
  • kafkaReadKeystoreLocation ruta de Google Cloud Storage al archivo Java KeyStore (JKS) que contiene el certificado TLS y la clave privada que se usarán para autenticarte con el clúster de Kafka. Por ejemplo, gs://your-bucket/keystore.jks.
  • kafkaReadTruststoreLocation la ruta de Google Cloud Storage al archivo Java TrustStore (JKS) que contiene los certificados de confianza que se usarán para verificar la identidad del broker de Kafka.
  • kafkaReadTruststorePasswordSecretId el ID del secreto de Google Cloud Secret Manager que contiene la contraseña que se usa para acceder al archivo Java TrustStore (JKS) para la autenticación TLS de Kafka. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeystorePasswordSecretId el ID del secreto de Google Cloud Secret Manager que contiene la contraseña que se usa para acceder al archivo Java KeyStore (JKS) para la autenticación TLS de Kafka. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeyPasswordSecretId el ID de secreto de Google Cloud Secret Manager que contiene la contraseña que se usa para acceder a la clave privada del archivo Java KeyStore (JKS) para la autenticación TLS de Kafka. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadSaslScramUsernameSecretId el ID del secreto de Google Cloud Secret Manager que contiene el nombre de usuario de Kafka que se va a usar con la autenticación SASL_SCRAM. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadSaslScramPasswordSecretId el ID del secreto de Secret Manager de Google Cloud que contiene la contraseña de Kafka que se va a usar con la autenticación SASL_SCRAM. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadSaslScramTruststoreLocation la ruta de Google Cloud Storage al archivo Java TrustStore (JKS) que contiene los certificados de confianza que se usarán para verificar la identidad del broker de Kafka.
  • kafkaReadSaslScramTruststorePasswordSecretId el ID del secreto de Google Cloud Secret Manager que contiene la contraseña que se usa para acceder al archivo Java TrustStore (JKS) para la autenticación SASL_SCRAM de Kafka. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • schemaFormat: el formato del esquema de Kafka. Se puede proporcionar como SINGLE_SCHEMA_FILE o SCHEMA_REGISTRY. Si se especifica SINGLE_SCHEMA_FILE, usa el esquema mencionado en el archivo de esquema de Avro para todos los mensajes. Si se especifica SCHEMA_REGISTRY, los mensajes pueden tener un solo esquema o varios. Valor predeterminado: SINGLE_SCHEMA_FILE.
  • confluentAvroSchemaPath la ruta de Google Cloud Storage al archivo de esquema de Avro único que se usa para decodificar todos los mensajes de un tema. El valor predeterminado es una cadena vacía.
  • schemaRegistryConnectionUrl la URL de la instancia de Confluent Schema Registry que se usa para gestionar los esquemas de Avro para decodificar mensajes. El valor predeterminado es una cadena vacía.
  • binaryAvroSchemaPath: la ruta de Google Cloud Storage al archivo de esquema de Avro que se usa para decodificar mensajes de Avro codificados en formato binario. El valor predeterminado es una cadena vacía.
  • schemaRegistryAuthenticationMode modo de autenticación del registro de esquemas. Puede ser NONE, TLS u OAUTH. El valor predeterminado es NONE.
  • schemaRegistryTruststoreLocation ubicación del certificado SSL en el que se almacena el almacén de confianza para la autenticación en Schema Registry. Por ejemplo, /your-bucket/truststore.jks.
  • schemaRegistryTruststorePasswordSecretId: SecretId en Secret Manager donde se almacena la contraseña para acceder al secreto en el almacén de confianza. Por ejemplo, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryKeystoreLocation ubicación del almacén de claves que contiene el certificado SSL y la clave privada. Por ejemplo, /your-bucket/keystore.jks.
  • schemaRegistryKeystorePasswordSecretId SecretId en Secret Manager donde se encuentra la contraseña para acceder al archivo del almacén de claves. Por ejemplo, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryKeyPasswordSecretId SecretId de la contraseña necesaria para acceder a la clave privada del cliente almacenada en el almacén de claves. Por ejemplo, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryOauthClientId ID de cliente usado para autenticar el cliente de Schema Registry en el modo OAuth. Obligatorio para el formato de mensaje AVRO_CONFLUENT_WIRE_FORMAT.
  • schemaRegistryOauthClientSecretId el ID de secreto de Google Cloud Secret Manager que contiene el secreto de cliente que se usará para autenticar el cliente de Schema Registry en el modo OAUTH. Obligatorio para el formato de mensaje AVRO_CONFLUENT_WIRE_FORMAT. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • schemaRegistryOauthScope el ámbito del token de acceso que se usa para autenticar el cliente de Schema Registry en el modo OAUTH. Este campo es opcional, ya que la solicitud se puede hacer sin pasar un parámetro de ámbito. Por ejemplo, openid.
  • schemaRegistryOauthTokenEndpointUrl la URL basada en HTTP(S) del proveedor de identidades de OAuth u OIDC que se usa para autenticar el cliente de Schema Registry en el modo OAUTH. Obligatorio para el formato de mensaje AVRO_CONFLUENT_WIRE_FORMAT.
  • outputDeadletterTable nombre de tabla de BigQuery completo de los mensajes fallidos. Los mensajes que no se han podido enviar a la tabla de salida por diferentes motivos (por ejemplo, un esquema que no coincide o un JSON con formato incorrecto) se escriben en esta tabla. La plantilla creará la tabla. Por ejemplo, your-project-id:your-dataset.your-table-name.
  • javascriptTextTransformGcsPath el URI de Cloud Storage del archivo .js que define la función de JavaScript definida por el usuario (UDF) que se va a usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName nombre de la función definida por el usuario (UDF) de JavaScript que se va a usar. Por ejemplo, si el código de la función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDFs de JavaScript, consulta Ejemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes especifica la frecuencia con la que se vuelve a cargar la FDU, en minutos. Si el valor es superior a 0, Dataflow comprueba periódicamente el archivo de la función definida por el usuario en Cloud Storage y vuelve a cargar la función si se modifica el archivo. Este parámetro le permite actualizar la función definida por el usuario mientras se ejecuta la canalización, sin necesidad de reiniciar el trabajo. Si el valor es 0, se inhabilita la recarga de las funciones definidas por el usuario. El valor predeterminado es 0.

Función definida por el usuario

También puedes ampliar esta plantilla escribiendo una función definida por el usuario (UDF). La plantilla llama a la función definida por el usuario para cada elemento de entrada. Las cargas útiles de los elementos se serializan como cadenas JSON. Para obtener más información, consulta el artículo sobre cómo crear funciones definidas por el usuario para plantillas de Dataflow.

La plantilla solo admite UDFs para mensajes de Kafka con formato JSON. Si los mensajes de Kafka usan el formato Avro, no se invoca la función definida por el usuario.

Especificación de la función

La función definida por el usuario tiene las siguientes especificaciones:

  • Entrada: el valor del registro de Kafka, serializado como una cadena JSON
  • Salida: una cadena JSON que coincide con el esquema de la tabla de destino de BigQuery

Ejecutar la plantilla

Consola

  1. Ve a la página Crear tarea a partir de plantilla de Dataflow.
  2. Ir a Crear tarea a partir de plantilla
  3. En el campo Nombre de la tarea, introduce un nombre único.
  4. Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es us-central1.

    Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de flujo de datos, seleccione the Kafka to BigQuery template.
  6. En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
  7. Opcional: Para cambiar del procesamiento una sola vez al modo de streaming al menos una vez, selecciona Al menos una vez.
  8. Haz clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery_Flex \
    --parameters \
readBootstrapServerAndTopic=BOOTSTRAP_SERVER_AND_TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME,\
useBigQueryDLQ=true,\
outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME
  

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
  • JOB_NAME: un nombre de trabajo único que elijas
  • REGION_NAME: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
  • VERSION: la versión de la plantilla que quieres usar

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
  • BOOTSTRAP_SERVER_AND_TOPIC: la dirección del servidor de arranque y el tema de Apache Kafka

    El formato de la dirección del servidor de arranque y del tema depende del tipo de clúster:

    • Clúster de Managed Service para Apache Kafka: projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • Clúster de Kafka externo: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
  • DATASET_NAME: el nombre de tu conjunto de datos de BigQuery
  • TABLE_NAME: el nombre de la tabla de salida de BigQuery
  • ERROR_TABLE_NAME: nombre de la tabla de BigQuery en la que se escribirán los registros de errores.

API

Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "readBootstrapServerAndTopic": "BOOTSTRAP_SERVER_AND_TOPIC",
          "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS",
          "messageFormat": "JSON",
          "writeMode": "SINGLE_TABLE_NAME",
          "outputTableSpec": "PROJECT_ID:DATASET_NAME.TABLE_NAME",
          "useBigQueryDLQ": "true",
          "outputDeadletterTable": "PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery_Flex",
   }
}
  

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
  • JOB_NAME: un nombre de trabajo único que elijas
  • LOCATION: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
  • VERSION: la versión de la plantilla que quieres usar

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
  • BOOTSTRAP_SERVER_AND_TOPIC: la dirección del servidor de arranque y el tema de Apache Kafka

    El formato de la dirección del servidor de arranque y del tema depende del tipo de clúster:

    • Clúster de Managed Service para Apache Kafka: projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • Clúster de Kafka externo: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
  • DATASET_NAME: el nombre de tu conjunto de datos de BigQuery
  • TABLE_NAME: el nombre de la tabla de salida de BigQuery
  • ERROR_TABLE_NAME: nombre de la tabla de BigQuery en la que se escribirán los registros de errores.

Para obtener más información, consulta el artículo sobre cómo escribir datos de Kafka en BigQuery con Dataflow.

Siguientes pasos