Vorlage „Apache Kafka für BigQuery“

Die Vorlage „Apache Kafka für BigQuery“ ist eine Streamingpipeline, die Textdaten aus Google Cloud Managed Service for Apache Kafka-Clustern aufnimmt und die resultierenden Datensätze dann in BigQuery-Tabellen ausgibt. Alle Fehler, die beim Einfügen von Daten in die Ausgabetabelle auftreten, werden in eine separate Fehlertabelle in BigQuery geschrieben.

Sie können die Vorlage „Apache Kafka für BigQuery“ auch mit selbstverwalteter oder externer Kafka verwenden.

Pipelineanforderungen

  • Der Apache Kafka-Broker-Server muss ausgeführt werden und über die Dataflow-Worker-Maschinen erreichbar sein.
  • Die Apache Kafka-Themen müssen vorhanden sein.
  • Sie müssen die Dataflow, BigQuery und Cloud Storage APIs aktivieren. Wenn eine Authentifizierung erforderlich ist, müssen Sie auch die Secret Manager API aktivieren.
  • Erstellen Sie ein BigQuery-Dataset und eine Tabelle mit dem entsprechenden Schema für Ihr Kafka-Eingabethema. Wenn Sie mehrere Schemas für dasselbe Thema verwenden und in mehrere Tabellen schreiben möchten, müssen Sie die Tabelle vor dem Konfigurieren der Pipeline nicht erstellen.
  • Wenn die Warteschlange für unzustellbare Nachrichten (unverarbeitete Nachrichten) für die Vorlage aktiviert ist, erstellen Sie eine leere Tabelle, die kein Schema für die Warteschlange für unzustellbare Nachrichten hat.

Kafka-Nachrichtenformat

Die Vorlage „Apache Kafka für BigQuery“ unterstützt das Lesen von Nachrichten aus Kafka in den folgenden Formaten: CONFLUENT_AVRO_WIRE_FORMAT ,AVRO_BINARY_FORMAT undJSON

Authentifizierung

Die Vorlage „Apache Kafka für BigQuery“ unterstützt die SASL/PLAIN-Authentifizierung bei Kafka-Brokern.

Vorlagenparameter

Erforderliche Parameter

  • readBootstrapServerAndTopic: Kafka-Thema, aus dem die Eingabe gelesen werden soll.
  • writeMode: Datensätze werden in eine oder mehrere Tabellen (basierend auf Schema) geschrieben. Der Modus DYNAMIC_TABLE_NAMES wird nur für das AVRO_CONFLUENT_WIRE_FORMAT-Quellnachrichtenformat und die Schemaquelle SCHEMA_REGISTRY unterstützt. Der Name der Zieltabelle wird anhand des Avro-Schemanamens der einzelnen Nachrichten automatisch generiert. Dabei kann es sich entweder um ein einzelnes Schema (Erstellen einer einzelnen Tabelle) oder um mehrere Schemas (Erstellen mehrerer Tabellen) handeln. Der Modus SINGLE_TABLE_NAME schreibt in eine einzelne Tabelle (einzelnes Schema), die vom Nutzer angegeben wird. Die Standardeinstellung ist SINGLE_TABLE_NAME.
  • kafkaReadAuthenticationMode: Der Authentifizierungsmodus zur Verwendung mit dem Kafka-Cluster. Verwenden Sie KafkaAuthenticationMethod.NONE für keine Authentifizierung, KafkaAuthenticationMethod.SASL_PLAIN für SASL/PLAIN-Nutzernamen und -Passwörter und KafkaAuthenticationMethod.TLS für die zertifikatbasierte Authentifizierung. KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS sollte nur für Google Cloud Apache Kafka for BigQuery-Cluster verwendet werden. Es ermöglicht die Authentifizierung mit Standardanmeldedaten für Anwendungen.
  • messageFormat: Das Format der zu lesenden Kafka-Nachrichten. Die unterstützten Werte sind AVRO_CONFLUENT_WIRE_FORMAT (Confluent Schema Registry-codierter Avro), AVRO_BINARY_ENCODING (einfaches binäres Avro) und JSON. Die Standardeinstellung ist AVRO_CONFLUENT_WIRE_FORMAT.
  • useBigQueryDLQ: Wenn dieser Wert „wahr“ ist, werden fehlgeschlagene Meldungen mit zusätzlichen Fehlerinformationen in BigQuery geschrieben. Die Standardeinstellung ist "false".

Optionale Parameter

  • outputTableSpec: Der Speicherort der BigQuery-Tabelle, in die die Ausgabe geschrieben werden soll. Der Name muss das Format <project>:<dataset>.<table_name> haben. Das Schema der Tabelle muss mit Eingabeobjekten übereinstimmen.
  • persistKafkaKey: Wenn dieser Wert „wahr“ ist, speichert die Pipeline den Kafka-Nachrichtenschlüssel in der BigQuery-Tabelle in einem _key-Feld vom Typ BYTES. Der Standardwert ist false (Schlüssel wird ignoriert).
  • outputProject: BigQuery-Ausgabeprojekt, in dem sich das Dataset befindet. Tabellen werden im Dataset dynamisch erstellt. Die Standardeinstellung ist leer.
  • outputDataset: BigQuery-Ausgabe-Dataset, in das die Ausgabe geschrieben werden soll. Tabellen werden im Dataset dynamisch erstellt. Wenn die Tabellen zuvor erstellt wurden, sollten die Tabellennamen der angegebenen Namenskonvention folgen. Der Name sollte bqTableNamePrefix + Avro Schema FullName sein , die Wörter sind durch einen Bindestrich - getrennt. Die Standardeinstellung ist leer.
  • bqTableNamePrefix: Namenspräfix, das beim Erstellen von BigQuery-Ausgabetabellen verwendet werden soll. Gilt nur, wenn die Schema-Registry verwendet wird. Die Standardeinstellung ist leer.
  • createDisposition: BigQuery CreateDisposition. Beispiel: CREATE_IF_NEEDED, CREATE_NEVER. Die Standardeinstellung ist CREATE_IF_NEEDED.
  • writeDisposition: BigQuery-WriteDisposition. Beispiel: WRITE_APPEND, WRITE_EMPTY oder WRITE_TRUNCATE. Die Standardeinstellung ist WRITE_APPEND.
  • useAutoSharding: Wenn „wahr“ (true), verwendet die Pipeline beim Schreiben in BigQuery eine automatische Fragmentierung. Der Standardwert ist true.
  • numStorageWriteApiStreams: Gibt die Anzahl der Schreibstreams an. Dieser Parameter muss festgelegt werden. Der Standardwert ist 0.
  • storageWriteApiTriggeringFrequencySec: Gibt die Auslösehäufigkeit in Sekunden an. Dieser Parameter muss festgelegt werden. Standardwert: 5 Sekunden.
  • useStorageWriteApiAtLeastOnce: Dieser Parameter wird nur wirksam, wenn „BigQuery Storage Write API verwenden“ aktiviert ist. Wenn diese Option aktiviert ist, wird für die Storage Write API die "Mindestens einmal"-Semantik verwendet. Andernfalls wird die "Genau einmal"-Semantik verwendet. Die Standardeinstellung ist "false".
  • enableCommitOffsets: Commit-Offsets verarbeiteter Nachrichten an Kafka. Wenn diese Option aktiviert ist, werden dadurch die Lücken oder doppelte Verarbeitung von Nachrichten beim Neustart der Pipeline minimiert. Angabe der Nutzergruppen-ID erforderlich. Die Standardeinstellung ist "false".
  • consumerGroupId: Die eindeutige Kennung für die Nutzergruppe, zu der diese Pipeline gehört. Erforderlich, wenn Commit-Offsets für Kafka aktiviert sind. Die Standardeinstellung ist leer.
  • kafkaReadOffset: Der Ausgangspunkt für das Lesen von Nachrichten, wenn keine festgeschriebenen Offsets vorhanden sind. Die früheste beginnt am Anfang, die neueste aus der neuesten Nachricht. Die Standardeinstellung ist "latest".
  • kafkaReadUsernameSecretId: Die geheime ID von Google Cloud Secret Manager, die den Kafka-Nutzernamen enthält, der für die SASL_PLAIN-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION> Die Standardeinstellung ist leer.
  • kafkaReadPasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Kafka-Passwort enthält, das für die SASL_PLAIN-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION> Die Standardeinstellung ist leer.
  • kafkaReadKeystoreLocation: Der Google Cloud Storage-Pfad zur Java KeyStore-Datei (JKS), die das TLS-Zertifikat und den privaten Schlüssel für die Authentifizierung beim Kafka-Cluster enthält. Beispiel: gs://your-bucket/keystore.jks.
  • kafkaReadTruststoreLocation: Der Google Cloud Storage-Pfad zur Java TrustStore-Datei (JKS), die die vertrauenswürdigen Zertifikate enthält, mit denen die Identität des Kafka-Brokers geprüft werden soll.
  • kafkaReadTruststorePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf die Java Truststore-Datei (JKS) zur Kafka-TLS-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeystorePasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf die Java KeyStore-Datei (JKS) für die Kafka-TLS-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeyPasswordSecretId: Die geheime ID von Google Cloud Secret Manager, die das Passwort enthält, das für den Zugriff auf den privaten Schlüssel in der Java KeyStore-Datei (JKS) für die Kafka-TLS-Authentifizierung verwendet werden soll. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • schemaFormat: Das Kafka-Schemaformat. Kann als SINGLE_SCHEMA_FILE oder SCHEMA_REGISTRY angegeben werden. Wenn SINGLE_SCHEMA_FILE angegeben ist, wird für alle Nachrichten das in der Avro-Schemadatei erwähnte Schema verwendet. Wenn SCHEMA_REGISTRY angegeben ist, können die Nachrichten entweder ein einzelnes Schema oder mehrere Schemas haben. Die Standardeinstellung ist SINGLE_SCHEMA_FILE.
  • confluentAvroSchemaPath: Der Google Cloud Storage-Pfad zur einzelnen Avro-Schemadatei, die zur Decodierung aller Nachrichten in einem Thema verwendet wird. Die Standardeinstellung ist leer.
  • schemaRegistryConnectionUrl: Die URL für die Confluent Schema Registry-Instanz, die zum Verwalten von Avro-Schemas für die Nachrichtendecodierung verwendet wird. Die Standardeinstellung ist leer.
  • binaryAvroSchemaPath: Der Google Cloud Storage-Pfad zur Avro-Schemadatei, die zum Decodieren binärcodierter Avro-Nachrichten verwendet wird. Die Standardeinstellung ist leer.
  • schemaRegistryAuthenticationMode: Authentifizierungsmodus der Schema Registry. Kann NONE, TLS oder OAUTH sein. Die Standardeinstellung ist: NONE.
  • schemaRegistryTruststoreLocation: Speicherort des SSL-Zertifikats, in dem der Trust Store für die Authentifizierung bei der Schema Registry gespeichert ist. Beispiel: /your-bucket/truststore.jks.
  • schemaRegistryTruststorePasswordSecretId: Secret-ID in Secret Manager, unter der das Passwort für den Zugriff auf das Secret im Truststore gespeichert ist. Beispiel: projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryKeystoreLocation: Speicherort des Keystores, der das SSL-Zertifikat und den privaten Schlüssel enthält. Beispiel: /your-bucket/keystore.jks.
  • schemaRegistryKeystorePasswordSecretId: Secret-ID im Secret Manager, unter der sich das Passwort für den Zugriff auf die Schlüsselspeicherdatei befindet, z. B. projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryKeyPasswordSecretId: Secret-ID des Passworts, das für den Zugriff auf den privaten Schlüssel des Clients erforderlich ist, der im Schlüsselspeicher gespeichert ist. Beispiel: projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryOauthClientId: Client-ID, die zum Authentifizieren des Schema Registry-Clients im OAUTH-Modus verwendet wird. Erforderlich für das Nachrichtenformat AVRO_CONFLUENT_WIRE_FORMAT.
  • schemaRegistryOauthClientSecretId: Die geheime ID von Google Cloud Secret Manager, die das Client Secret enthält, das für die Authentifizierung des Schema Registry-Clients im OAUTH-Modus verwendet werden soll. Erforderlich für das Nachrichtenformat AVRO_CONFLUENT_WIRE_FORMAT. Beispiel: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • schemaRegistryOauthScope: Der Zugriffstoken-Umfang, der für die Authentifizierung des Schema Registry-Clients im OAUTH-Modus verwendet wird. Dieses Feld ist optional, da die Anfrage auch ohne übergebenen Umfangsparameter gestellt werden kann. Beispiel: openid.
  • schemaRegistryOauthTokenEndpointUrl: Die HTTP(S)-basierte URL für den OAuth-/OIDC-Identitätsanbieter, die zum Authentifizieren des Schema Registry-Clients im OAUTH-Modus verwendet wird. Erforderlich für das Nachrichtenformat AVRO_CONFLUENT_WIRE_FORMAT.
  • outputDeadletterTable: Vollständig qualifizierter BigQuery-Tabellenname für fehlgeschlagene Nachrichten. Nachrichten, die die Ausgabetabelle aus verschiedenen Gründen nicht erreicht haben (z.B. nicht übereinstimmendes Schema, fehlerhaft formatierte JSON-Datei), werden in diese Tabelle geschrieben.Die Tabelle wird von der Vorlage erstellt. Beispiel: your-project-id:your-dataset.your-table-name.
  • javascriptTextTransformGcsPath: Der Cloud Storage-URI der .js-Datei, in der die zu verwendende benutzerdefinierte JavaScript-Funktion (UDF) definiert wird. Beispiel: gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: Der Name der benutzerdefinierten JavaScript-Funktion (UDF), die verwendet werden soll. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter „UDF-Beispiele“ (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: Gibt an, wie oft die UDF aktualisiert werden soll (in Minuten). Wenn der Wert größer als 0 ist, prüft Dataflow regelmäßig die UDF-Datei in Cloud Storage und lädt die UDF neu, wenn die Datei geändert wurde. Mit diesem Parameter können Sie die UDF aktualisieren, während die Pipeline ausgeführt wird, ohne den Job neu starten zu müssen. Wenn der Wert 0 ist, ist das Neuladen der UDF deaktiviert. Der Standardwert ist 0.

Führen Sie die Vorlage aus.

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Kafka to BigQuery templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Optional: Wählen Sie Mindestens einmal aus, um von der genau einmaligen Verarbeitung zum Mindestens einmal-Streamingmodus zu wechseln.
  8. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery_Flex \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Ersetzen Sie Folgendes:

  • PROJECT_ID: die Google Cloud -Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • BIGQUERY_TABLE: Ihr BigQuery-Tabellenname
  • KAFKA_TOPICS ist die Apache Kafafa-Themenliste. Wenn mehrere Themen angegeben sind, müssen Sie Kommas maskieren. Weitere Informationen finden Sie unter gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.

    Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

  • KAFKA_SERVER_ADDRESSES ist die IP-Adressliste des Apache Kafka-Brokers. Jede IP-Adresse benötigt die Portnummer, von der aus der Server zugänglich ist. Beispiel: 35.70.252.199:9092 Wenn mehrere Adressen angegeben werden, müssen Sie Kommas mit Escapezeichen versehen. Weitere Informationen finden Sie unter gcloud topic escaping.

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery_Flex",
   }
}
  

Ersetzen Sie Folgendes:

  • PROJECT_ID: die Google Cloud -Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • BIGQUERY_TABLE: Ihr BigQuery-Tabellenname
  • KAFKA_TOPICS ist die Apache Kafafa-Themenliste. Wenn mehrere Themen angegeben sind, müssen Sie Kommas maskieren. Weitere Informationen finden Sie unter gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.

    Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

  • KAFKA_SERVER_ADDRESSES ist die IP-Adressliste des Apache Kafka-Brokers. Jede IP-Adresse benötigt die Portnummer, von der aus der Server zugänglich ist. Beispiel: 35.70.252.199:9092 Wenn mehrere Adressen angegeben werden, müssen Sie Kommas mit Escapezeichen versehen. Weitere Informationen finden Sie unter gcloud topic escaping.

Weitere Informationen finden Sie unter Mit Dataflow Daten von Kafka in BigQuery schreiben.

Nächste Schritte