En esta página se describe cómo usar Google Cloud Managed Service para Apache Kafka como origen o receptor en una canalización de Dataflow.
Puedes usar cualquiera de los siguientes métodos:
Requisitos
Habilita las APIs Cloud Storage, Dataflow y Managed Service para Apache Kafka en tu proyecto. Consulta Habilitar APIs o ejecuta el siguiente comando de la CLI de Google Cloud:
gcloud services enable dataflow.googleapis.com managedkafka.googleapis.com storage-component.googleapis.com
La cuenta de servicio de trabajador de Dataflow debe tener el rol de gestión de identidades y accesos (IAM) de cliente de Kafka gestionado (
roles/managedkafka.client
).Las máquinas virtuales de trabajador de Dataflow deben tener acceso de red al servidor de arranque de Kafka. Para obtener más información, consulta el artículo sobre cómo configurar la red de Managed Service para Apache Kafka.
Obtener la dirección del servidor de arranque
Para ejecutar una canalización que se conecte a un clúster de Managed Service para Apache Kafka, primero obtén la dirección del servidor de arranque del clúster. Necesitarás esta dirección al configurar la canalización.
Puedes usar la Google Cloud consola o la CLI de Google Cloud, como se indica a continuación:
Consola
En la Google Cloud consola, ve a la página Clusters.
Haz clic en el nombre del clúster.
Haz clic en la pestaña Configuraciones.
Copia la dirección del servidor de arranque de URL de arranque.
gcloud
Usa el comando managed-kafka clusters describe
.
gcloud managed-kafka clusters describe CLUSTER_ID \
--location=LOCATION \
--format="value(bootstrapAddress)"
Haz los cambios siguientes:
- CLUSTER_ID: el ID o el nombre del clúster
- LOCATION: la ubicación del clúster
Para obtener más información, consulta Ver un clúster de Managed Service para Apache Kafka.
Usar Managed Service para Apache Kafka con una plantilla de Dataflow
Google proporciona varias plantillas de Dataflow que leen datos de Apache Kafka:
Estas plantillas se pueden usar con Managed Service para Apache Kafka. Si uno de ellos se ajusta a tu caso práctico, te recomendamos que lo uses en lugar de escribir código de canalización personalizado.
Consola
Ve a la página Dataflow > Tareas.
Haz clic en Crear tarea a partir de plantilla.
En Nombre del trabajo, escribe el nombre del trabajo.
En el menú desplegable de plantillas Dataflow, selecciona la plantilla que quieras ejecutar.
En el cuadro Servidor de arranque de Kafka, introduce la dirección del servidor de arranque.
En el cuadro Tema de Kafka, escribe el nombre del tema.
En Kafka authentication mode (Modo de autenticación de Kafka), selecciona APPLICATION_DEFAULT_CREDENTIALS (Credenciales predeterminadas de la aplicación).
En Formato de mensaje de Kafka, selecciona el formato de los mensajes de Apache Kafka.
Introduce otros parámetros según sea necesario. Los parámetros admitidos se documentan en cada plantilla.
Ejecutar tarea.
gcloud
Usa el comando
gcloud dataflow jobs run
.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://TEMPLATE_FILE \
--region REGION_NAME \
--parameters \
readBootstrapServerAndTopic=projects/PROJECT_NAME/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS\,
# Other parameters, depending on the template
# ...
Haz los cambios siguientes:
- JOB_NAME: el nombre del trabajo
- TEMPLATE_FILE: la ubicación del archivo de plantilla en Cloud Storage
- REGION_NAME: la región en la que quieres desplegar el trabajo
- PROJECT_NAME: el nombre de tu proyecto de Google Cloud Platform
- LOCATION: la ubicación del clúster
- CLUSTER_ID: el ID o el nombre del clúster
- TOPIC: el nombre del tema de Kafka
Usar Managed Service para Apache Kafka con una canalización de Beam
En esta sección se describe cómo usar el SDK de Apache Beam para crear y ejecutar un flujo de procesamiento de Dataflow que se conecte a Managed Service para Apache Kafka.
En la mayoría de los casos, usa la
transformación de entrada/salida gestionada como origen o receptor de Kafka. Si necesitas un ajuste de rendimiento más avanzado, te recomendamos que uses el conector KafkaIO
.
Para obtener más información sobre las ventajas de usar E/S gestionadas, consulta E/S gestionadas de Dataflow.
Requisitos
Versión 3.6.0 o posterior de Kafka Client.
La versión 2.61.0 (o una posterior) del SDK de Apache Beam.
La máquina en la que inicies la tarea de Dataflow debe tener acceso de red al servidor de arranque de Apache Kafka. Por ejemplo, inicia el trabajo desde una instancia de Compute Engine que pueda acceder a la VPC en la que se pueda acceder al clúster.
La cuenta principal que crea el trabajo debe tener los siguientes roles de gestión de identidades y accesos:
- Cliente de Kafka gestionado (
roles/managedkafka.client
) para acceder al clúster de Apache Kafka. - Usuario de cuenta de servicio (
roles/iam.serviceAccountUser
) para actuar como cuenta de servicio de trabajador de Dataflow. - Administrador de almacenamiento (
roles/storage.admin
) para subir archivos de trabajo a Cloud Storage. - Administrador de Dataflow (
roles/dataflow.admin
) para crear la tarea.
Si inicias el trabajo desde una instancia de Compute Engine, puedes asignar estos roles a una cuenta de servicio que esté asociada a la máquina virtual. Para obtener más información, consulta Crear una VM que use una cuenta de servicio gestionada por el usuario.
También puedes usar las credenciales predeterminadas de la aplicación (ADC) con la suplantación de identidad de la cuenta de servicio al crear el trabajo.
- Cliente de Kafka gestionado (
Configurar E/S gestionada
Si tu canalización usa Managed I/O para Apache Kafka, define las siguientes opciones de configuración para autenticarte con Managed Service para Apache Kafka:
"security.protocol"
:"SASL_SSL"
"sasl.mechanism"
:"OAUTHBEARER"
"sasl.login.callback.handler.class"
:"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
"sasl.jaas.config"
:"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
En los siguientes ejemplos se muestra cómo configurar la E/S gestionada para Managed Service para Apache Kafka:
Java
// Create configuration parameters for the Managed I/O transform.
ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
.put("bootstrap_servers", options.getBootstrapServer())
.put("topic", options.getTopic())
.put("data_format", "RAW")
// Set the following fields to authenticate with Application Default
// Credentials (ADC):
.put("security.protocol", "SASL_SSL")
.put("sasl.mechanism", "OAUTHBEARER")
.put("sasl.login.callback.handler.class",
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler")
.put("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
.build();
Python
pipeline
| beam.managed.Read(
beam.managed.KAFKA,
config={
"bootstrap_servers": options.bootstrap_server,
"topic": options.topic,
"data_format": "RAW",
# Set the following fields to authenticate with Application Default
# Credentials (ADC):
"security.protocol": "SASL_SSL",
"sasl.mechanism": "OAUTHBEARER",
"sasl.login.callback.handler.class":
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
"sasl.jaas.config":
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
}
)
Configurar el conector KafkaIO
En los siguientes ejemplos se muestra cómo configurar el conector KafkaIO
para Managed Service para Apache Kafka:
Java
String bootstap = options.getBootstrap();
String topicName = options.getTopic();
// Read from Kafka
pipeline.apply(KafkaIO.<String, String>read()
.withBootstrapServers(bootstap)
.withTopic(topicName)
.withKeyDeserializer(IntegerSerializer.class)
.withValueDeserializer(StringDeserializer.class)
.withGCPApplicationDefaultCredentials())
// Write to Kafka
pipeline.apply(KafkaIO.<Integer, String>write()
.withBootstrapServers(bootstrap)
.withTopic(topicName)
.withKeySerializer(IntegerSerializer.class)
.withValueSerializer(StringSerializer.class)
.withGCPApplicationDefaultCredentials());
Python
WriteToKafka(
producer_config={
"bootstrap.servers": options.bootstrap_servers,
"security.protocol": 'SASL_SSL',
"sasl.mechanism": "OAUTHBEARER",
"sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
"sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
},
topic=options.topic,
key_serializer=("org.apache.kafka.common.serialization." "LongSerializer"),
value_serializer=("org.apache.kafka.common.serialization." "StringSerializer")
)
Siguientes pasos
- Consulta más información sobre Managed Service para Apache Kafka.
- Escribe datos de Managed Service para Apache Kafka en BigQuery.
- Leer de Apache Kafka a Dataflow.
- Escribir datos de Dataflow en Apache Kafka.