Cette page explique comment utiliser Google Cloud Managed Service pour Apache Kafka comme source ou récepteur dans un pipeline Dataflow.
Vous pouvez utiliser l'une des approches suivantes :
Conditions requises
Activez les API Cloud Storage, Dataflow et Managed Service for Apache Kafka dans votre projet. Consultez Activer des API ou exécutez la commande Google Cloud CLI suivante :
gcloud services enable dataflow.googleapis.com managedkafka.googleapis.com storage-component.googleapis.com
Le compte de service de nœud de calcul Dataflow doit disposer du rôle IAM (Identity and Access Management) Client Kafka géré (
roles/managedkafka.client
).Les VM de nœud de calcul Dataflow doivent avoir un accès réseau au serveur d'amorçage Kafka. Pour en savoir plus, consultez Configurer la mise en réseau de Managed Service pour Apache Kafka.
Obtenir l'adresse du serveur d'amorçage
Pour exécuter un pipeline qui se connecte à un cluster Managed Service pour Apache Kafka, commencez par obtenir l'adresse du serveur d'amorçage du cluster. Vous aurez besoin de cette adresse lorsque vous configurerez le pipeline.
Vous pouvez utiliser la console Google Cloud ou Google Cloud CLI, comme suit :
Console
Dans la console Google Cloud , accédez à la page Clusters.
Cliquez sur le nom du cluster.
Cliquez sur l'onglet Configurations.
Copiez l'adresse du serveur d'amorçage à partir de l'URL d'amorçage.
gcloud
Exécutez la commande managed-kafka clusters describe
.
gcloud managed-kafka clusters describe CLUSTER_ID \
--location=LOCATION \
--format="value(bootstrapAddress)"
Remplacez les éléments suivants :
- CLUSTER_ID : ID ou nom du cluster
- LOCATION : emplacement du cluster
Pour en savoir plus, consultez Afficher un cluster Managed Service pour Apache Kafka.
Utiliser Managed Service pour Apache Kafka avec un modèle Dataflow
Google fournit plusieurs modèles Dataflow qui lisent les données à partir d'Apache Kafka :
Ces modèles peuvent être utilisés avec Managed Service pour Apache Kafka. Si l'un d'eux correspond à votre cas d'utilisation, envisagez de l'utiliser plutôt que d'écrire du code de pipeline personnalisé.
Console
Accédez à la page Dataflow > Tâches.
Cliquez sur Create job from template (Créer une tâche à partir d'un modèle).
Dans le champ Nom du job, saisissez un nom pour le job.
Dans le menu déroulant Modèle Dataflow, sélectionnez le modèle à exécuter.
Dans la zone Serveur d'amorçage Kafka, saisissez l'adresse du serveur d'amorçage.
Dans le champ Sujet Kafka, saisissez le nom du sujet.
Pour Kafka authentication mode (Mode d'authentification Kafka), sélectionnez APPLICATION_DEFAULT_CREDENTIALS.
Pour Format de message Kafka, sélectionnez le format des messages Apache Kafka.
Saisissez d'autres paramètres si nécessaire. Les paramètres compatibles sont documentés pour chaque modèle.
Exécuter le job
gcloud
Exécutez la commande gcloud dataflow jobs run
.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://TEMPLATE_FILE \
--region REGION_NAME \
--parameters \
readBootstrapServerAndTopic=projects/PROJECT_NAME/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS\,
# Other parameters, depending on the template
# ...
Remplacez les éléments suivants :
- JOB_NAME : nom de la tâche.
- TEMPLATE_FILE : emplacement du fichier de modèle dans Cloud Storage
- REGION_NAME : région dans laquelle vous souhaitez déployer votre job
- PROJECT_NAME : nom de votre Google Cloud projet
- LOCATION : emplacement du cluster
- CLUSTER_ID : ID ou nom du cluster
- TOPIC : nom du sujet Kafka
Utiliser Managed Service pour Apache Kafka avec un pipeline Beam
Cette section explique comment utiliser le SDK Apache Beam pour créer et exécuter un pipeline Dataflow qui se connecte à Managed Service for Apache Kafka.
Dans la plupart des scénarios, utilisez la transformation d'E/S gérée comme source ou récepteur Kafka. Si vous avez besoin d'un réglage plus avancé des performances, envisagez d'utiliser le connecteur KafkaIO
.
Pour en savoir plus sur les avantages de l'utilisation des E/S gérées, consultez E/S gérées Dataflow.
Conditions requises
Client Kafka version 3.6.0 ou ultérieure.
SDK Apache Beam version 2.61.0 ou ultérieure.
La machine sur laquelle vous démarrez le job Dataflow doit avoir un accès réseau au serveur d'amorçage Apache Kafka. Par exemple, démarrez le job à partir d'une instance Compute Engine pouvant accéder au VPC où le cluster est accessible.
Le compte principal qui crée le job doit disposer des rôles IAM suivants :
- Client Managed Kafka (
roles/managedkafka.client
) pour accéder au cluster Apache Kafka. - Utilisateur du compte de service (
roles/iam.serviceAccountUser
) pour agir en tant que compte de service de nœud de calcul Dataflow. - Administrateur de l'espace de stockage (
roles/storage.admin
) pour importer des fichiers de job dans Cloud Storage. - Administrateur Dataflow (
roles/dataflow.admin
) pour créer le job.
Si vous démarrez le job à partir d'une instance Compute Engine, vous pouvez accorder ces rôles à un compte de service associé à la VM. Pour en savoir plus, consultez Créer une VM qui utilise un compte de service géré par l'utilisateur.
Vous pouvez également utiliser les Identifiants par défaut de l'application (ADC) avec l'emprunt d'identité d'un compte de service lorsque vous créez le job.
- Client Managed Kafka (
Configurer les E/S gérées
Si votre pipeline utilise les E/S gérées pour Apache Kafka, définissez les options de configuration suivantes pour vous authentifier auprès de Managed Service pour Apache Kafka :
"security.protocol"
:"SASL_SSL"
"sasl.mechanism"
:"OAUTHBEARER"
"sasl.login.callback.handler.class"
:"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
"sasl.jaas.config"
:"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
Les exemples suivants montrent comment configurer les E/S gérées pour Managed Service pour Apache Kafka :
Java
// Create configuration parameters for the Managed I/O transform.
ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
.put("bootstrap_servers", options.getBootstrapServer())
.put("topic", options.getTopic())
.put("data_format", "RAW")
// Set the following fields to authenticate with Application Default
// Credentials (ADC):
.put("security.protocol", "SASL_SSL")
.put("sasl.mechanism", "OAUTHBEARER")
.put("sasl.login.callback.handler.class",
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler")
.put("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
.build();
Python
pipeline
| beam.managed.Read(
beam.managed.KAFKA,
config={
"bootstrap_servers": options.bootstrap_server,
"topic": options.topic,
"data_format": "RAW",
# Set the following fields to authenticate with Application Default
# Credentials (ADC):
"security.protocol": "SASL_SSL",
"sasl.mechanism": "OAUTHBEARER",
"sasl.login.callback.handler.class":
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
"sasl.jaas.config":
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
}
)
Configurer le connecteur KafkaIO
Les exemples suivants montrent comment configurer le connecteur KafkaIO
pour Managed Service pour Apache Kafka :
Java
String bootstap = options.getBootstrap();
String topicName = options.getTopic();
// Read from Kafka
pipeline.apply(KafkaIO.<String, String>read()
.withBootstrapServers(bootstap)
.withTopic(topicName)
.withKeyDeserializer(IntegerSerializer.class)
.withValueDeserializer(StringDeserializer.class)
.withGCPApplicationDefaultCredentials())
// Write to Kafka
pipeline.apply(KafkaIO.<Integer, String>write()
.withBootstrapServers(bootstrap)
.withTopic(topicName)
.withKeySerializer(IntegerSerializer.class)
.withValueSerializer(StringSerializer.class)
.withGCPApplicationDefaultCredentials());
Python
WriteToKafka(
producer_config={
"bootstrap.servers": options.bootstrap_servers,
"security.protocol": 'SASL_SSL',
"sasl.mechanism": "OAUTHBEARER",
"sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
"sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
},
topic=options.topic,
key_serializer=("org.apache.kafka.common.serialization." "LongSerializer"),
value_serializer=("org.apache.kafka.common.serialization." "StringSerializer")
)
Étapes suivantes
- En savoir plus sur Managed Service pour Apache Kafka
- Écrire des données du service géré pour Apache Kafka dans BigQuery
- Lire des données depuis Apache Kafka vers Dataflow
- Écrire depuis Dataflow vers Apache Kafka