Mit der Vorlage „Apache Kafka zu Apache Kafka“ wird eine Streamingpipeline erstellt, die Daten als Bytes aus einer Apache Kafka-Quelle aufnimmt und dann in eine Apache Kafka-Senke schreibt.
Pipelineanforderungen
- Das Apache Kafka-Quellthema muss vorhanden sein.
- Die Apache Kafka-Quell- und Ziel-Broker-Server müssen ausgeführt werden und von den Dataflow-Worker-Maschinen erreichbar sein.
- Wenn Sie Google Cloud Managed Service for Apache Kafka als Quelle oder Ziel verwenden, muss das Thema vorhanden sein, bevor Sie die Vorlage starten.
Kafka-Nachrichtenformat
Die Apache Kafka-Quellnachrichten werden als Bytes gelesen und in die Apache Kafka-Senke geschrieben.
Authentifizierung
Die Vorlage „Apache Kafka zu Apache Kafka“ unterstützt die SASL/PLAIN- und die TLS-Authentifizierung bei Kafka-Brokern.
Vorlagenparameter
Erforderliche Parameter
- readBootstrapServerAndTopic : Kafka-Thema, aus dem die Eingabe gelesen werden soll
- kafkaReadAuthenticationMode : Der Authentifizierungsmodus zur Verwendung mit dem Kafka-Cluster. Verwenden Sie NONE für keine Authentifizierung, SASL_PLAIN für SASL/PLAIN-Nutzernamen und -Passwörter und TLS für die zertifikatbasierte Authentifizierung. Apache Kafka für BigQuery unterstützt nur den Authentifizierungsmodus SASL_PLAIN. Die Standardeinstellung ist SASL_PLAIN.
- writeBootstrapServerAndTopic: Kafka-Thema, in das die Ausgabe geschrieben werden soll.
- kafkaWriteAuthenticationMode: Der Authentifizierungsmodus zur Verwendung mit dem Kafka-Cluster. Verwenden Sie NONE für keine Authentifizierung, SASL_PLAIN für SASL/PLAIN-Nutzernamen und -Passwörter und TLS für die zertifikatbasierte Authentifizierung. Die Standardeinstellung ist: NONE.
Optionale Parameter
- enableCommitOffsets : Commit-Offsets verarbeiteter Nachrichten an Kafka. Wenn diese Option aktiviert ist, werden dadurch die Lücken oder doppelte Verarbeitung von Nachrichten beim Neustart der Pipeline minimiert. Angabe der Nutzergruppen-ID erforderlich. Die Standardeinstellung ist "false".
- consumerGroupId : Die eindeutige Kennung für die Nutzergruppe, zu der diese Pipeline gehört. Erforderlich, wenn Commit-Offsets für Kafka aktiviert sind. Die Standardeinstellung ist leer.
- kafkaReadOffset : Der Ausgangspunkt für das Lesen von Nachrichten, wenn keine festgeschriebenen Offsets vorhanden sind. Die früheste beginnt am Anfang, die neueste aus der neuesten Nachricht. Die Standardeinstellung ist "latest".
- kafkaReadUsernameSecretId : Die Secret-ID von Google Cloud Secret Manager, die den Kafka-Nutzernamen enthält, der für die SASL_PLAIN-Authentifizierung verwendet werden soll. (Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>) Die Standardeinstellung ist leer.
- kafkaReadPasswordSecretId : Die geheime ID des Google Cloud Secret Managers, die das Kafka-Passwort enthält, das für die SASL_PLAIN-Authentifizierung verwendet werden soll. (Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>) Die Standardeinstellung ist leer.
- kafkaReadKeystoreLocation: Der Google Cloud Storage-Pfad zur Java KeyStore-Datei (JKS), die das zur Authentifizierung beim Kafka-Cluster zu verwendende TLS-Zertifikat und den privaten Schlüssel enthält. (Beispiel: gs://Ihr-Bucket/keystore.jks).
- kafkaReadTruststoreLocation: Der Google Cloud Storage-Pfad zur Java TrustStore-Datei (JKS), die die vertrauenswürdigen Zertifikate enthält, mit denen die Identität des Kafka-Brokers geprüft werden soll.
- kafkaReadTruststorePasswordSecretId: Die Secret-ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf die Java Truststore-Datei (JKS) zur Kafka-TLS-Authentifizierung verwendet werden soll (Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
- kafkaReadKeystorePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf die Java KeyStore-Datei (JKS) für die Kafka-TLS-Authentifizierung verwendet werden soll. (Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>)
- kafkaReadKeyPasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf den privaten Schlüssel in der Java KeyStore-Datei (JKS) für die Kafka-TLS-Authentifizierung verwendet werden soll. (Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>)
- kafkaWriteUsernameSecretId: Die geheime ID von Google Cloud Secret Manager, die den Kafka-Nutzernamen für die SASL_PLAIN-Authentifizierung beim Ziel-Kafka-Cluster enthält. (Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>) Die Standardeinstellung ist leer.
- kafkaWritePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Kafka-Passwort enthält, das für die SASL_PLAIN-Authentifizierung beim Ziel-Kafka-Cluster verwendet werden soll. (Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>) Die Standardeinstellung ist leer.
- kafkaWriteKeystoreLocation: Der Google Cloud Storage-Pfad zur Java Keystore-Datei (JKS), die das TLS-Zertifikat und den privaten Schlüssel für die Authentifizierung beim Ziel-Kafka-Cluster enthält. (Beispiel: gs://
- kafkaWriteTruststoreLocation: Der Google Cloud Storage-Pfad zur Java TrustStore-Datei (JKS), die die vertrauenswürdigen Zertifikate enthält, mit denen die Identität des Ziel-Kafka-Brokers geprüft werden soll.
- kafkaWriteTruststorePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf die Java TrustStore-Datei (JKS) zur TLS-Authentifizierung beim Ziel-Kafka-Cluster verwendet werden soll. (Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>)
- kafkaWriteKeystorePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort für den Zugriff auf die Java KeyStore-Datei (JKS) enthält, die für die TLS-Authentifizierung beim Ziel-Kafka-Cluster verwendet werden soll. (Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>)
- kafkaWriteKeyPasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf den privaten Schlüssel in der Java Keystore-Datei (JKS) für die TLS-Authentifizierung beim Ziel-Kafka-Cluster verwendet werden soll. (Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>)
Führen Sie die Vorlage aus.
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Kafka to Cloud Storage templateaus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Optional: Wählen Sie Mindestens einmal aus, um von der genau einmaligen Verarbeitung zum Mindestens einmal-Streamingmodus zu wechseln.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Cloud Storage \ --parameters \ outputTableSpec=BIGQUERY_TABLE,\ inputTopics=KAFKA_TOPICS,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ bootstrapServers=KAFKA_SERVER_ADDRESSES
Ersetzen Sie dabei Folgendes:
PROJECT_ID
: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlREGION_NAME
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
BIGQUERY_TABLE
: der Name Ihrer Cloud Storage-TabelleKAFKA_TOPICS
ist die Apache Kafafa-Themenliste. Wenn mehrere Themen angegeben sind, müssen Sie Kommas maskieren. Weitere Informationen finden Sie untergcloud topic escaping
.PATH_TO_JAVASCRIPT_UDF_FILE
Der Cloud Storage-URI der Datei.js
, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel:gs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.Wenn Ihre JavaScript-Funktion beispielsweise
myTransform(inJson) { /*...do stuff...*/ }
ist, lautet der FunktionsnamemyTransform
. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.KAFKA_SERVER_ADDRESSES
ist die IP-Adressliste des Apache Kafka-Brokers. Jede IP-Adresse benötigt die Portnummer, über die der Server zugänglich ist. Beispiel:35.70.252.199:9092
Wenn mehrere Adressen angegeben werden, müssen Sie Kommas mit Escapezeichen versehen. Weitere Informationen finden Sie untergcloud topic escaping
.
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "outputTableSpec": "BIGQUERY_TABLE", "inputTopics": "KAFKA_TOPICS", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "bootstrapServers": "KAFKA_SERVER_ADDRESSES" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Cloud Storage", } }
Ersetzen Sie dabei Folgendes:
PROJECT_ID
: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlLOCATION
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
BIGQUERY_TABLE
: der Name Ihrer Cloud Storage-TabelleKAFKA_TOPICS
ist die Apache Kafafa-Themenliste. Wenn mehrere Themen angegeben sind, müssen Sie Kommas maskieren. Weitere Informationen finden Sie untergcloud topic escaping
.PATH_TO_JAVASCRIPT_UDF_FILE
Der Cloud Storage-URI der Datei.js
, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel:gs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.Wenn Ihre JavaScript-Funktion beispielsweise
myTransform(inJson) { /*...do stuff...*/ }
ist, lautet der FunktionsnamemyTransform
. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.KAFKA_SERVER_ADDRESSES
ist die IP-Adressliste des Apache Kafka-Brokers. Jede IP-Adresse benötigt die Portnummer, über die der Server zugänglich ist. Beispiel:35.70.252.199:9092
Wenn mehrere Adressen angegeben werden, müssen Sie Kommas mit Escapezeichen versehen. Weitere Informationen finden Sie untergcloud topic escaping
.
Weitere Informationen finden Sie unter Mit Dataflow Daten von Kafka in Cloud Storage schreiben.
Nächste Schritte
- Informationen zu Dataflow-Vorlagen.
- Sehen Sie sich die Liste der von Google bereitgestellten Vorlagen an.