Plantilla de Apache Kafka a Cloud Storage

La plantilla de Apache Kafka a Cloud Storage es una canalización de transmisión que transfiere datos de texto de Google Cloud Managed Service para Apache Kafka y envía los registros a Cloud Storage.

También puedes usar la plantilla de Apache Kafka a BigQuery con Kafka externa o autoadministrada.

Requisitos de la canalización

  • El bucket de salida de Cloud Storage debe existir.
  • El servidor del agente de Apache Kafka debe estar en ejecución y se debe poder acceder a él desde las máquinas de trabajador de Dataflow.
  • Los temas de Apache Kafka deben existir.

Formato del mensaje de Kafka

La plantilla de Apache Kafka a Cloud Storage admite la lectura de mensajes de Kafka en los siguientes formatos: CONFLUENT_AVRO_WIRE_FORMAT y JSON.

Formato de archivos de salida

El formato de archivo de salida tiene el mismo formato que el mensaje de entrada de Kafka. Por ejemplo, si eliges JSON para el formato de mensaje de Kafka, los archivos JSON se escriben en el bucket de salida de Cloud Storage.

Autenticación

La plantilla de Apache Kafka a Cloud Storage admite la autenticación SASL/PLAIN para los agentes de Kafka.

Parámetros de la plantilla

Parámetros obligatorios

  • readBootstrapServerAndTopic: El tema de Kafka desde el que se lee la entrada.
  • outputDirectory: La ruta de acceso y el prefijo del nombre de archivo para escribir los archivos de salida. Debe terminar con una barra. (Ejemplo: gs://your-bucket/your-path/).
  • kafkaReadAuthenticationMode : El modo de autenticación que se usará con el clúster de Kafka. Usa NONE para la no autenticación, SASL_PLAIN para nombre de usuario y contraseña de SASL/PLAIN y TLS para autenticación basada en certificados. APPLICATION_DEFAULT_CREDENTIALS solo debe usarse para el clúster de Apache Kafka para BigQuery de Google Cloud, ya que te permite autenticarte con Apache Kafka para BigQuery de Google Cloud con credenciales predeterminadas de la aplicación.
  • messageFormat : El formato de los mensajes de Kafka que se leerán. Los valores admitidos son AVRO_CONFLUENT_WIRE_FORMAT (Avro codificado del Registro de esquema de Confluent), AVRO_BINARY_ENCODING (Avro binario sin formato) y JSON. La configuración predeterminada es: AVRO_CONFLUENT_WIRE_FORMAT.
  • useBigQueryDLQ: Si es verdadero, los mensajes con errores se escribirán en BigQuery con información adicional del error. La configuración predeterminada es "false".

Parámetros opcionales

  • windowDuration: La duración/tamaño de la ventana en la que se escribirán los datos en Cloud Storage. Los formatos permitidos son: Ns (para los segundos, por ejemplo, 5 s), Nm (para los minutos, por ejemplo, 12 m) y Nh (para las horas, por ejemplo, 2 h). (Ejemplo: 5 m). La configuración predeterminada es de 5 m.
  • outputFilenamePrefix: El prefijo para colocar en cada archivo con ventanas. (Ejemplo: output-). La configuración predeterminada es “output”.
  • numShards: La cantidad máxima de fragmentos de salida que se produce con la escritura. Una mayor cantidad de fragmentos implica una mayor capacidad de procesamiento para la escritura en Cloud Storage, pero, también, un mayor costo de agregación de datos entre fragmentos cuando se procesan archivos de salida de Cloud Storage. Dataflow decide el valor predeterminado.
  • enableCommitOffsets : confirma desplazamientos de mensajes procesados en Kafka. Si se habilita, esto minimizará las brechas o el procesamiento duplicado de los mensajes cuando se reinicie la canalización. Requiere especificar el ID del grupo de consumidores. La configuración predeterminada es "false".
  • consumerGroupId : El identificador único del grupo de consumidores al que pertenece esta canalización. Obligatorio si la confirmación de desplazamientos a Kafka está habilitada. La configuración predeterminada es vacía.
  • kafkaReadOffset : El punto de partida para leer mensajes cuando no existen compensaciones confirmadas. El primero comienza desde el principio y el más reciente desde el mensaje más reciente. La configuración predeterminada es: la más reciente.
  • kafkaReadUsernameSecretId : El ID del secreto de Google Cloud Secret Manager que contiene el nombre de usuario de Kafka que se usará con la autenticación SASL_PLAIN. (Ejemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). El valor predeterminado es vacío.
  • kafkaReadPasswordSecretId : El ID del secreto de Google Cloud Secret Manager que contiene la contraseña de Kafka que se usará con la autenticación SASL_PLAIN. (Ejemplo: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). El valor predeterminado es vacío.
  • kafkaReadKeystoreLocation : La ruta de acceso de Google Cloud Storage al archivo de Java KeyStore (JKS) que contiene el certificado TLS y la clave privada que se usará para la autenticación con el clúster de Kafka. (Por ejemplo: gs://your-bucket/keystore.jks).
  • kafkaReadTruststoreLocation : La ruta de acceso de Google Cloud Storage al archivo Java TrustStore (JKS) que contiene los certificados de confianza que se usarán para verificar la identidad del agente de Kafka.
  • kafkaReadTruststorePasswordSecretId : La ID del secreto de Google Cloud Secret Manager que contiene la contraseña que se usará para acceder al archivo del almacén de certificados de confianza de Java (JKS) para la autenticación TLS de Kafka (Example: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versiones/<SECRET_VERSION>).
  • kafkaReadKeystorePasswordSecretId : El ID del secreto de Google Cloud Secret Manager que contiene la contraseña que se usará para acceder al archivo Java KeyStore (JKS) para la autenticación TLS de Kafka. (Ejemplo: proyectos/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
  • kafkaReadKeyPasswordSecretId : el ID del secreto de Google Cloud Secret Manager que contiene la contraseña que se usará para acceder a la clave privada dentro del archivo Java KeyStore (JKS) para la autenticación TLS de Kafka. (Ejemplo: proyectos/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
  • schemaFormat : el formato del esquema de Kafka. Se puede proporcionar como SINGLE_SCHEMA_FILE o SCHEMA_REGISTRY. Si se especifica SINGLE_Schema_FILE, todos los mensajes deben tener el esquema mencionado en el archivo de esquema avro. Si se especifica SCHEMA_REGISTRY, los mensajes pueden tener un solo esquema o varios. La configuración predeterminada es: SINGLE_SCHEMA_FILE.
  • confluentAvroSchemaPath : la ruta de acceso de Google Cloud Storage al único archivo de esquema de Avro que se usa para decodificar todos los mensajes de un tema. La configuración predeterminada es vacía.
  • schemaRegistryConnectionUrl : La URL de la instancia del registro de esquemas de Confluent que se usa para administrar esquemas de Avro para la decodificación de mensajes. La configuración predeterminada es vacía.
  • binaryAvroSchemaPath : Es la ruta de acceso de Google Cloud Storage al archivo de esquema de Avro que se usa para decodificar mensajes de Avro con codificación binaria. La configuración predeterminada es vacía.
  • schemaRegistryAuthenticationMode : Es el modo de autenticación de Schema Registry. Puede ser NONE, TLS o OAUTH. La configuración predeterminada es NONE.
  • schemaRegistryTruststoreLocation : Es la ubicación del certificado SSL en la que se almacena el almacén de confianza para la autenticación en Schema Registry. (Por ejemplo: /your-bucket/truststore.jks).
  • schemaRegistryTruststorePasswordSecretId : Es el SecretId en Secret Manager donde se almacena la contraseña para acceder al secreto en el almacén de confianza. (Ejemplo: proyectos/id-de-tu-proyecto/secrets/nombre-de-tu-secreto/versiones/versión-de-tu-secreto).
  • schemaRegistryKeystoreLocation : Es la ubicación del almacén de claves que contiene el certificado SSL y la clave privada. (Por ejemplo: /your-bucket/keystore.jks).
  • schemaRegistryKeystorePasswordSecretId: Es el ID de secreto en Secret Manager donde se encuentra la contraseña para acceder al archivo del almacén de claves (por ejemplo, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version).
  • schemaRegistryKeyPasswordSecretId: Es el SecretId de la contraseña necesaria para acceder a la clave privada del cliente almacenada en el almacén de claves (por ejemplo, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version).
  • schemaRegistryOauthClientId : Es el ID de cliente que se usa para autenticar el cliente de Schema Registry en modo OAUTH. Obligatorio para el formato de mensaje AVRO_CONFLUENT_WIRE_FORMAT.
  • schemaRegistryOauthClientSecretId : El ID del secreto de Google Cloud Secret Manager que contiene el secreto de cliente que se usará para autenticar el cliente de Schema Registry en modo OAUTH. Obligatorio para el formato de mensaje AVRO_CONFLUENT_WIRE_FORMAT. (Ejemplo: proyectos/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
  • schemaRegistryOauthScope : Es el alcance del token de acceso que se usa para autenticar el cliente de Schema Registry en modo OAUTH. Este campo es opcional, ya que la solicitud se puede realizar sin pasar un parámetro de alcance. (por ejemplo, openid).
  • schemaRegistryOauthTokenEndpointUrl : Es la URL basada en HTTP(S) del proveedor de identidad de OAuth/OIDC que se usa para autenticar el cliente de Schema Registry en modo OAUTH. Obligatorio para el formato de mensaje AVRO_CONFLUENT_WIRE_FORMAT.
  • outputDeadletterTable : Es el nombre de la tabla de BigQuery completamente calificado para los mensajes con errores. Los mensajes que no llegaron a la tabla de resultados por diferentes motivos (p.ej., un esquema no coincidente, un archivo JSON con formato incorrecto) se escriben en esta tabla.La plantilla creará la tabla. (Ejemplo: your-project-id:your-dataset.your-table-name).

Ejecuta la plantilla

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Kafka to Cloud Storage template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Opcional: Para cambiar del procesamiento “exactamente una vez” al modo de transmisión al menos una vez, selecciona Al menos una vez.
  8. Haz clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Gcs_Flex \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • BIGQUERY_TABLE: Es el nombre de la tabla de Cloud Storage
  • KAFKA_TOPICS: Es la lista de temas de Apache Kkafa. Si se proporcionan varios temas, debes escapar las comas. Consulta gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE: El URI de Cloud Storage de .js archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.

    Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

  • KAFKA_SERVER_ADDRESSES: La lista de direcciones IP del servidor del agente de Apache Kafka. Cada dirección IP debe tener el número de puerto desde el que se puede acceder al servidor. Por ejemplo: 35.70.252.199:9092. Si se proporcionan varias direcciones, debes escapar las comas. Consulta gcloud topic escaping.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Gcs_Flex",
   }
}
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • BIGQUERY_TABLE: Es el nombre de la tabla de Cloud Storage
  • KAFKA_TOPICS: Es la lista de temas de Apache Kkafa. Si se proporcionan varios temas, debes escapar las comas. Consulta gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE: El URI de Cloud Storage de .js archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.

    Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

  • KAFKA_SERVER_ADDRESSES: La lista de direcciones IP del servidor del agente de Apache Kafka. Cada dirección IP debe tener el número de puerto desde el que se puede acceder al servidor. Por ejemplo: 35.70.252.199:9092. Si se proporcionan varias direcciones, debes escapar las comas. Consulta gcloud topic escaping.

Para obtener más información, consulta Escribe datos de Kafka en Cloud Storage con Dataflow.

¿Qué sigue?