Dataflow mit Managed Service for Apache Kafka verwenden

Auf dieser Seite wird beschrieben, wie Sie Google Cloud Managed Service for Apache Kafka als Quelle oder Ziel in einer Dataflow-Pipeline verwenden.

Sie haben folgende Möglichkeiten:

Voraussetzungen

  • Aktivieren Sie die Cloud Storage API, die Dataflow API und die Managed Service for Apache Kafka API in Ihrem Projekt. Weitere Informationen finden Sie unter APIs aktivieren. Alternativ können Sie den folgenden Google Cloud CLI-Befehl ausführen:

    gcloud services enable dataflow.googleapis.com managedkafka.googleapis.com storage-component.googleapis.com
    
  • Das Worker-Dienstkonto von Dataflow muss die IAM-Rolle (Identity and Access Management) „Managed Kafka Client“ (roles/managedkafka.client) haben.

  • Die Dataflow-Worker-VMs müssen Netzwerkzugriff auf den Kafka-Bootstrap-Server haben. Weitere Informationen finden Sie unter Netzwerk für Managed Service for Apache Kafka konfigurieren.

Bootstrap-Serveradresse abrufen

Wenn Sie eine Pipeline ausführen möchten, die eine Verbindung zu einem Managed Service for Apache Kafka-Cluster herstellt, müssen Sie zuerst die Bootstrap-Serveradresse des Clusters abrufen. Sie benötigen diese Adresse, wenn Sie die Pipeline konfigurieren.

Sie können die Google Cloud -Konsole oder die Google Cloud CLI verwenden:

Console

  1. Rufen Sie in der Google Cloud Console die Seite Cluster auf.

    Zu den Clustern

  2. Klicken Sie auf den Clusternamen.

  3. Klicken Sie auf den Tab Konfigurationen.

  4. Kopieren Sie die Bootstrap-Serveradresse aus Bootstrap-URL.

gcloud

Führen Sie den Befehl managed-kafka clusters describe aus.

gcloud managed-kafka clusters describe CLUSTER_ID \
  --location=LOCATION \
  --format="value(bootstrapAddress)"

Ersetzen Sie Folgendes:

  • CLUSTER_ID: die ID oder der Name des Clusters
  • LOCATION: Der Standort des Clusters

Weitere Informationen finden Sie unter Managed Service for Apache Kafka-Cluster ansehen.

Managed Service for Apache Kafka mit einer Dataflow-Vorlage verwenden

Google bietet mehrere Dataflow-Vorlagen, die Daten aus Apache Kafka lesen:

Diese Vorlagen können mit Managed Service for Apache Kafka verwendet werden. Wenn einer davon zu Ihrem Anwendungsfall passt, sollten Sie ihn verwenden, anstatt benutzerdefinierten Pipelinecode zu schreiben.

Console

  1. Rufen Sie die Seite Dataflow > Jobs auf.

    ZU JOBS

  2. Klicken Sie auf Job aus Vorlage erstellen.

  3. Geben Sie unter Jobname einen Namen für den Job ein.

  4. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Vorlage aus, die Sie ausführen möchten.

  5. Geben Sie im Feld Kafka-Bootstrap-Server die Adresse des Bootstrap-Servers ein.

  6. Geben Sie im Feld Kafka-Thema den Namen des Themas ein.

  7. Wählen Sie für Kafka authentication mode (Kafka-Authentifizierungsmodus) die Option APPLICATION_DEFAULT_CREDENTIALS aus.

  8. Wählen Sie bei Kafka-Nachrichtenformat das Format der Apache Kafka-Nachrichten aus.

  9. Geben Sie nach Bedarf weitere Parameter ein. Die unterstützten Parameter sind für jede Vorlage dokumentiert.

  10. Job ausführen

gcloud

Führen Sie den Befehl gcloud dataflow jobs run aus.

gcloud dataflow jobs run JOB_NAME \
  --gcs-location gs://TEMPLATE_FILE \
  --region REGION_NAME \
  --parameters \
readBootstrapServerAndTopic=projects/PROJECT_NAME/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS\,
# Other parameters, depending on the template
# ...

Ersetzen Sie Folgendes:

  • JOB_NAME: Ein Name für den Job
  • TEMPLATE_FILE: Der Speicherort der Vorlagendatei in Cloud Storage.
  • REGION_NAME: die Region, in der Sie Ihren Job bereitstellen möchten
  • PROJECT_NAME: Name Ihres Google Cloud Projekts
  • LOCATION: Der Standort des Clusters
  • CLUSTER_ID: die ID oder der Name des Clusters
  • TOPIC: Der Name des Kafka-Themas

Managed Service for Apache Kafka mit einer Beam-Pipeline verwenden

In diesem Abschnitt wird beschrieben, wie Sie mit dem Apache Beam SDK eine Dataflow-Pipeline erstellen und ausführen, die eine Verbindung zu Managed Service for Apache Kafka herstellt.

Verwenden Sie in den meisten Fällen die verwaltete I/O-Transformation als Kafka-Quelle oder ‑Senke. Wenn Sie eine erweiterte Leistungsoptimierung benötigen, sollten Sie den KafkaIO-Connector verwenden. Weitere Informationen zu den Vorteilen der Verwendung von verwalteten E/A finden Sie unter Von Dataflow verwaltete E/A.

Voraussetzungen

  • Kafka-Client-Version 3.6.0 oder höher.

  • Apache Beam SDK Version 2.61.0 oder höher.

  • Die Maschine, auf der Sie den Dataflow-Job starten, muss Netzwerkzugriff auf den Apache Kafka-Bootstrap-Server haben. Starten Sie den Job beispielsweise von einer Compute Engine-Instanz aus, die auf die VPC zugreifen kann, in der der Cluster erreichbar ist.

  • Das Hauptkonto, das den Job erstellt, muss die folgenden IAM-Rollen haben:

    • Managed Kafka Client (roles/managedkafka.client) für den Zugriff auf den Apache Kafka-Cluster.
    • Dienstkontonutzer (roles/iam.serviceAccountUser), der als Dataflow-Worker-Dienstkonto fungiert.
    • Storage-Administrator (roles/storage.admin), um Jobdateien in Cloud Storage hochzuladen.
    • Dataflow-Administrator (roles/dataflow.admin) zum Erstellen des Jobs.

    Wenn Sie den Job über eine Compute Engine-Instanz starten, können Sie diese Rollen einem Dienstkonto zuweisen, das an die VM angehängt ist. Weitere Informationen finden Sie unter VM mit vom Nutzer verwalteten Dienstkonto erstellen.

    Sie können auch Standardanmeldedaten für Anwendungen (Application Default Credentials, ADC) mit Identitätsübernahme des Dienstkontos verwenden, wenn Sie den Job erstellen.

Verwaltete E/A konfigurieren

Wenn in Ihrer Pipeline Managed I/O for Apache Kafka verwendet wird, legen Sie die folgenden Konfigurationsoptionen fest, um sich bei Managed Service for Apache Kafka zu authentifizieren:

  • "security.protocol": "SASL_SSL"
  • "sasl.mechanism": "OAUTHBEARER"
  • "sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
  • "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"

In den folgenden Beispielen wird gezeigt, wie verwaltete E/A für Managed Service for Apache Kafka konfiguriert wird:

Java

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
      .put("bootstrap_servers", options.getBootstrapServer())
      .put("topic", options.getTopic())
      .put("data_format", "RAW")
      // Set the following fields to authenticate with Application Default
      // Credentials (ADC):
      .put("security.protocol", "SASL_SSL")
      .put("sasl.mechanism", "OAUTHBEARER")
      .put("sasl.login.callback.handler.class",
          "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler")
      .put("sasl.jaas.config",   "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
      .build();

Python

pipeline
| beam.managed.Read(
    beam.managed.KAFKA,
    config={
      "bootstrap_servers": options.bootstrap_server,
      "topic": options.topic,
      "data_format": "RAW",
      # Set the following fields to authenticate with Application Default
      # Credentials (ADC):
      "security.protocol": "SASL_SSL",
      "sasl.mechanism": "OAUTHBEARER",
      "sasl.login.callback.handler.class":
          "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
      "sasl.jaas.config":
          "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
    }
)

KafkaIO-Connector konfigurieren

In den folgenden Beispielen wird gezeigt, wie Sie den KafkaIO-Connector für Managed Service for Apache Kafka konfigurieren:

Java

String bootstap = options.getBootstrap();
String topicName = options.getTopic();

// Read from Kafka
pipeline.apply(KafkaIO.<String, String>read()
    .withBootstrapServers(bootstap)
    .withTopic(topicName)
    .withKeyDeserializer(IntegerSerializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withGCPApplicationDefaultCredentials())

// Write to Kafka
pipeline.apply(KafkaIO.<Integer, String>write()
    .withBootstrapServers(bootstrap)
    .withTopic(topicName)
    .withKeySerializer(IntegerSerializer.class)
    .withValueSerializer(StringSerializer.class)
    .withGCPApplicationDefaultCredentials());

Python

WriteToKafka(
  producer_config={
    "bootstrap.servers": options.bootstrap_servers,
    "security.protocol": 'SASL_SSL',
    "sasl.mechanism": "OAUTHBEARER",
    "sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
    "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
  },
  topic=options.topic,
  key_serializer=("org.apache.kafka.common.serialization." "LongSerializer"),
  value_serializer=("org.apache.kafka.common.serialization." "StringSerializer")
)

Nächste Schritte