La plantilla de Apache Kafka a Apache Kafka crea un flujo de procesamiento en streaming que ingiere datos como bytes de una fuente de Apache Kafka y, a continuación, escribe los bytes en un receptor de Apache Kafka.
Requisitos del flujo de procesamiento
- El tema de origen de Apache Kafka debe existir.
- Los servidores de los brokers de origen y de receptor de Apache Kafka deben estar en funcionamiento y se debe poder acceder a ellos desde las máquinas de trabajador de Dataflow.
- Si usas Google Cloud Managed Service para Apache Kafka como origen o como receptor, el tema debe existir antes de iniciar la plantilla.
Formato de mensaje de Kafka
Los mensajes de origen de Apache Kafka se leen como bytes y los bytes se escriben en el receptor de Apache Kafka.
Autenticación
La plantilla de Apache Kafka a Apache Kafka admite la autenticación SASL/PLAIN y TLS en los brokers de Kafka.
Parámetros de plantilla
Parámetros obligatorios
- readBootstrapServerAndTopic servidor de arranque y tema de Kafka para leer la entrada. Por ejemplo,
localhost:9092;topic1,topic2
. - kafkaReadAuthenticationMode el modo de autenticación que se va a usar con el clúster de Kafka. Usa
KafkaAuthenticationMethod.NONE
para no usar autenticación,KafkaAuthenticationMethod.SASL_PLAIN
para usar el nombre de usuario y la contraseña de SASL/PLAIN,KafkaAuthenticationMethod.SASL_SCRAM_512
para usar la autenticación SASL_SCRAM_512 yKafkaAuthenticationMethod.TLS
para usar la autenticación basada en certificados.KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS
solo se debe usar en el clúster de Google Cloud Apache Kafka para BigQuery. Permite autenticarte mediante las credenciales predeterminadas de la aplicación. - writeBootstrapServerAndTopic: tema de Kafka en el que se escribirá la salida.
- kafkaWriteAuthenticationMethod el modo de autenticación que se va a usar con el clúster de Kafka. Usa NONE si no quieres usar autenticación, SASL_PLAIN para el nombre de usuario y la contraseña de SASL/PLAIN, SASL_SCRAM_512 para la autenticación basada en SASL_SCRAM_512 y TLS para la autenticación basada en certificados. El valor predeterminado es APPLICATION_DEFAULT_CREDENTIALS.
Parámetros opcionales
- enableCommitOffsets confirma los desplazamientos de los mensajes procesados en Kafka. Si se habilita, se minimizarán los huecos o el procesamiento duplicado de mensajes al reiniciar la canalización. Requiere que se especifique el ID de grupo de consumidores. Valor predeterminado: false.
- consumerGroupId identificador único del grupo de consumidores al que pertenece esta canalización. Obligatorio si la opción Confirmar desfases en Kafka está habilitada. El valor predeterminado es una cadena vacía.
- kafkaReadOffset el punto de partida para leer mensajes cuando no hay offsets confirmados. La más antigua empieza desde el principio y la más reciente, desde el mensaje más reciente. El valor predeterminado es latest.
- kafkaReadUsernameSecretId el ID del secreto de Google Cloud Secret Manager que contiene el nombre de usuario de Kafka que se va a usar con la autenticación
SASL_PLAIN
. Por ejemplo,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. El valor predeterminado es una cadena vacía. - kafkaReadPasswordSecretId el ID del secreto de Google Cloud Secret Manager que contiene la contraseña de Kafka que se va a usar con la autenticación
SASL_PLAIN
. Por ejemplo,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. El valor predeterminado es una cadena vacía. - kafkaReadKeystoreLocation ruta de Google Cloud Storage al archivo Java KeyStore (JKS) que contiene el certificado TLS y la clave privada que se usarán para autenticarte con el clúster de Kafka. Por ejemplo,
gs://your-bucket/keystore.jks
. - kafkaReadTruststoreLocation la ruta de Google Cloud Storage al archivo Java TrustStore (JKS) que contiene los certificados de confianza que se usarán para verificar la identidad del broker de Kafka.
- kafkaReadTruststorePasswordSecretId el ID del secreto de Google Cloud Secret Manager que contiene la contraseña que se usa para acceder al archivo Java TrustStore (JKS) para la autenticación TLS de Kafka. Por ejemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadKeystorePasswordSecretId el ID del secreto de Google Cloud Secret Manager que contiene la contraseña que se usa para acceder al archivo Java KeyStore (JKS) para la autenticación TLS de Kafka. Por ejemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadKeyPasswordSecretId el ID de secreto de Google Cloud Secret Manager que contiene la contraseña que se usa para acceder a la clave privada del archivo Java KeyStore (JKS) para la autenticación TLS de Kafka. Por ejemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramUsernameSecretId el ID del secreto de Google Cloud Secret Manager que contiene el nombre de usuario de Kafka que se va a usar con la autenticación
SASL_SCRAM
. Por ejemplo,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramPasswordSecretId el ID del secreto de Secret Manager de Google Cloud que contiene la contraseña de Kafka que se va a usar con la autenticación
SASL_SCRAM
. Por ejemplo,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramTruststoreLocation la ruta de Google Cloud Storage al archivo Java TrustStore (JKS) que contiene los certificados de confianza que se usarán para verificar la identidad del broker de Kafka.
- kafkaReadSaslScramTruststorePasswordSecretId el ID del secreto de Google Cloud Secret Manager que contiene la contraseña que se usa para acceder al archivo Java TrustStore (JKS) para la autenticación SASL_SCRAM de Kafka. Por ejemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaWriteUsernameSecretId el ID secreto de Google Cloud Secret Manager que contiene el nombre de usuario de Kafka para la autenticación SASL_PLAIN con el clúster de Kafka de destino. Por ejemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. El valor predeterminado es una cadena vacía. - kafkaWritePasswordSecretId el ID secreto de Secret Manager de Google Cloud que contiene la contraseña de Kafka que se va a usar para la autenticación SASL_PLAIN con el clúster de Kafka de destino. Por ejemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. El valor predeterminado es una cadena vacía. - kafkaWriteKeystoreLocation la ruta de Google Cloud Storage al archivo de almacén de claves de Java (JKS) que contiene el certificado TLS y la clave privada para autenticarte en el clúster de Kafka de destino. Por ejemplo,
gs://<BUCKET>/<KEYSTORE>.jks
. - kafkaWriteTruststoreLocation la ruta de Google Cloud Storage al archivo Java TrustStore (JKS) que contiene los certificados de confianza que se usarán para verificar la identidad del broker de Kafka de destino.
- kafkaWriteTruststorePasswordSecretId el ID del secreto de Google Cloud Secret Manager que contiene la contraseña que se usa para acceder al archivo Java TrustStore (JKS) para la autenticación TLS con el clúster de Kafka de destino. Por ejemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaWriteKeystorePasswordSecretId el ID del secreto de Google Cloud Secret Manager que contiene la contraseña para acceder al archivo Java KeyStore (JKS) que se va a usar para la autenticación TLS con el clúster de Kafka de destino. Por ejemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaWriteKeyPasswordSecretId el ID de secreto de Google Cloud Secret Manager que contiene la contraseña que se debe usar para acceder a la clave privada del archivo de almacén de claves de Java (JKS) para la autenticación TLS con el clúster de Kafka de destino. Por ejemplo,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
.
Ejecutar la plantilla
Consola
- Ve a la página Crear tarea a partir de plantilla de Dataflow. Ir a Crear tarea a partir de plantilla
- En el campo Nombre de la tarea, introduce un nombre único.
- Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es
us-central1
.Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de flujo de datos, seleccione the Kafka to Cloud Storage template.
- En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
- Opcional: Para cambiar del procesamiento una sola vez al modo de streaming al menos una vez, selecciona Al menos una vez.
- Haz clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Kafka \ --parameters \ readBootstrapServerAndTopic=READ_BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ writeBootstrapServerAndTopic=WRITE_BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaWriteAuthenticationMethod=APPLICATION_DEFAULT_CREDENTIALS
Haz los cambios siguientes:
PROJECT_ID
: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de DataflowJOB_NAME
: un nombre de trabajo único que elijasREGION_NAME
: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo,us-central1
VERSION
: la versión de la plantilla que quieres usarPuedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
READ_BOOTSTRAP_SERVER_AND_TOPIC
: la dirección del servidor de arranque de Apache Kafka y el tema del que se va a leerWRITE_BOOTSTRAP_SERVER_AND_TOPIC
: la dirección del servidor de arranque de Apache Kafka y el tema en el que escribirEl formato de la dirección del servidor de arranque y del tema depende del tipo de clúster:
- Clúster de Managed Service para Apache Kafka:
projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- Clúster de Kafka externo:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Clúster de Managed Service para Apache Kafka:
API
Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "readBootstrapServerAndTopic": "READ_BOOTSTRAP_SERVER_AND_TOPIC", "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS", "writeBootstrapServerAndTopic": "WRITE_BOOTSTRAP_SERVER_AND_TOPIC", "kafkaWriteAuthenticationMethod": "APPLICATION_DEFAULT_CREDENTIALS }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Kafka", } }
Haz los cambios siguientes:
PROJECT_ID
: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de DataflowJOB_NAME
: un nombre de trabajo único que elijasLOCATION
: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo,us-central1
VERSION
: la versión de la plantilla que quieres usarPuedes usar los siguientes valores:
latest
para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/- el nombre de la versión, como
2023-09-12-00_RC00
, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
READ_BOOTSTRAP_SERVER_AND_TOPIC
: la dirección del servidor de arranque de Apache Kafka y el tema del que se va a leerWRITE_BOOTSTRAP_SERVER_AND_TOPIC
: la dirección del servidor de arranque de Apache Kafka y el tema en el que escribirEl formato de la dirección del servidor de arranque y del tema depende del tipo de clúster:
- Clúster de Managed Service para Apache Kafka:
projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- Clúster de Kafka externo:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Clúster de Managed Service para Apache Kafka:
Siguientes pasos
- Consulta información sobre las plantillas de Dataflow.
- Consulta la lista de plantillas proporcionadas por Google.