Vorlage „Apache Kafka zu Kafka“

Mit der Vorlage „Apache Kafka zu Apache Kafka“ wird eine Streamingpipeline erstellt, die Daten als Bytes aus einer Apache Kafka-Quelle aufnimmt und dann in eine Apache Kafka-Senke schreibt.

Pipelineanforderungen

  • Das Apache Kafka-Quellthema muss vorhanden sein.
  • Die Apache Kafka-Quell- und Ziel-Broker-Server müssen ausgeführt werden und von den Dataflow-Worker-Maschinen erreichbar sein.
  • Wenn Sie Google Cloud Managed Service for Apache Kafka als Quelle oder Ziel verwenden, muss das Thema vorhanden sein, bevor Sie die Vorlage starten.

Kafka-Nachrichtenformat

Die Apache Kafka-Quellnachrichten werden als Bytes gelesen und in die Apache Kafka-Senke geschrieben.

Authentifizierung

Die Vorlage „Apache Kafka zu Apache Kafka“ unterstützt die SASL/PLAIN- und die TLS-Authentifizierung bei Kafka-Brokern.

Vorlagenparameter

Erforderliche Parameter

  • readBootstrapServerAndTopic: Kafka-Bootstrap-Server und -Thema, aus dem die Eingabe gelesen werden soll. Beispiel: localhost:9092;topic1,topic2.
  • kafkaReadAuthenticationMode: Der Authentifizierungsmodus zur Verwendung mit dem Kafka-Cluster. Verwenden Sie KafkaAuthenticationMethod.NONE für keine Authentifizierung, KafkaAuthenticationMethod.SASL_PLAIN für SASL/PLAIN-Nutzernamen und -Passwörter und KafkaAuthenticationMethod.TLS für die zertifikatbasierte Authentifizierung. KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS sollte nur für Google Cloud Apache Kafka for BigQuery-Cluster verwendet werden. Es ermöglicht die Authentifizierung mit Standardanmeldedaten für Anwendungen.
  • writeBootstrapServerAndTopic: Kafka-Thema, in das die Ausgabe geschrieben werden soll.
  • kafkaWriteAuthenticationMethod: Der Authentifizierungsmodus zur Verwendung mit dem Kafka-Cluster. Verwenden Sie NONE für keine Authentifizierung, SASL_PLAIN für SASL/PLAIN-Nutzernamen und -Passwörter und TLS für die zertifikatbasierte Authentifizierung. Standardeinstellung: APPLICATION_DEFAULT_CREDENTIALS.

Optionale Parameter

  • enableCommitOffsets: Commit-Offsets verarbeiteter Nachrichten an Kafka. Wenn diese Option aktiviert ist, werden dadurch die Lücken oder doppelte Verarbeitung von Nachrichten beim Neustart der Pipeline minimiert. Angabe der Nutzergruppen-ID erforderlich. Die Standardeinstellung ist "false".
  • consumerGroupId: Die eindeutige Kennung für die Nutzergruppe, zu der diese Pipeline gehört. Erforderlich, wenn Commit-Offsets für Kafka aktiviert sind. Die Standardeinstellung ist leer.
  • kafkaReadOffset: Der Ausgangspunkt für das Lesen von Nachrichten, wenn keine festgeschriebenen Offsets vorhanden sind. Die früheste beginnt am Anfang, die neueste aus der neuesten Nachricht. Die Standardeinstellung ist "latest".
  • kafkaReadUsernameSecretId: Die geheime ID von Google Cloud Secret Manager, die den Kafka-Nutzernamen enthält, der für die SASL_PLAIN-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION> Die Standardeinstellung ist leer.
  • kafkaReadPasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Kafka-Passwort enthält, das für die SASL_PLAIN-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION> Die Standardeinstellung ist leer.
  • kafkaReadKeystoreLocation: Der Google Cloud Storage-Pfad zur Java KeyStore-Datei (JKS), die das TLS-Zertifikat und den privaten Schlüssel für die Authentifizierung beim Kafka-Cluster enthält. Beispiel: gs://your-bucket/keystore.jks.
  • kafkaReadTruststoreLocation: Der Google Cloud Storage-Pfad zur Java TrustStore-Datei (JKS), die die vertrauenswürdigen Zertifikate enthält, mit denen die Identität des Kafka-Brokers geprüft werden soll.
  • kafkaReadTruststorePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf die Java Truststore-Datei (JKS) zur Kafka-TLS-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeystorePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf die Java KeyStore-Datei (JKS) für die Kafka-TLS-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeyPasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf den privaten Schlüssel in der Java KeyStore-Datei (JKS) für die Kafka-TLS-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaWriteUsernameSecretId: Die geheime ID von Google Cloud Secret Manager, die den Kafka-Nutzernamen für die SASL_PLAIN-Authentifizierung beim Ziel-Kafka-Cluster enthält. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Die Standardeinstellung ist leer.
  • kafkaWritePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Kafka-Passwort enthält, das für die SASL_PLAIN-Authentifizierung beim Ziel-Kafka-Cluster verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Die Standardeinstellung ist leer.
  • kafkaWriteKeystoreLocation: Der Google Cloud Storage-Pfad zur Java Keystore-Datei (JKS), die das TLS-Zertifikat und den privaten Schlüssel für die Authentifizierung beim Ziel-Kafka-Cluster enthält. Beispiel: gs://<BUCKET>/<KEYSTORE>.jks.
  • kafkaWriteTruststoreLocation: Der Google Cloud Storage-Pfad zur Java TrustStore-Datei (JKS), die die vertrauenswürdigen Zertifikate enthält, mit denen die Identität des Ziel-Kafka-Brokers geprüft werden soll.
  • kafkaWriteTruststorePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf die Java TrustStore-Datei (JKS) zur TLS-Authentifizierung beim Ziel-Kafka-Cluster verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaWriteKeystorePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort für den Zugriff auf die Java KeyStore-Datei (JKS) enthält, die für die TLS-Authentifizierung beim Ziel-Kafka-Cluster verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaWriteKeyPasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf den privaten Schlüssel in der Java Keystore-Datei (JKS) für die TLS-Authentifizierung beim Ziel-Kafka-Cluster verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.

Führen Sie die Vorlage aus.

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Kafka to Cloud Storage templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Optional: Wählen Sie Mindestens einmal aus, um von der genau einmaligen Verarbeitung zum Mindestens einmal-Streamingmodus zu wechseln.
  8. Klicken Sie auf Job ausführen.

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Kafka \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • BIGQUERY_TABLE: der Name Ihrer Cloud Storage-Tabelle
  • KAFKA_TOPICS ist die Apache Kafafa-Themenliste. Wenn mehrere Themen angegeben sind, müssen Sie Kommas maskieren. Weitere Informationen finden Sie unter gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.

    Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

  • KAFKA_SERVER_ADDRESSES ist die IP-Adressliste des Apache Kafka-Brokers. Jede IP-Adresse benötigt die Portnummer, über die der Server zugänglich ist. Beispiel: 35.70.252.199:9092 Wenn mehrere Adressen angegeben werden, müssen Sie Kommas mit Escapezeichen versehen. Weitere Informationen finden Sie unter gcloud topic escaping.

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Kafka",
   }
}
  

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • BIGQUERY_TABLE: der Name Ihrer Cloud Storage-Tabelle
  • KAFKA_TOPICS ist die Apache Kafafa-Themenliste. Wenn mehrere Themen angegeben sind, müssen Sie Kommas maskieren. Weitere Informationen finden Sie unter gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.

    Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

  • KAFKA_SERVER_ADDRESSES ist die IP-Adressliste des Apache Kafka-Brokers. Jede IP-Adresse benötigt die Portnummer, über die der Server zugänglich ist. Beispiel: 35.70.252.199:9092 Wenn mehrere Adressen angegeben werden, müssen Sie Kommas mit Escapezeichen versehen. Weitere Informationen finden Sie unter gcloud topic escaping.

Weitere Informationen finden Sie unter Mit Dataflow Daten von Kafka in Cloud Storage schreiben.

Java
/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.kafka.options.KafkaReadOptions;
import com.google.cloud.teleport.v2.kafka.options.KafkaWriteOptions;
import com.google.cloud.teleport.v2.kafka.transforms.KafkaTransform;
import com.google.cloud.teleport.v2.kafka.utils.FileAwareProducerFactoryFn;
import com.google.cloud.teleport.v2.kafka.utils.KafkaConfig;
import com.google.cloud.teleport.v2.kafka.utils.KafkaTopicUtils;
import com.google.cloud.teleport.v2.kafka.values.KafkaAuthenticationMethod;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.ByteArraySerializer;

@Template(
    name = "Kafka_to_Kafka",
    category = TemplateCategory.STREAMING,
    displayName = "Kafka to Kafka",
    description = "A pipeline that writes data to a kafka destination from another kafka source",
    optionsClass = KafkaToKafka.KafkaToKafkaOptions.class,
    flexContainerName = "kafka-to-kafka",
    contactInformation = "https://cloud.google.com/support")
public class KafkaToKafka {
  public interface KafkaToKafkaOptions
      extends PipelineOptions, KafkaReadOptions, KafkaWriteOptions, DataflowPipelineOptions {}

  public static void main(String[] args) throws IOException {
    UncaughtExceptionLogger.register();
    KafkaToKafkaOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaToKafkaOptions.class);
    run(options);
  }

  public static PipelineResult run(KafkaToKafkaOptions options) {

    if (options.getKafkaReadAuthenticationMode().equals(KafkaAuthenticationMethod.SASL_PLAIN)) {
      checkArgument(
          options.getKafkaReadUsernameSecretId().trim().length() > 0,
          "KafkaReadUsernameSecretId required to access username for source Kafka");
      checkArgument(
          options.getKafkaReadPasswordSecretId().trim().length() > 0,
          "KafkaReadPasswordSecretId required to access password for source kafka");
    } else if (options.getKafkaReadAuthenticationMode().equals(KafkaAuthenticationMethod.TLS)) {
      checkArgument(
          options.getKafkaReadTruststoreLocation().trim().length() > 0,
          "KafkaReadTruststoreLocation for trust store certificate required for ssl authentication");
      checkArgument(
          options.getKafkaReadTruststorePasswordSecretId().trim().length() > 0,
          "KafkaReadTruststorePassword for trust store password required for accessing truststore");
      checkArgument(
          options.getKafkaReadKeystoreLocation().trim().length() > 0,
          "KafkaReadKeystoreLocation for key store location required for ssl authentication");
      checkArgument(
          options.getKafkaReadKeystorePasswordSecretId().trim().length() > 0,
          "KafkaReadKeystorePassword for key store password required to access key store");
      checkArgument(
          options.getKafkaReadKeyPasswordSecretId().trim().length() > 0,
          "KafkaReadKeyPasswordSecretId version for key password required for SSL authentication");
    } else if (options.getKafkaReadAuthenticationMode().equals(KafkaAuthenticationMethod.NONE)
        || (options
            .getKafkaReadAuthenticationMode()
            .equals(KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS))) {
    } else {
      throw new UnsupportedOperationException(
          "Authentication method not supported: " + options.getKafkaReadAuthenticationMode());
    }

    if (options.getKafkaWriteAuthenticationMethod().equals(KafkaAuthenticationMethod.SASL_PLAIN)) {
      checkArgument(
          options.getKafkaWriteUsernameSecretId().trim().length() > 0,
          "KafkaWriteUsernameSecretId required to access username for source Kafka");
      checkArgument(
          options.getKafkaWritePasswordSecretId().trim().length() > 0,
          "KafkaWritePasswordSecretId required to access password for destination Kafka");
    } else if (options.getKafkaWriteAuthenticationMethod().equals(KafkaAuthenticationMethod.TLS)) {
      checkArgument(
          options.getKafkaWriteTruststoreLocation().trim().length() > 0,
          "KafkaWriteTruststoreLocation for trust store certificate required for ssl authentication");
      checkArgument(
          options.getKafkaWriteTruststorePasswordSecretId().trim().length() > 0,
          "KafkaWriteTruststorePasswordSecretId for trust store password required for accessing truststore");
      checkArgument(
          options.getKafkaWriteKeystoreLocation().trim().length() > 0,
          "KafkaWriteKeystoreLocation for key store location required for ssl authentication");
      checkArgument(
          options.getKafkaWriteKeystorePasswordSecretId().trim().length() > 0,
          "KafkaWriteKeystorePasswordSecretId for key store password required to access key store");
      checkArgument(
          options.getKafkaWriteKeyPasswordSecretId().trim().length() > 0,
          "KafkaWriteKeyPasswordSecretId for source key password secret id version required for SSL authentication");
    } else if (options.getKafkaWriteAuthenticationMethod().equals(KafkaAuthenticationMethod.NONE)
        || options
            .getKafkaWriteAuthenticationMethod()
            .equals(KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS)) {
      // No additional validation is required for these auth mechanisms since they don't depend on
      // any specific pipeline options.
    } else {
      throw new UnsupportedOperationException(
          "Authentication method not supported: " + options.getKafkaWriteAuthenticationMethod());
    }

    String sourceTopic;
    String sourceBootstrapServers;
    if (options.getReadBootstrapServerAndTopic() != null) {
      List<String> sourceBootstrapServerAndTopicList =
          KafkaTopicUtils.getBootstrapServerAndTopic(
              options.getReadBootstrapServerAndTopic(), options.getProject());
      sourceTopic = sourceBootstrapServerAndTopicList.get(1);
      sourceBootstrapServers = sourceBootstrapServerAndTopicList.get(0);
    } else {
      throw new IllegalArgumentException(
          "Please provide a valid bootstrap server which matches `[,:a-zA-Z0-9._-]+` and a topic which matches `[,a-zA-Z0-9._-]+`");
    }

    String destinationTopic;
    String destinationBootstrapServers;
    if (options.getWriteBootstrapServerAndTopic() != null) {
      List<String> destinationBootstrapServerAndTopicList =
          KafkaTopicUtils.getBootstrapServerAndTopic(
              options.getWriteBootstrapServerAndTopic(), options.getProject());
      destinationBootstrapServers = destinationBootstrapServerAndTopicList.get(0);
      destinationTopic = destinationBootstrapServerAndTopicList.get(1);
    } else {
      throw new IllegalArgumentException(
          "Please provide a valid bootstrap server which matches `[,:a-zA-Z0-9._-]+` and a topic which matches `[,a-zA-Z0-9._-]+`");
    }

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(
            "Read from Kafka",
            KafkaTransform.readBytesFromKafka(
                    sourceBootstrapServers,
                    Collections.singletonList(sourceTopic),
                    KafkaConfig.fromReadOptions(options),
                    options.getEnableCommitOffsets())
                .withoutMetadata())
        .apply(
            "Write to Kafka",
            KafkaIO.<byte[], byte[]>write()
                .withBootstrapServers(destinationBootstrapServers)
                .withTopic(destinationTopic)
                .withKeySerializer(ByteArraySerializer.class)
                .withValueSerializer(ByteArraySerializer.class)
                .withProducerConfigUpdates(KafkaConfig.fromWriteOptions(options))
                .withProducerFactoryFn(new FileAwareProducerFactoryFn()));

    return pipeline.run();
  }
}

Nächste Schritte