Vorlage „Apache Kafka zu Kafka“

Mit der Vorlage „Apache Kafka zu Apache Kafka“ wird eine Streamingpipeline erstellt, die Daten als Bytes aus einer Apache Kafka-Quelle aufnimmt und dann in eine Apache Kafka-Senke schreibt.

Pipelineanforderungen

  • Das Apache Kafka-Quellthema muss vorhanden sein.
  • Die Apache Kafka-Quell- und Ziel-Broker-Server müssen ausgeführt werden und von den Dataflow-Worker-Maschinen erreichbar sein.
  • Wenn Sie Google Cloud Managed Service for Apache Kafka als Quelle oder Ziel verwenden, muss das Thema vorhanden sein, bevor Sie die Vorlage starten.

Kafka-Nachrichtenformat

Die Apache Kafka-Quellnachrichten werden als Bytes gelesen und in die Apache Kafka-Senke geschrieben.

Authentifizierung

Die Vorlage „Apache Kafka zu Apache Kafka“ unterstützt die SASL/PLAIN- und die TLS-Authentifizierung bei Kafka-Brokern.

Vorlagenparameter

Erforderliche Parameter

  • readBootstrapServerAndTopic: Kafka-Bootstrap-Server und -Thema, aus dem die Eingabe gelesen werden soll. Beispiel: localhost:9092;topic1,topic2.
  • kafkaReadAuthenticationMode: Der Authentifizierungsmodus zur Verwendung mit dem Kafka-Cluster. Verwenden Sie KafkaAuthenticationMethod.NONE für keine Authentifizierung, KafkaAuthenticationMethod.SASL_PLAIN für SASL/PLAIN-Nutzernamen und -Passwörter und KafkaAuthenticationMethod.TLS für die zertifikatbasierte Authentifizierung. KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS sollte nur für Google Cloud Apache Kafka for BigQuery-Cluster verwendet werden. Es ermöglicht die Authentifizierung mit Standardanmeldedaten für Anwendungen.
  • writeBootstrapServerAndTopic: Kafka-Thema, in das die Ausgabe geschrieben werden soll.
  • kafkaWriteAuthenticationMethod: Der Authentifizierungsmodus zur Verwendung mit dem Kafka-Cluster. Verwenden Sie NONE für keine Authentifizierung, SASL_PLAIN für SASL/PLAIN-Nutzernamen und -Passwörter und TLS für die zertifikatbasierte Authentifizierung. Standardeinstellung: APPLICATION_DEFAULT_CREDENTIALS.

Optionale Parameter

  • enableCommitOffsets: Commit-Offsets verarbeiteter Nachrichten an Kafka. Wenn diese Option aktiviert ist, werden dadurch die Lücken oder doppelte Verarbeitung von Nachrichten beim Neustart der Pipeline minimiert. Angabe der Nutzergruppen-ID erforderlich. Die Standardeinstellung ist "false".
  • consumerGroupId: Die eindeutige Kennung für die Nutzergruppe, zu der diese Pipeline gehört. Erforderlich, wenn Commit-Offsets für Kafka aktiviert sind. Die Standardeinstellung ist leer.
  • kafkaReadOffset: Der Ausgangspunkt für das Lesen von Nachrichten, wenn keine festgeschriebenen Offsets vorhanden sind. Die früheste beginnt am Anfang, die neueste aus der neuesten Nachricht. Die Standardeinstellung ist "latest".
  • kafkaReadUsernameSecretId: Die geheime ID von Google Cloud Secret Manager, die den Kafka-Nutzernamen enthält, der für die SASL_PLAIN-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION> Die Standardeinstellung ist leer.
  • kafkaReadPasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Kafka-Passwort enthält, das für die SASL_PLAIN-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION> Die Standardeinstellung ist leer.
  • kafkaReadKeystoreLocation: Der Google Cloud Storage-Pfad zur Java KeyStore-Datei (JKS), die das TLS-Zertifikat und den privaten Schlüssel für die Authentifizierung beim Kafka-Cluster enthält. Beispiel: gs://your-bucket/keystore.jks.
  • kafkaReadTruststoreLocation: Der Google Cloud Storage-Pfad zur Java TrustStore-Datei (JKS), die die vertrauenswürdigen Zertifikate enthält, mit denen die Identität des Kafka-Brokers geprüft werden soll.
  • kafkaReadTruststorePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf die Java Truststore-Datei (JKS) zur Kafka-TLS-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeystorePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf die Java KeyStore-Datei (JKS) für die Kafka-TLS-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeyPasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf den privaten Schlüssel in der Java KeyStore-Datei (JKS) für die Kafka-TLS-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaWriteUsernameSecretId: Die geheime ID von Google Cloud Secret Manager, die den Kafka-Nutzernamen für die SASL_PLAIN-Authentifizierung beim Ziel-Kafka-Cluster enthält. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Die Standardeinstellung ist leer.
  • kafkaWritePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Kafka-Passwort enthält, das für die SASL_PLAIN-Authentifizierung beim Ziel-Kafka-Cluster verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Die Standardeinstellung ist leer.
  • kafkaWriteKeystoreLocation: Der Google Cloud Storage-Pfad zur Java Keystore-Datei (JKS), die das TLS-Zertifikat und den privaten Schlüssel für die Authentifizierung beim Ziel-Kafka-Cluster enthält. Beispiel: gs://<BUCKET>/<KEYSTORE>.jks.
  • kafkaWriteTruststoreLocation: Der Google Cloud Storage-Pfad zur Java TrustStore-Datei (JKS), die die vertrauenswürdigen Zertifikate enthält, mit denen die Identität des Ziel-Kafka-Brokers geprüft werden soll.
  • kafkaWriteTruststorePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf die Java TrustStore-Datei (JKS) zur TLS-Authentifizierung beim Ziel-Kafka-Cluster verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaWriteKeystorePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort für den Zugriff auf die Java KeyStore-Datei (JKS) enthält, die für die TLS-Authentifizierung beim Ziel-Kafka-Cluster verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaWriteKeyPasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf den privaten Schlüssel in der Java Keystore-Datei (JKS) für die TLS-Authentifizierung beim Ziel-Kafka-Cluster verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.

Führen Sie die Vorlage aus.

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Kafka to Cloud Storage templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Optional: Wählen Sie Mindestens einmal aus, um von der genau einmaligen Verarbeitung zum Mindestens einmal-Streamingmodus zu wechseln.
  8. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Kafka \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Ersetzen Sie Folgendes:

  • PROJECT_ID: die Google Cloud -Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • BIGQUERY_TABLE: der Name Ihrer Cloud Storage-Tabelle
  • KAFKA_TOPICS ist die Apache Kafafa-Themenliste. Wenn mehrere Themen angegeben sind, müssen Sie Kommas maskieren. Weitere Informationen finden Sie unter gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.

    Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

  • KAFKA_SERVER_ADDRESSES ist die IP-Adressliste des Apache Kafka-Brokers. Jede IP-Adresse benötigt die Portnummer, über die der Server zugänglich ist. Beispiel: 35.70.252.199:9092 Wenn mehrere Adressen angegeben werden, müssen Sie Kommas mit Escapezeichen versehen. Weitere Informationen finden Sie unter gcloud topic escaping.

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Kafka",
   }
}
  

Ersetzen Sie Folgendes:

  • PROJECT_ID: die Google Cloud -Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • BIGQUERY_TABLE: der Name Ihrer Cloud Storage-Tabelle
  • KAFKA_TOPICS ist die Apache Kafafa-Themenliste. Wenn mehrere Themen angegeben sind, müssen Sie Kommas maskieren. Weitere Informationen finden Sie unter gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.

    Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

  • KAFKA_SERVER_ADDRESSES ist die IP-Adressliste des Apache Kafka-Brokers. Jede IP-Adresse benötigt die Portnummer, über die der Server zugänglich ist. Beispiel: 35.70.252.199:9092 Wenn mehrere Adressen angegeben werden, müssen Sie Kommas mit Escapezeichen versehen. Weitere Informationen finden Sie unter gcloud topic escaping.

Weitere Informationen finden Sie unter Mit Dataflow Daten von Kafka in Cloud Storage schreiben.

Nächste Schritte