Mit Dataflow Daten von Kafka in BigQuery schreiben

Dieses Dokument bietet eine allgemeine Anleitung zum Erstellen und Bereitstellen einer Dataflow-Pipeline, die von Apache Kafka zu BigQuery streamt.

Apache Kafka ist eine Open-Source-Plattform für Streamingereignisse. Kafka wird häufig in verteilten Architekturen verwendet, um die Kommunikation zwischen lose gekoppelten Komponenten zu ermöglichen. Sie können mit Dataflow Ereignisse aus Kafka lesen, verarbeiten und die Ergebnisse zur weiteren Analyse in eine BigQuery-Tabelle schreiben.

Kafka-Ereignisse in BigQuery lesen

Google stellt eine Dataflow-Vorlage zur Verfügung, die eine Kafka-zu-BigQuery-Pipeline konfiguriert. Die Vorlage verwendet den BigQueryIO-Connector, der im Apache Beam SDK enthalten ist.

Führen Sie die folgenden Schritte aus, um diese Vorlage zu verwenden:

  1. Stellen Sie Kafka entweder in Google Cloud oder an einem anderen Ort bereit.
  2. Netzwerk konfigurieren.
  3. IAM-Berechtigungen (Identity and Access Management) festlegen.
  4. Funktion schreiben, um die Ereignisdaten zu transformieren
  5. Erstellen Sie die BigQuery-Ausgabetabelle.
  6. Stellen Sie die Dataflow-Vorlage bereit.

Kafka bereitstellen

In Google Cloud können Sie einen Kafka-Cluster auf Compute Engine-VM-Instanzen bereitstellen oder einen von einem Drittanbieter verwalteten Kafka-Dienst verwenden. Weitere Informationen zu den Bereitstellungsoptionen in Google Cloud finden Sie unter Was ist Apache Kafka?. Kafka-Lösungen von Drittanbietern finden Sie im Google Cloud Marketplace.

Alternativ haben Sie möglicherweise einen vorhandenen Kafka-Cluster, der sich außerhalb von Google Cloud befindet. Beispiel: Sie haben eine vorhandene Arbeitslast, die lokal oder in einer anderen öffentlichen Cloud bereitgestellt wird.

Netzwerk konfigurieren

Normalerweise startet Dataflow Instanzen in Ihrem VPC-Standardnetzwerk (Virtual Private Cloud). Je nach Kafka-Konfiguration müssen Sie möglicherweise ein anderes Netzwerk und Subnetz für Dataflow konfigurieren. Weitere Informationen finden Sie unter Netzwerk und Subnetzwerk angeben. Erstellen Sie beim Konfigurieren Ihres Netzwerks Firewallregeln, mit denen die Dataflow-Worker-Maschinen die Kafka-Broker erreichen können.

Wenn Sie VPC Service Controls verwenden, platzieren Sie den Kafka-Cluster innerhalb des VPC Service Controls-Perimeters oder weiten Sie die Perimeter auf das autorisierte VPN oder Cloud Interconnect aus.

Wenn Ihr Kafka-Cluster außerhalb von Google Cloud bereitgestellt wird, müssen Sie eine Netzwerkverbindung zwischen Dataflow und dem Kafka-Cluster erstellen. Es gibt mehrere Netzwerkoptionen mit unterschiedlichen Vor- und Nachteilen:

Dedicated Interconnect ist die beste Option für eine vorhersagbare Leistung und Zuverlässigkeit. Die Einrichtung kann jedoch länger dauern, da die neuen Verbindungen durch Dritte bereitgestellt werden müssen. Mit einer Topologie auf Basis einer öffentlichen IP-Adresse können Sie schnell starten, da nur wenig Netzwerkarbeit erforderlich ist.

In den nächsten beiden Abschnitten werden diese Optionen ausführlicher beschrieben.

Freigegebener RFC 1918-Adressraum

Sowohl Dedicated Interconnect als auch IPsec-VPN bieten Ihnen direkten Zugriff auf RFC 1918-IP-Adressen in Ihrer Virtual Private Cloud (VPC), was Ihre Kafka-Konfiguration vereinfachen kann. Wenn Sie eine VPN-basierte Topologie verwenden, sollten Sie ein VPN mit hohem Durchsatz erstellen.

Normalerweise startet Dataflow Instanzen in Ihrem standardmäßigen VPC-Netzwerk. In einer privaten Netzwerktopologie mit explizit in Cloud Router definierten Routen, die Subnetzwerke in Google Cloud mit diesem Kafka-Cluster verbinden, müssen Sie die Standorte Ihrer Dataflow-Instanzen besser steuern können. Sie können mit Dataflow die Ausführungsparameter network und subnetwork konfigurieren.

Achten Sie darauf, dass das entsprechende Subnetzwerk über genügend IP-Adressen verfügt, damit Dataflow Instanzen beim horizontalen Skalieren starten kann. Wenn Sie zum Starten der Dataflow-Instanzen ein separates Netzwerk erstellen, ist außerdem wichtig, dass Sie eine Firewallregel haben, die TCP-Traffic zwischen allen virtuellen Maschinen im Projekt zulässt. Im Standardnetzwerk ist diese Firewallregel bereits konfiguriert.

Öffentlicher IP-Adressraum

Diese Architektur verwendet zum Schutz des Traffics zwischen externen Clients und Kafka Transport Layer Security (TLS). Die Kommunikation zwischen den Brokern erfolgt in Klartext. Wenn sich der Kafka-Listener an eine Netzwerkschnittstelle bindet, die sowohl für die interne als auch für die externe Kommunikation verwendet wird, ist die Konfiguration des Listeners unkompliziert. In vielen Szenarien unterscheiden sich die extern veröffentlichten Adressen der Kafka-Broker im Cluster jedoch von den internen Netzwerkschnittstellen, die Kafka verwendet. In solchen Szenarien können Sie das Attribut advertised.listeners verwenden:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

Externe Clients verbinden sich über Port 9093 über einen „SSL“-Kanal her und interne Clients verbinden sich über einen Klartextkanal über Port 9092. Wenn Sie unter advertised.listeners eine Adresse angeben, verwenden Sie DNS-Namen (in diesem Beispiel kafkabroker-n.mydomain.com), die für externen und internen Traffic in dieselbe Instanz aufgelöst werden. Öffentliche IP-Adressen funktionieren eventuell nicht, da die Adressen möglicherweise für den internen Traffic nicht richtig aufgelöst werden.

IAM-Berechtigungen festlegen

Dataflow-Jobs verwenden zwei IAM-Dienstkonten:

  • Der Dataflow-Dienst verwendet ein Dataflow-Dienstkonto, um Google Cloud-Ressourcen wie VMs zu bearbeiten.
  • Die Dataflow-Worker-VMs verwenden ein Worker-Dienstkonto, um auf die Dateien und andere Ressourcen Ihrer Pipeline zuzugreifen. Dieses Dienstkonto benötigt Schreibzugriff auf die BigQuery-Ausgabetabelle. Außerdem benötigt er Zugriff auf alle anderen Ressourcen, auf die der Pipelinejob verweist.

Sorgen Sie dafür, dass diese beiden Dienstkonten die entsprechenden Rollen haben. Weitere Informationen finden Sie unter Sicherheit und Berechtigungen in Dataflow.

Daten für BigQuery umwandeln

Die Vorlage " Kafka-zu-BigQuery" erstellt eine Pipeline, die Ereignisse aus einem oder mehreren Kafka-Themen liest und in eine BigQuery-Tabelle schreibt. Optional können Sie eine benutzerdefinierte JavaScript-Funktion bereitstellen, die die Ereignisdaten transformiert, bevor sie in BigQuery geschrieben werden.

Die Ausgabe der Pipeline muss JSON-formatierte Daten sein, die mit dem Schema der Ausgabetabelle übereinstimmen. Wenn die Kafka-Ereignisdaten bereits im JSON-Format vorliegen, können Sie eine BigQuery-Tabelle mit einem übereinstimmenden Schema erstellen und die Ereignisse direkt an BigQuery übergeben. Andernfalls erstellen Sie eine UDF, die die Ereignisdaten als Eingabe verwendet und JSON-Daten zurückgibt, die mit Ihrer BigQuery-Tabelle übereinstimmen.

Angenommen, die Ereignisdaten enthalten zwei Felder:

  • name (string)
  • customer_id (integer)

Die Ausgabe der Dataflow-Pipeline könnte so aussehen:

{ "name": "Alice", "customer_id": 1234 }

Wenn die Ereignisdaten nicht bereits im JSON-Format vorliegen, würden Sie eine UDF schreiben, die die Daten so transformiert:

// UDF
function process(eventData) {
  var name;
  var customer_id;

  // TODO Parse the event data to extract the name and customer_id fields.

  // Return a JSON payload.
  return JSON.stringify({ name: name, customer_id: customer_id });
}

Die UDF kann eine zusätzliche Verarbeitung der Ereignisdaten ausführen, z. B. Ereignisse filtern, personenbezogene Daten entfernen oder die Daten mit zusätzlichen Feldern anreichern.

Weitere Informationen zum Schreiben einer UDF für die Vorlage finden Sie unter Dataflow-Vorlage mit UDFs erweitern. JavaScript-Datei in Cloud Storage hochladen.

BigQuery-Ausgabetabelle erstellen

Erstellen Sie die BigQuery-Ausgabetabelle, bevor Sie die Vorlage ausführen. Das Tabellenschema muss mit der JSON-Ausgabe der Pipeline kompatibel sein. Für jedes Attribut in der JSON-Nutzlast schreibt die Pipeline den Wert in die BigQuery-Tabellenspalte desselben Namens. Alle fehlenden Attribute im JSON-Format werden als NULL-Werte interpretiert.

Im vorherigen Beispiel hätte die BigQuery-Tabelle die folgenden Spalten:

Spaltenname Datentyp
name STRING
customer_id INTEGER

Sie können die Tabelle mit der SQL-Anweisung CREATE TABLE erstellen:

CREATE TABLE my_dataset.kafka_events (name STRING, customer_id INTEGER);

Alternativ können Sie das Tabellenschema mithilfe einer JSON-Definitionsdatei angeben. Weitere Informationen finden Sie in der BigQuery-Dokumentation unter Schema angeben.

Führen Sie den Dataflow-Job aus:

Führen Sie nach dem Erstellen der BigQuery-Tabelle die Dataflow-Vorlage aus.

Console

Führen Sie die folgenden Schritte aus, um den Dataflow-Job mithilfe der Google Cloud Console zu erstellen:

  1. Rufen Sie in der Google Cloud Console die Seite "Dataflow" auf.
  2. Klicken Sie auf Job aus Vorlage erstellen.
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Wählen Sie unter Regionaler Endpunkt eine Region aus.
  5. Wählen Sie die Vorlage "Kafka zu BigQuery" aus.
  6. Geben Sie unter Erforderliche Parameter den Namen der BigQuery-Ausgabetabelle ein. Die Tabelle muss bereits vorhanden sein und ein gültiges Schema haben.
  7. Klicken Sie auf Optionale Parameter anzeigen und geben Sie Werte für mindestens die folgenden Parameter ein:

    • Das Kafka-Thema, aus dem die Eingabe gelesen werden soll.
    • Die Liste der Kafka-Bootstrap-Server, durch Kommas getrennt.
    • Die E-Mail-Adresse eines Dienstkontos.

    Geben Sie nach Bedarf zusätzliche Parameter ein. Insbesondere muss Folgendes angegeben werden:

gcloud

Führen Sie den folgenden Befehl aus, um den Dataflow-Job mithilfe der Google Cloud-Befehlszeile zu erstellen:

gcloud dataflow flex-template run JOB_NAME \
--template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
--region LOCATION \
--parameters inputTopics=KAFKA_TOPICS \
--parameters bootstrapServers=BOOTSTRAP_SERVERS \
--parameters outputTableSpec=OUTPUT_TABLE \
--parameters serviceAccount=IAM_SERVICE_ACCOUNT \
--parameters javascriptTextTransformGcsPath=UDF_SCRIPT_PATH \
--parameters javascriptTextTransformFunctionName=UDF_FUNCTION_NAME \
--network VPC_NETWORK_NAME \
--subnetwork SUBNET_NAME

Ersetzen Sie die folgenden Variablen:

  • JOB_NAME: Ein Jobname Ihrer Wahl.
  • LOCATION: Die Region, in der der Job ausgeführt werden soll. Weitere Informationen zu Regionen und Standorten finden Sie unter Dataflow-Standorte.
  • KAFKA_TOPICS Eine durch Kommas getrennte Liste von zu lesenden Kafka-Themen.
  • BOOTSTRAP_SERVERS Eine durch Kommas getrennte Liste von Kafka-Bootstrap-Servern. Beispiel: 127:9092,127.0.0.1:9093.
  • OUTPUT_TABLE Die BigQuery-Ausgabetabelle, angegeben als PROJECT_ID:DATASET_NAME.TABLE_NAME. Beispiel: my_project:dataset1.table1.
  • IAM_SERVICE_ACCOUNT. Optional. Die E-Mail-Adresse des Dienstkontos, unter dem der Job ausgeführt werden soll.
  • UDF_SCRIPT_PATH. Optional. Der Cloud Storage-Pfad zu einer JavaScript-Datei, die eine UDF enthält. Beispiel: gs://your-bucket/your-function.js.
  • UDF_FUNCTION_NAME. Optional. Der Name der JavaScript-Funktion, die als UDF aufgerufen werden soll.
  • VPC_NETWORK_NAME. Optional. Das Netzwerk, dem Worker zugewiesen werden.
  • SUBNET_NAME. Optional. Das Subnetzwerk, dem Worker zugewiesen werden.

Datentypen

In diesem Abschnitt wird beschrieben, wie verschiedene Datentypen im BigQuery-Tabellenschema behandelt werden.

Intern werden die JSON-Nachrichten in TableRow-Objekte konvertiert und die TableRow-Feldwerte in BigQuery-Typen übersetzt.

Skalare Typen

Im folgenden Beispiel wird eine BigQuery-Tabelle mit verschiedenen skalaren Datentypen erstellt, darunter String-, numerische, boolesche, Datums-/Uhrzeit-, Intervall- und geografische Typen:

CREATE TABLE  my_dataset.kafka_events (
    string_col STRING,
    integer_col INT64,
    float_col FLOAT64,
    decimal_col DECIMAL,
    bool_col BOOL,
    date_col DATE,
    dt_col DATETIME,
    ts_col TIMESTAMP,
    interval_col INTERVAL,
    geo_col GEOGRAPHY
);

Hier ist eine JSON-Nutzlast mit kompatiblen Feldern:

{
  "string_col": "string_val",
  "integer_col": 10,
  "float_col": 3.142,
  "decimal_col": 5.2E11,
  "bool_col": true,
  "date_col": "2022-07-01",
  "dt_col": "2022-07-01 12:00:00.00",
  "ts_col": "2022-07-01T12:00:00.00Z",
  "interval_col": "0-13 370 48:61:61",
  "geo_col": "POINT(1 2)"
}

Hinweise:

  • Für eine TIMESTAMP-Spalte können Sie den Wert mit der JavaScript-Methode Date.toJSON formatieren.
  • Für die Spalte GEOGRAPHY können Sie die Geografie mit dem bekannten Text (WKT) oder GeoJSON als String angeben. Weitere Informationen finden Sie unter Raumbezogene Daten laden.

Weitere Informationen zu Datentypen in BigQuery finden Sie unter Datentypen.

Arrays

Sie können ein Array in BigQuery mit dem Datentyp ARRAY speichern. Im folgenden Beispiel enthält die JSON-Nutzlast ein Attribut namens scores, dessen Wert ein JSON-Array ist:

{"name":"Emily","scores":[10,7,10,9]}

Mit der folgenden CREATE TABLE-SQL-Anweisung wird eine BigQuery-Tabelle mit einem kompatiblen Schema erstellt:

CREATE TABLE my_dataset.kafka_events (name STRING, scores ARRAY<INTEGER>);

Die Tabelle sieht so aus:

+-------+-------------+
| name  |   scores    |
+-------+-------------+
| Emily | [10,7,10,9] |
+-------+-------------+

Bauwerke

Der Datentyp STRUCT in BigQuery enthält eine geordnete Liste benannter Felder. Sie können einen STRUCT verwenden, um JSON-Objekte zu speichern, die einem konsistenten Schema folgen.

Im folgenden Beispiel enthält die JSON-Nutzlast ein Attribut namens val, dessen Wert ein JSON-Objekt ist:

{"name":"Emily","val":{"a":"yes","b":"no"}}

Mit der folgenden CREATE TABLE-SQL-Anweisung wird eine BigQuery-Tabelle mit einem kompatiblen Schema erstellt:

CREATE TABLE my_dataset.kafka_events (name STRING, val STRUCT<a STRING, b STRING>);

Die Tabelle sieht so aus:

+-------+----------------------+
| name  |         val          |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+

Halbstrukturierte Ereignisdaten

Wenn die Kafka-Ereignisdaten keinem strengen Schema folgen, sollten Sie sie in BigQuery als JSON-Datentyp (Vorschau) speichern. Wenn Sie JSON-Daten als Datentyp JSON speichern, müssen Sie das Ereignisschema nicht im Voraus definieren. Nach Datenaufnahme können Sie die Ausgabetabelle mithilfe der Feldzugriffsoperatoren (Punktnotation) und Arrayzugriffsoperatoren abfragen.

Erstellen Sie zuerst eine Tabelle mit der Spalte JSON:

-- Create the BigQuery table
CREATE TABLE my_dataset.kafka_events (event_data JSON);

Definieren Sie dann eine JavaScript-UDF, die die Ereignisnutzlast in einem JSON-Objekt umschließt:

// UDF
function process(eventData) {
  var json;

  // TODO Convert the event data to JSON.

  return JSON.stringify({ "event_data": json });
}

Nachdem die Daten in BigQuery geschrieben wurden, können Sie die einzelnen Felder mithilfe des Feldzugriffsoperators abfragen. Die folgende Abfrage gibt beispielsweise den Wert des Felds name für jeden Datensatz zurück:

SELECT event_data.name FROM my_dataset1.kafka_events;

Weitere Informationen zur Verwendung von JSON in BigQuery finden Sie unter Mit JSON-Daten in Google Standard-SQL arbeiten.

Fehler und Logging

Es können Fehler beim Ausführen der Pipeline oder Fehler bei der Verarbeitung einzelner Kafka-Ereignisse auftreten.

Weitere Informationen zur Handhabung von Pipelinefehlern finden Sie unter Pipelinefehler beheben.

Wenn der Job erfolgreich ausgeführt wird, aber beim Verarbeiten eines einzelnen Kafka-Ereignisses ein Fehler auftritt, schreibt der Pipelinejob einen Fehlerdatensatz in eine Tabelle in BigQuery. Der Job selbst schlägt nicht fehl und der Fehler auf Ereignisebene wird im Dataflow-Joblog nicht als Fehler angezeigt.

Der Pipelinejob erstellt automatisch die Tabelle, die Fehlerdatensätze enthält. Standardmäßig lautet der Name der Tabelle output_table_error_records, wobei output_table der Name der Ausgabetabelle ist. Wenn die Ausgabetabelle beispielsweise den Namen kafka_events hat, heißt die Fehlertabelle kafka_events_error_records. Mit dem Vorlagenparameter outputDeadletterTable können Sie einen anderen Namen angeben:

outputDeadletterTable=my_project:dataset1.errors_table

Im Folgenden finden Sie mögliche Fehler:

  • Serialisierungsfehler, einschließlich falsch formatierter JSON-Daten.
  • Typkonvertierungsfehler, die durch eine nicht übereinstimmende Tabelle und die JSON-Daten verursacht wurden.
  • Zusätzliche Felder in den JSON-Daten, die im Tabellenschema nicht vorhanden sind.

Beispiele für Fehlermeldungen:

Art des Fehlers Ereignisdaten errorMessage
Serialisierungfehler "Hello World" JSON konnte nicht in Tabellenzeile serialisiert werden: "Hello world"
Fehler bei Typkonvertierung {"name":"Emily","customer_id":"abc"} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 }
Unbekanntes Feld {"name":"Zoe","age":34} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 }

Nächste Schritte