Auf dieser Seite finden Sie Best Practices für die Verwendung von Datastream und Dataflow zum Streamen von Daten in BigQuery.
Replikat-Datasets mit benutzerdefinierten Schlüsseln partitionieren
Das Staging-Dataset in BigQuery wird automatisch partitioniert. Standardmäßig wird das Replikat-Dataset jedoch nicht partitioniert, da die Partitionsschlüssel für die Replikat-Tabellen auf Grundlage einer bestimmten Geschäftslogik definiert werden müssen und nicht von Datastream und Dataflow erzwungen werden.
Für jede Tabelle im Replikat-Dataset, die partitioniert werden muss:
Führen Sie das folgende SQL-Skript für jede Tabelle im Replikat-Dataset mit dem SQL-Editor in BigQuery aus. In diesem Beispiel hat die Tabelle
actor
im Datasetdatastream_cdc
eine Spaltelast_update
, die wir als Partitionsschlüssel festlegen möchten. Wenn Sie das Script ausführen, wird die Tabelle mit dem richtigen Partitionierungsschlüssel neu erstellt.create table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new'
partition by date(last_update) as SELECT * FROM '[BigQuery_PROJECT_ID].datastream_cdc.actor' drop table '[BigQuery_PROJECT_ID].datastream_cdc.actor' alter table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' rename to 'actor' Verwenden Sie die Vorlage „Datastream zu BigQuery“, um einen Dataflow-Job neu zu erstellen.
Benutzerdefinierte Funktionen zum Bearbeiten von Ereignisdaten ausführen
Sie können die Vorlage „Datastream zu BigQuery“ verwenden, um eine benutzerdefinierte JavaScript-Funktion auszuführen. Dazu müssen Sie zuerst eine Datei mit der Funktion an einem bestimmten Speicherort in Cloud Storage ablegen. Gehen Sie anschließend so vor:
- Verwenden Sie den Parameter
javascriptTextTransformGcsPath
in der Vorlage, um den Speicherort der Datei in Cloud Storage anzugeben, die Ihre benutzerdefinierte Funktion enthält. - Verwenden Sie den Parameter
javascriptTextTransformFunctionName
, um den Namen der JavaScript-Funktion anzugeben, die Sie als benutzerdefinierte Funktion aufrufen möchten.
Sie können beispielsweise eine benutzerdefinierte Funktion ausführen, um gelöschte Datensätze in den Tabellen des Replikat-Datasets in BigQuery beizubehalten. Dieser Vorgang wird als „vorläufiges Löschen“ bezeichnet.
Erstellen Sie dazu eine Funktion, die den Wert der Spalte _metadata_deleted
in eine neue Spalte mit dem Namen is_deleted
kopiert und dann den Wert der Spalte _metadata_deleted
auf false
zurücksetzt. Dadurch werden die Löschereignisse vom Dataflow-Job ignoriert und die gelöschten Datensätze bleiben beim Aktualisieren des Replikatdatasets in BigQuery erhalten.
Hier ist der Beispielcode für diese benutzerdefinierte Funktion:
/** * This function changes the behavior of the Datastream to * BigQuery template to allow soft deletes. * @param {string} messageString from DatastreamIO data * @return {string} same as an input message with an added property */ function transform(messageString) { // messageString is a JSON object as a string var messageJson = JSON.parse(messageString); // Moving the deleted flag to a new column will cause the pipeline to soft delete data. messageJson['is_deleted'] = messageJson['_metadata_deleted']; messageJson['_metadata_deleted'] = false; return JSON.stringify(messageJson); }
Zusammenführungshäufigkeit festlegen
Verwenden Sie den Parameter mergeFrequencyMinutes
der Vorlage „Datastream für BigQuery“, um die Häufigkeit des Zusammenführens festzulegen. Dies ist die Anzahl der Minuten zwischen den Zusammenführungen für eine bestimmte Tabelle im Replikat-Dataset in BigQuery. Während historische Daten nachgetragen werden, empfehlen wir, die Häufigkeit des Zusammenführens niedrig zu halten (12 oder 24 Stunden), um die Kosten im Rahmen zu halten.
Wenn Sie den Wert dieses Parameters beispielsweise auf 10 Minuten festlegen, führt Dataflow den Job, für den die Vorlage verwendet wird, alle 10 Minuten aus. Bei der ersten Ausführung des Jobs kommt es jedoch zu einer Verzögerung von 5 Minuten. Wenn der Job in diesem Beispiel um 9:14 Uhr ausgeführt wird, erfolgt die erste Zusammenführung um 9:29 Uhr (10 Minuten für die Zusammenführung und 5 Minuten für die Verzögerung). Die zweite Zusammenführung erfolgt um 9:39 Uhr und alle nachfolgenden Zusammenführungen erfolgen im 10-Minuten-Takt (9:49 Uhr, 9:59 Uhr, 10:09 Uhr usw.).
Wenn Sie die Zusammenführungsfrequenz auf 60 Minuten festlegen, wird der Job zur vollen Stunde ausgeführt, nachdem er beim ersten Ausführen 5 Minuten lang verzögert wurde. Wenn der Job für 10:00 Uhr geplant ist, wird er aufgrund der 5-minütigen Verzögerung tatsächlich um 10:05 Uhr ausgeführt. Alle nachfolgenden Zusammenführungen erfolgen im 60-Minuten-Takt (11:05 Uhr, 12:05 Uhr, 13:05 Uhr usw.).
Unabhängig davon, ob Sie Kosten senken möchten oder andere Gründe vorliegen, ist es möglicherweise nicht möglich, Zusammenführungen so häufig durchzuführen, wie es für Ihr Unternehmen erforderlich ist. Möglicherweise sind die Daten nicht aktuell. Um auf die neuesten Daten zuzugreifen, erstellen Sie in BigQuery eine Ansicht für die Tabellen der Staging- und Replikat-Datasets, die die Zusammenführung nachbildet. Diese Ansicht wird als eine logische Tabelle erstellt (sowohl für die Staging- als auch für die Replikat-Datasets). Wenn die Zusammenführungsfrequenz niedrig ist und Sie schneller auf die Daten zugreifen müssen, verwenden Sie die Ansicht.