Daten mit Datastream und Dataflow in BigQuery streamen

Auf dieser Seite finden Sie Best Practices für die Verwendung von Datastream und Dataflow zum Streamen von Daten in BigQuery.

Replikate nach benutzerdefinierten Schlüsseln partitionieren

Das Staging-Dataset in BigQuery wird automatisch partitioniert. Standardmäßig ist das Replikat-Dataset jedoch nicht partitioniert, da die Partitionsschlüssel in den Replikationstabellen anhand einer bestimmten Geschäftslogik definiert werden müssen, anstatt von Datastream und Dataflow erzwungen zu werden.

Für jede Tabelle im Replikationsdatensatz, die partitioniert werden muss:

  1. Beenden Sie den Dataflow-Job und entfernen Sie ihn.

  2. Führen Sie mit dem SQL-Editor in BigQuery das folgende SQL-Script für jede Tabelle im Replikat-Dataset aus. In diesem Beispiel enthält die Tabelle actor im Datensatz datastream_cdc die Spalte last_update, die wir als Partitionsschlüssel festlegen möchten. Durch Ausführen des Skripts erstellen Sie die Tabelle mit dem richtigen Partitionsschlüssel neu.

    create table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' partition by date(last_update)
    as SELECT * FROM '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    drop table '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    alter table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' rename to 'actor'
  3. Verwenden Sie die Vorlage „Datastream für BigQuery“, um einen Dataflow-Job neu zu erstellen.

Benutzerdefinierte Funktionen zum Bearbeiten von Ereignisdaten ausführen

Sie können die Vorlage „Datastream to BigQuery“ verwenden, um eine benutzerdefinierte JavaScript-Funktion auszuführen. Legen Sie dazu zuerst eine Datei mit der Funktion an einem bestimmten Speicherort in Cloud Storage ab. Gehen Sie anschließend so vor:

  • Verwenden Sie den Parameter javascriptTextTransformGcsPath in der Vorlage, um den Speicherort der Datei in Cloud Storage anzugeben, die Ihre benutzerdefinierte Funktion enthält.
  • Verwenden Sie den Parameter javascriptTextTransformFunctionName, um den Namen der JavaScript-Funktion anzugeben, die Sie als benutzerdefinierte Funktion aufrufen möchten.

Sie können beispielsweise eine benutzerdefinierte Funktion ausführen, um gelöschte Einträge in den Tabellen des Replikat-Datasets in BigQuery aufzubewahren. Dieser Vorgang wird als Soft Delete bezeichnet.

Dazu erstellen Sie eine Funktion, die den Wert der Spalte _metadata_deleted in eine neue Spalte mit dem Namen is_deleted kopiert und dann den Wert der Spalte _metadata_deleted auf false zurücksetzt. Dadurch werden die Löschereignisse vom Dataflow-Job ignoriert und die gelöschten Datensätze beim Aktualisieren des Replikat-Datasets in BigQuery beibehalten.

Hier ist der Beispielcode für diese benutzerdefinierte Funktion:

/**
* This function changes the behavior of the Datastream to
* BigQuery template to allow soft deletes.
* @param {string} messageString from DatastreamIO data
* @return {string} same as an input message with an added property
*/
function transform(messageString) {
   // messageString is a JSON object as a string
   var messageJson = JSON.parse(messageString);
    // Moving the deleted flag to a new column will cause the pipeline to soft delete data.
   messageJson['is_deleted'] = messageJson['_metadata_deleted'];
   messageJson['_metadata_deleted'] = false;
    return JSON.stringify(messageJson);
 }

Zusammenführungshäufigkeit festlegen

Verwenden Sie den Parameter mergeFrequencyMinutes der Vorlage „Datastream für BigQuery“, um die Zusammenführungshäufigkeit festzulegen. Die Anzahl der Minuten zwischen Zusammenführungen für eine bestimmte Tabelle im Replikat-Dataset in BigQuery. Während die Verlaufsdaten aufgefüllt werden, empfehlen wir, die Häufigkeit der Zusammenführung gering zu halten (12 oder 24 Stunden), um die Kosten unter Kontrolle zu halten.

Wenn Sie den Wert dieses Parameters beispielsweise auf 10 Minuten festlegen, führt Dataflow den Job, der die Vorlage verwendet, alle 10 Minuten aus. Bei der ersten Ausführung des Jobs kommt es jedoch zu einer Verzögerung von 5 Minuten. Wenn der Job in diesem Beispiel um 9:14 Uhr ausgeführt wird, erfolgt die erste Zusammenführung um 9:29 Uhr (10 Minuten für die Zusammenführung und 5 Minuten für die Verzögerung). Die zweite Zusammenführung findet um 9:39 Uhr und alle nachfolgenden Zusammenführungen in 10-Minuten-Intervallen statt (9:49 Uhr, 9:59 Uhr, 10:09 Uhr usw.).

Wenn Sie die Zusammenführungshäufigkeit auf 60 Minuten festlegen, wird der Job stündlich ausgeführt, nach einer Verzögerung von 5 Minuten für die erste Ausführung des Jobs. Wenn die Ausführung des Jobs für 10:00 Uhr geplant ist, wird er aufgrund der Verzögerung von 5 Minuten tatsächlich um 10:05 Uhr ausgeführt. Alle nachfolgenden Zusammenführungen erfolgen im 60-Minuten-Takt (11:05 Uhr, 12:05 Uhr, 13:05 Uhr usw.).

Sowohl aus Kostenkontrolle als auch aus anderen Gründen können Sie möglicherweise keine Zusammenführung in einer solchen Häufigkeit durchführen, die Ihren geschäftlichen Anforderungen entspricht. Möglicherweise sind nicht die neuesten Daten verfügbar. Wenn Sie auf die neuesten Daten zugreifen möchten, erstellen Sie in BigQuery eine Ansicht über den Tabellen der Staging- und Replikat-Datasets, in der die Zusammenführung nachgebildet wird. Diese Ansicht wird als eine logische Tabelle erstellt (sowohl für das Staging- als auch für das Replikations-Dataset). Wenn die Zusammenführungshäufigkeit gering ist und Sie schnelleren Zugriff auf die Daten benötigen, verwenden Sie die Ansicht.