Datastream und Dataflow zum Streamen von Daten in BigQuery verwenden

Auf dieser Seite finden Sie Best Practices für die Verwendung von Datastream und Dataflow zum Streamen von Daten in BigQuery.

Replikat-Datasets nach benutzerdefinierten Schlüsseln partitionieren

Das Staging-Dataset in BigQuery wird automatisch partitioniert. Standardmäßig wird das Replikat-Dataset jedoch nicht partitioniert, da die Partitionsschlüssel in den Replikattabellen anhand bestimmter Geschäftslogik definiert werden müssen, anstatt von Datastream und Dataflow erzwungen zu werden.

Gehen Sie für jede Tabelle im Replikat-Dataset, die partitioniert werden muss, so vor:

  1. Beenden und beenden Sie den Dataflow-Job.

  2. Führen Sie mit dem SQL-Editor in BigQuery das folgende SQL-Script für jede Tabelle im Replikat-Dataset aus. In diesem Beispiel hat die Tabelle actor im Dataset datastream_cdc eine last_update-Spalte, die wir als Partitionsschlüssel festlegen möchten. Durch Ausführen des Skripts erstellen Sie die Tabelle mit dem richtigen Partitionsschlüssel neu.

    create table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' partition by date(last_update)
    as SELECT * FROM '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    drop table '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    alter table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' rename to 'actor'
    
    
  3. Verwenden Sie die Vorlage Datastream zu BigQuery, um einen Dataflow-Job neu zu erstellen.

Benutzerdefinierte Funktionen ausführen, um Ereignisdaten zu bearbeiten

Mit der Vorlage „Datastream to BigQuery“ können Sie eine benutzerdefinierte JavaScript-Funktion ausführen. Legen Sie dazu zuerst eine Datei mit der Funktion an einem bestimmten Speicherort in Cloud Storage ab. Gehen Sie anschließend so vor:

  • Verwenden Sie den Parameter javascriptTextTransformGcsPath in der Vorlage, um den Speicherort der Datei in Cloud Storage anzugeben, die Ihre benutzerdefinierte Funktion enthält.
  • Verwenden Sie den Parameter javascriptTextTransformFunctionName, um den Namen der JavaScript-Funktion anzugeben, die Sie als benutzerdefinierte Funktion aufrufen möchten.

Sie können beispielsweise eine benutzerdefinierte Funktion ausführen, um gelöschte Datensätze in den Tabellen des Replikat-Datasets in BigQuery aufzubewahren. Dieser Vorgang wird als vorläufiges Löschen bezeichnet.

Dazu erstellen Sie eine Funktion, die den Wert der Spalte _metadata_deleted in eine neue Spalte mit dem Namen is_deleted kopiert und dann den Wert der Spalte _metadata_deleted auf false zurücksetzt. Dadurch ignoriert der Dataflow-Job die Löschereignisse und behält die gelöschten Einträge beim Aktualisieren des Replikat-Datasets in BigQuery bei.

Hier ist der Beispielcode für diese benutzerdefinierte Funktion:

/**
* This function changes the behavior of the Datastream to
* BigQuery template to allow soft deletes.
* @param {string} messageString from DatastreamIO data
* @return {string} same as an input message with an added property
*/
function transform(messageString) {
   // messageString is a JSON object as a string
   var messageJson = JSON.parse(messageString);
    // Moving the deleted flag to a new column will cause the pipeline to soft delete data.
   messageJson['is_deleted'] = messageJson['_metadata_deleted'];
   messageJson['_metadata_deleted'] = false;
    return JSON.stringify(messageJson);
 }

Zusammenführungshäufigkeit festlegen

Verwenden Sie den Parameter mergeFrequencyMinutes der Vorlage Datastream zu BigQuery, um die Zusammenführungshäufigkeit festzulegen. Dies ist die Anzahl der Minuten zwischen den Zusammenführungen einer bestimmten Tabelle im Replikat-Dataset in BigQuery. Während Verlaufsdaten aufgefüllt werden, empfehlen wir, die Zusammenführungshäufigkeit niedrig (12 oder 24 Stunden) zu halten, um die Kosten unter Kontrolle zu halten.

Wenn Sie den Wert dieses Parameters beispielsweise auf 10 Minuten festlegen, führt Dataflow den Job, der die Vorlage verwendet, alle 10 Minuten aus. Bei der ersten Ausführung des Jobs kommt es jedoch zu einer Verzögerung von 5 Minuten. Wenn der Job in diesem Beispiel um 9:14 Uhr ausgeführt wird, erfolgt die erste Zusammenführung um 9:29 Uhr (10 Minuten für die Zusammenführung und 5 Minuten für die Verzögerung). Die zweite Zusammenführung erfolgt um 9:39 Uhr und alle nachfolgenden Zusammenführungen finden in 10-minütigen Intervallen statt (9:49 Uhr, 9:59 Uhr, 10:09 Uhr usw.).

Wenn Sie die Zusammenführungshäufigkeit auf 60 Minuten festlegen, wird der Job zur vollen Stunde ausgeführt, und zwar mit einer Verzögerung von 5 Minuten bei der ersten Ausführung des Jobs. Wenn die Ausführung des Jobs um 10:00 Uhr geplant ist, wird er aufgrund der 5-minütigen Verzögerung tatsächlich um 10:05 Uhr ausgeführt. Alle nachfolgenden Zusammenführungen erfolgen in 60-minütigen Intervallen (11:05 Uhr, 12:05 Uhr, 13:05 Uhr usw.).

Ob aufgrund der Kostenkontrolle oder aus anderen Gründen, Sie können eine Zusammenführung möglicherweise nicht in einer Häufigkeit durchführen, die Ihren Geschäftsanforderungen entspricht. Ihnen stehen möglicherweise nicht die neuesten Daten zur Verfügung. Um auf die neuesten Daten zuzugreifen, erstellen Sie eine Ansicht über den Tabellen der Staging- und Replikat-Datasets in BigQuery, in der die Ansicht die Zusammenführung simuliert. Diese Ansicht wird als eine logische Tabelle erstellt (sowohl für die Staging- als auch die Replikat-Datasets). Verwenden Sie die Ansicht, wenn die Zusammenführungshäufigkeit niedrig ist und Sie schneller auf die Daten zugreifen möchten.