Die Vorlage „Apache Kafka für Cloud Storage“ ist eine Streamingpipeline, die Textdaten aus dem verwalteten Google Cloud-Dienst für Apache Kafka aufnimmt und die Datensätze in Cloud Storage ausgibt.
Sie können die Vorlage „Apache Kafka für BigQuery“ auch mit selbstverwalteter oder externer Kafka verwenden.
Pipelineanforderungen
- Der Cloud Storage-Ausgabe-Bucket muss vorhanden sein.
- Der Apache Kafka-Broker-Server muss ausgeführt werden und über die Dataflow-Worker-Maschinen erreichbar sein.
- Die Apache Kafka-Themen müssen vorhanden sein.
Kafka-Nachrichtenformat
Die Vorlage „Apache Kafka für Cloud Storage“ unterstützt das Lesen von Nachrichten von Kafka in den folgenden Formaten: CONFLUENT_AVRO_WIRE_FORMAT
und JSON
.
Format der Ausgabedatei
Das Ausgabedateiformat hat dasselbe Format wie die Kafka-Eingabenachricht. Wenn Sie beispielsweise JSON für das Kafka-Nachrichtenformat auswählen, werden JSON-Dateien in den Cloud Storage-Ausgabe-Bucket geschrieben.
Authentifizierung
Die Vorlage „Apache Kafka für Cloud Storage“ unterstützt die SASL/PLAIN-Authentifizierung bei Kafka-Brokern.
Vorlagenparameter
Erforderliche Parameter
- readBootstrapServerAndTopic : Kafka-Thema, aus dem die Eingabe gelesen werden soll
- kafkaReadAuthenticationMode : Der Authentifizierungsmodus zur Verwendung mit dem Kafka-Cluster. Verwenden Sie NONE für keine Authentifizierung und SASL_PLAIN für den SASL/PLAIN-Nutzernamen und das SASL_PLAIN-Passwort. Apache Kafka für BigQuery unterstützt nur den Authentifizierungsmodus SASL_PLAIN. Die Standardeinstellung ist SASL_PLAIN.
- outputDirectory: Das Pfad- und Dateinamenpräfix zum Schreiben von Ausgabedateien. Muss mit einem Schrägstrich enden. (Beispiel: gs://your-bucket/your-path/).
- messageFormat : Das Format der zu lesenden Kafka-Nachrichten. Die unterstützten Werte sind AVRO_CONFLUENT_WIRE_FORMAT (Confluent Schema Registry-codierter Avro), AVRO_BINARY_ENCODING (einfaches binäres Avro) und JSON. Die Standardeinstellung ist AVRO_CONFLUENT_WIRE_FORMAT.
Optionale Parameter
- windowDuration : Die Fensterdauer/Größe, in der Daten in Cloud Storage geschrieben werden. Zulässige Formate sind: Ns (für Sekunden, Beispiel: 5s), Nm (für Minuten, Beispiel: 12m), Nh (für Stunden, Beispiel: 2h). (Beispiel: 5 m). Die Standardeinstellung ist "5m".
- outputFilenamePrefix : Das Präfix für die Namen der einzelnen Dateien im Fenstermodus. (Beispiel: output-). Die Standardeinstellung ist "output".
- numShards: Die maximale Anzahl von Ausgabe-Shards, die beim Schreiben erzeugt werden. Eine höhere Anzahl von Shards erhöht den Durchsatz für das Schreiben in Cloud Storage, aber möglicherweise auch höhere Kosten für die Datenaggregation über Shards bei der Verarbeitung von Cloud Storage-Ausgabedateien. Der Standardwert wird von Dataflow festgelegt.
- enableCommitOffsets : Commit-Offsets verarbeiteter Nachrichten an Kafka. Wenn diese Option aktiviert ist, werden dadurch die Lücken oder doppelte Verarbeitung von Nachrichten beim Neustart der Pipeline minimiert. Angabe der Nutzergruppen-ID erforderlich. Die Standardeinstellung ist "false".
- consumerGroupId : Die eindeutige Kennung für die Nutzergruppe, zu der diese Pipeline gehört. Erforderlich, wenn Commit-Offsets für Kafka aktiviert sind. Die Standardeinstellung ist leer.
- kafkaReadOffset : Der Ausgangspunkt für das Lesen von Nachrichten, wenn keine festgeschriebenen Offsets vorhanden sind. Die früheste beginnt am Anfang, die neueste aus der neuesten Nachricht. Die Standardeinstellung ist "latest".
- kafkaReadUsernameSecretId : Die Secret-ID von Google Cloud Secret Manager, die den Kafka-Nutzernamen enthält, der für die SASL_PLAIN-Authentifizierung verwendet werden soll. (Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>) Die Standardeinstellung ist leer.
- kafkaReadPasswordSecretId : Die geheime ID des Google Cloud Secret Managers, die das Kafka-Passwort enthält, das für die SASL_PLAIN-Authentifizierung verwendet werden soll. (Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>) Die Standardeinstellung ist leer.
- schemaFormat : Das Kafka-Schemaformat. Kann als SINGLE_SCHEMA_FILE oder SCHEMA_REGISTRY bereitgestellt werden. Wenn SINGLE_SCHEMA_FILE angegeben ist, sollten alle Nachrichten das Schema haben, das in der Avro-Schemadatei erwähnt wird. Wenn SCHEMA_REGISTRY angegeben ist, können die Nachrichten entweder ein einzelnes Schema oder mehrere Schemas haben. Die Standardeinstellung ist SINGLE_SCHEMA_FILE.
- confluentAvroSchemaPath : Der Google Cloud Storage-Pfad zu der einzelnen Avro-Schemadatei, die zur Decodierung aller Nachrichten in einem Thema verwendet wird. Die Standardeinstellung ist leer.
- schemaRegistryConnectionUrl : Die URL für die Confluent Schema Registry-Instanz, die zum Verwalten von Avro-Schemas für die Nachrichtendecodierung verwendet wird. Die Standardeinstellung ist leer.
- binaryAvroSchemaPath : Der Google Cloud Storage-Pfad zur Avro-Datei, die zum Decodieren binärcodierter Avro-Nachrichten verwendet wird. Die Standardeinstellung ist leer.
Führen Sie die Vorlage aus.
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Kafka to Cloud Storage templateaus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Optional: Wählen Sie Mindestens einmal aus, um von der genau einmaligen Verarbeitung zum Mindestens einmal-Streamingmodus zu wechseln.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Cloud Storage \ --parameters \ outputTableSpec=BIGQUERY_TABLE,\ inputTopics=KAFKA_TOPICS,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ bootstrapServers=KAFKA_SERVER_ADDRESSES
Ersetzen Sie dabei Folgendes:
PROJECT_ID
: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlREGION_NAME
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
BIGQUERY_TABLE
: der Name Ihrer Cloud Storage-TabelleKAFKA_TOPICS
ist die Apache Kafafa-Themenliste. Wenn mehrere Themen angegeben sind, müssen Sie Kommas maskieren. Weitere Informationen finden Sie untergcloud topic escaping
.PATH_TO_JAVASCRIPT_UDF_FILE
Der Cloud Storage-URI der Datei.js
, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel:gs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.Wenn Ihre JavaScript-Funktion beispielsweise
myTransform(inJson) { /*...do stuff...*/ }
ist, lautet der FunktionsnamemyTransform
. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.KAFKA_SERVER_ADDRESSES
ist die IP-Adressliste des Apache Kafka-Brokers. Jede IP-Adresse benötigt die Portnummer, von der aus der Server zugänglich ist. Beispiel:35.70.252.199:9092
Wenn mehrere Adressen angegeben werden, müssen Sie Kommas mit Escapezeichen versehen. Weitere Informationen finden Sie untergcloud topic escaping
.
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "outputTableSpec": "BIGQUERY_TABLE", "inputTopics": "KAFKA_TOPICS", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "bootstrapServers": "KAFKA_SERVER_ADDRESSES" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Cloud Storage", } }
Ersetzen Sie dabei Folgendes:
PROJECT_ID
: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlLOCATION
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
BIGQUERY_TABLE
: der Name Ihrer Cloud Storage-TabelleKAFKA_TOPICS
ist die Apache Kafafa-Themenliste. Wenn mehrere Themen angegeben sind, müssen Sie Kommas maskieren. Weitere Informationen finden Sie untergcloud topic escaping
.PATH_TO_JAVASCRIPT_UDF_FILE
Der Cloud Storage-URI der Datei.js
, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel:gs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.Wenn Ihre JavaScript-Funktion beispielsweise
myTransform(inJson) { /*...do stuff...*/ }
ist, lautet der FunktionsnamemyTransform
. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.KAFKA_SERVER_ADDRESSES
ist die IP-Adressliste des Apache Kafka-Brokers. Jede IP-Adresse benötigt die Portnummer, von der aus der Server zugänglich ist. Beispiel:35.70.252.199:9092
Wenn mehrere Adressen angegeben werden, müssen Sie Kommas mit Escapezeichen versehen. Weitere Informationen finden Sie untergcloud topic escaping
.
Weitere Informationen finden Sie unter Mit Dataflow Daten von Kafka in Cloud Storage schreiben.
Nächste Schritte
- Informationen zu Dataflow-Vorlagen.
- Sehen Sie sich die Liste der von Google bereitgestellten Vorlagen an.