Vorlage „Apache Kafka für BigQuery“

Die Vorlage „Apache Kafka für BigQuery“ ist eine Streamingpipeline, die Textdaten aus Apache Kafka schreibt, eine benutzerdefinierte Funktion (User-defined Function, UDF) ausführt und die resultierenden Datensätze in BigQuery ausgibt. Alle Fehler, die bei der Transformation der Daten, der Ausführung der UDF oder beim Schreiben in die Ausgabetabelle auftreten, werden in eine separate Fehlertabelle in BigQuery geschrieben. Wenn die Fehlertabelle vor der Ausführung nicht vorhanden ist, wird sie erstellt.

Pipelineanforderungen

  • Die BigQuery-Ausgabetabelle muss vorhanden sein.
  • Der Apache Kafka-Broker-Server muss ausgeführt werden und über die Dataflow-Worker-Maschinen erreichbar sein.
  • Die Apache Kafka-Themen müssen vorhanden sein und die Nachrichten müssen in einem gültigen JSON-Format codiert sein.

Vorlagenparameter

Erforderliche Parameter

  • outputTableSpec : Der Speicherort der BigQuery-Tabelle, in die die Ausgabe geschrieben werden soll. Der Name muss das Format <project>:<dataset>.<table_name> haben. Das Schema der Tabelle muss mit Eingabeobjekten übereinstimmen.

Optionale Parameter

  • bootstrapServers: Kafka Bootstrap Server-Liste, durch Kommas getrennt. (Beispiel: localhost:9092,127.0.0.1:9093).
  • inputTopics: Kafka-Thema/Themen aus denen die Eingabe gelesen werden soll. (Beispiel: topic1,topic2).
  • outputDeadletterTable : BigQuery-Tabelle für fehlgeschlagene Nachrichten. Nachrichten, die die Ausgabetabelle aus verschiedenen Gründen nicht erreicht haben (z.B. nicht übereinstimmendes Schema, fehlerhaft formatierte JSON-Datei), werden in diese Tabelle geschrieben. Wenn sie nicht vorhanden ist, wird sie während der Pipelineausführung erstellt. Wenn nicht angegeben, wird stattdessen "outputTableSpec_error_records" verwendet. (Beispiel: your-project-id:your-dataset.your-table-name).
  • messageFormat: Das Nachrichtenformat. Kann AVRO oder JSON sein. Die Standardeinstellung ist JSON.
  • avroSchemaPath: Cloud Storage-Pfad zur Avro-Schemadatei. Beispiel: gs://MyBucket/file.avsc.
  • useStorageWriteApiAtLeastOnce : Dieser Parameter wird nur wirksam, wenn "BigQuery Storage Write API verwenden" aktiviert ist. Wenn diese Option aktiviert ist, wird für die Storage Write API die "Mindestens einmal"-Semantik verwendet. Andernfalls wird die "Genau einmal"-Semantik verwendet. Die Standardeinstellung ist "false".
  • readBootstrapServers: Kafka-Boottrap-Serverliste, durch Kommas getrennt. (Beispiel: localhost:9092,127.0.0.1:9093).
  • kafkaReadTopics: Kafka-Themen, aus denen Eingaben gelesen werden sollen. (Beispiel: topic1,topic2).
  • javascriptTextTransformGcsPath : Das Cloud Storage-Pfadmuster für den JavaScript-Code, der Ihre benutzerdefinierten Funktionen enthält. Beispiel: gs://Ihr-Bucket/Ihre-Funktion.js.
  • javascriptTextTransformFunctionName : Der Name der Funktion, die aus Ihrer JavaScript-Datei aufgerufen werden soll. Verwenden Sie nur Buchstaben, Ziffern und Unterstriche. (Beispiel: "transform" oder "transform_udf1").
  • javascriptTextTransformReloadIntervalMinutes : Definieren Sie das Intervall, in dem die Worker möglicherweise nach JavaScript-UDF-Änderungen suchen, um die Dateien neu zu laden. Die Standardeinstellung ist 0.
  • writeDisposition: BigQuery-WriteDisposition. Beispiel: WRITE_APPEND, WRITE_EMPTY oder WRITE_TRUNCATE. Die Standardeinstellung ist WRITE_APPEND.
  • createDisposition : BigQuery CreateDisposition. Beispiel: CREATE_IF_NEEDED, CREATE_NEVER. Die Standardeinstellung ist CREATE_IF_NEEDED.
  • useStorageWriteApi: Wenn „true“, verwendet die Pipeline beim Schreiben der Daten in BigQuery die Storage Write API (siehe https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api). Der Standardwert ist „false“. Wenn Sie die Storage Write API im genau einmaligen Modus verwenden, müssen Sie die folgenden Parameter festlegen: „Anzahl der Streams für die BigQuery Storage Write API“ und „Triggerhäufigkeit in Sekunden für die BigQuery Storage Write API“. Wenn Sie den Dataflow-Modus „Mindestens einmal“ aktivieren oder den Parameter „useStorageWriteApiAtLeastOnce“ auf „true“ setzen, müssen Sie die Anzahl der Streams oder die Triggerhäufigkeit nicht festlegen.
  • numStorageWriteApiStreams: Die Anzahl der Streams definiert die Parallelität der Write-Transformation von BigQueryIO und entspricht ungefähr der Anzahl der Streams der Storage Write API, die von der Pipeline verwendet werden. Die empfohlenen Werte finden Sie unter https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api. Die Standardeinstellung ist 0.
  • storageWriteApiTriggeringFrequencySec: Die Triggerhäufigkeit legt fest, wie schnell die Daten für Abfragen in BigQuery sichtbar sind. Die empfohlenen Werte finden Sie unter https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api.

Benutzerdefinierte Funktion

Optional können Sie diese Vorlage erweitern, indem Sie eine benutzerdefinierte Funktion (UDF) schreiben. Die Vorlage ruft die UDF für jedes Eingabeelement auf. Nutzlasten von Elementen werden als JSON-Strings serialisiert. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.

Funktionsspezifikation

UDFs haben die folgende Spezifikation:

  • Eingabe: Der Kafka-Eintragswert, serialisiert als JSON-String.
  • Ausgabe: Ein JSON-String, der mit dem Schema der BigQuery-Zieltabelle übereinstimmt.

Führen Sie die Vorlage aus.

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Kafka to BigQuery templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Optional: Wählen Sie Mindestens einmal aus, um von der genau einmaligen Verarbeitung zum Mindestens einmal-Streamingmodus zu wechseln.
  8. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Ersetzen Sie Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • BIGQUERY_TABLE: Ihr BigQuery-Tabellenname
  • KAFKA_TOPICS ist die Apache Kafafa-Themenliste. Wenn mehrere Themen bereitgestellt werden, folgen Sie der Anleitung zum Maskieren von Kommas.
  • PATH_TO_JAVASCRIPT_UDF_FILE Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.

    Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

  • KAFKA_SERVER_ADDRESSES ist die IP-Adressliste des Apache Kafka-Brokers. Jede IP-Adresse sollte die Portnummer haben, von der aus der Server zugänglich ist. Beispiel: 35.70.252.199:9092 Wenn mehrere Adressen angegeben werden, folgen Sie der Anleitung zum Maskieren von Kommas.

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery",
   }
}
  

Ersetzen Sie Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • BIGQUERY_TABLE: Ihr BigQuery-Tabellenname
  • KAFKA_TOPICS ist die Apache Kafafa-Themenliste. Wenn mehrere Themen bereitgestellt werden, folgen Sie der Anleitung zum Maskieren von Kommas.
  • PATH_TO_JAVASCRIPT_UDF_FILE Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.

    Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

  • KAFKA_SERVER_ADDRESSES ist die IP-Adressliste des Apache Kafka-Brokers. Jede IP-Adresse sollte die Portnummer haben, von der aus der Server zugänglich ist. Beispiel: 35.70.252.199:9092 Wenn mehrere Adressen angegeben werden, folgen Sie der Anleitung zum Maskieren von Kommas.

Weitere Informationen finden Sie unter Mit Dataflow Daten von Kafka in BigQuery schreiben.

Nächste Schritte