En esta página, se describe cómo usar Google Cloud Managed Service para Apache Kafka como fuente o receptor en una canalización de Dataflow.
Puedes usar cualquiera de los siguientes enfoques:
Requisitos
Habilita las APIs de Cloud Storage, Dataflow y Managed Service for Apache Kafka en tu proyecto. Consulta Habilita APIs o ejecuta el siguiente comando de Google Cloud CLI:
gcloud services enable dataflow.googleapis.com managedkafka.googleapis.com storage-component.googleapis.com
La cuenta de servicio de trabajador de Dataflow debe tener el rol de Identity and Access Management (IAM) de cliente administrado de Kafka (
roles/managedkafka.client
).Las VMs de trabajador de Dataflow deben tener acceso de red al servidor de arranque de Kafka. Para obtener más información, consulta Configura las redes del Servicio administrado para Apache Kafka.
Obtén la dirección del servidor de arranque
Para ejecutar una canalización que se conecta a un clúster de Servicio administrado para Apache Kafka, primero obtén la dirección del servidor de arranque del clúster. Necesitarás esta dirección cuando configures la canalización.
Puedes usar la Google Cloud consola o Google Cloud CLI de la siguiente manera:
Console
En la consola de Google Cloud , ve a la página Clústeres.
Haz clic en el nombre del clúster.
Haz clic en la pestaña Configurations.
Copia la dirección del servidor de arranque de Bootstrap URL.
gcloud
Usa el comando managed-kafka clusters describe
.
gcloud managed-kafka clusters describe CLUSTER_ID \
--location=LOCATION \
--format="value(bootstrapAddress)"
Reemplaza lo siguiente:
- CLUSTER_ID: Es el ID o el nombre del clúster.
- LOCATION: Es la ubicación del clúster.
Para obtener más información, consulta Cómo ver un clúster de Managed Service para Apache Kafka.
Usa Managed Service para Apache Kafka con una plantilla de Dataflow
Google proporciona varias plantillas de Dataflow que leen desde Apache Kafka:
Estas plantillas se pueden usar con Managed Service para Apache Kafka. Si uno de ellos coincide con tu caso de uso, considera usarlo en lugar de escribir código de canalización personalizado.
Console
Ve a la página Trabajos de Dataflow >.
Haz clic en Crear trabajo a partir de una plantilla (Create job from template).
En Nombre del trabajo, ingresa un nombre para el trabajo.
En el menú desplegable de plantillas de Dataflow, selecciona la plantilla que deseas ejecutar.
En el cuadro Servidor de arranque de Kafka, ingresa la dirección del servidor de arranque.
En el cuadro Tema de Kafka, ingresa el nombre del tema.
En Modo de autenticación de Kafka, selecciona APPLICATION_DEFAULT_CREDENTIALS.
En Formato del mensaje de Kafka, selecciona el formato de los mensajes de Apache Kafka.
Ingresa otros parámetros según sea necesario. Los parámetros admitidos se documentan para cada plantilla.
Ejecutar trabajo
gcloud
Usa el comando gcloud dataflow jobs run
.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://TEMPLATE_FILE \
--region REGION_NAME \
--parameters \
readBootstrapServerAndTopic=projects/PROJECT_NAME/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS\,
# Other parameters, depending on the template
# ...
Reemplaza lo siguiente:
- JOB_NAME: Un nombre para el trabajo
- TEMPLATE_FILE: La ubicación del archivo de plantilla en Cloud Storage
- REGION_NAME: la región en la que deseas implementar tu trabajo
- PROJECT_NAME: Nombre de tu proyecto Google Cloud
- LOCATION: Es la ubicación del clúster.
- CLUSTER_ID: Es el ID o el nombre del clúster.
- TOPIC: Es el nombre del tema de Kafka.
Usa Managed Service para Apache Kafka con una canalización de Beam
En esta sección, se describe cómo usar el SDK de Apache Beam para crear y ejecutar una canalización de Dataflow que se conecta a Managed Service for Apache Kafka.
En la mayoría de los casos, usa la transformación de E/S administrada como tu fuente o receptor de Kafka. Si necesitas un ajuste del rendimiento más avanzado, considera usar el conector KafkaIO
.
Para obtener más información sobre los beneficios de usar E/S administrada, consulta E/S administrada de Dataflow.
Requisitos
Versión 3.6.0 o posterior del cliente de Kafka
SDK de Apache Beam, versión 2.61.0 o posterior
La máquina en la que inicias el trabajo de Dataflow debe tener acceso a la red del servidor de arranque de Apache Kafka. Por ejemplo, inicia el trabajo desde una instancia de Compute Engine que pueda acceder a la VPC en la que se puede acceder al clúster.
La principal que crea el trabajo debe tener los siguientes roles de IAM:
- Cliente de Kafka administrado (
roles/managedkafka.client
) para acceder al clúster de Apache Kafka. - Usuario de la cuenta de servicio (
roles/iam.serviceAccountUser
) para actuar como la cuenta de servicio de trabajador de Dataflow - Administrador de almacenamiento (
roles/storage.admin
) para subir archivos de trabajo a Cloud Storage - Administrador de Dataflow (
roles/dataflow.admin
) para crear el trabajo
Si inicias el trabajo desde una instancia de Compute Engine, puedes otorgar estos roles a una cuenta de servicio adjunta a la VM. Para obtener más información, consulta Crea una VM que use una cuenta de servicio administrada por el usuario.
También puedes usar las credenciales predeterminadas de la aplicación (ADC) con la suplantación de identidad de la cuenta de servicio cuando crees el trabajo.
- Cliente de Kafka administrado (
Configura la E/S administrada
Si tu canalización usa E/S administrada para Apache Kafka, establece las siguientes opciones de configuración para autenticarte con Managed Service para Apache Kafka:
"security.protocol"
:"SASL_SSL"
"sasl.mechanism"
:"OAUTHBEARER"
"sasl.login.callback.handler.class"
:"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
"sasl.jaas.config"
:"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
En los siguientes ejemplos, se muestra cómo configurar la E/S administrada para Managed Service para Apache Kafka:
Java
// Create configuration parameters for the Managed I/O transform.
ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
.put("bootstrap_servers", options.getBootstrapServer())
.put("topic", options.getTopic())
.put("data_format", "RAW")
// Set the following fields to authenticate with Application Default
// Credentials (ADC):
.put("security.protocol", "SASL_SSL")
.put("sasl.mechanism", "OAUTHBEARER")
.put("sasl.login.callback.handler.class",
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler")
.put("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
.build();
Python
pipeline
| beam.managed.Read(
beam.managed.KAFKA,
config={
"bootstrap_servers": options.bootstrap_server,
"topic": options.topic,
"data_format": "RAW",
# Set the following fields to authenticate with Application Default
# Credentials (ADC):
"security.protocol": "SASL_SSL",
"sasl.mechanism": "OAUTHBEARER",
"sasl.login.callback.handler.class":
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
"sasl.jaas.config":
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
}
)
Configura el conector de KafkaIO
En los siguientes ejemplos, se muestra cómo configurar el conector KafkaIO
para Managed Service for Apache Kafka:
Java
String bootstap = options.getBootstrap();
String topicName = options.getTopic();
// Read from Kafka
pipeline.apply(KafkaIO.<String, String>read()
.withBootstrapServers(bootstap)
.withTopic(topicName)
.withKeyDeserializer(IntegerSerializer.class)
.withValueDeserializer(StringDeserializer.class)
.withGCPApplicationDefaultCredentials())
// Write to Kafka
pipeline.apply(KafkaIO.<Integer, String>write()
.withBootstrapServers(bootstrap)
.withTopic(topicName)
.withKeySerializer(IntegerSerializer.class)
.withValueSerializer(StringSerializer.class)
.withGCPApplicationDefaultCredentials());
Python
WriteToKafka(
producer_config={
"bootstrap.servers": options.bootstrap_servers,
"security.protocol": 'SASL_SSL',
"sasl.mechanism": "OAUTHBEARER",
"sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
"sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
},
topic=options.topic,
key_serializer=("org.apache.kafka.common.serialization." "LongSerializer"),
value_serializer=("org.apache.kafka.common.serialization." "StringSerializer")
)
¿Qué sigue?
- Obtén más información sobre Managed Service para Apache Kafka.
- Escribe datos del servicio administrado para Apache Kafka en BigQuery.
- Lee desde Apache Kafka a Dataflow.
- Escribe desde Dataflow hasta Apache Kafka.