Auf dieser Seite finden Sie Best Practices für die Datenübertragung mit Datastream und Dataflow in BigQuery.
Replikate nach benutzerdefinierten Schlüsseln partitionieren
Das Staging-Dataset in BigQuery wird automatisch partitioniert. Standardmäßig ist das Replikat-Dataset jedoch nicht partitioniert, da die Partitionsschlüssel in den Replikationstabellen anhand einer bestimmten Geschäftslogik definiert werden müssen, anstatt von Datastream und Dataflow erzwungen zu werden.
Für jede Tabelle im Replikationsdatensatz, die partitioniert werden muss:
Führen Sie mit dem SQL-Editor in BigQuery das folgende SQL-Script für jede Tabelle im Replikat-Dataset aus. In diesem Beispiel enthält die Tabelle
actor
im Datensatzdatastream_cdc
die Spaltelast_update
, die wir als Partitionsschlüssel festlegen möchten. Wenn Sie das Script ausführen, wird die Tabelle mit dem richtigen Partitionsschlüssel neu erstellt.create table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new'
partition by date(last_update) as SELECT * FROM '[BigQuery_PROJECT_ID].datastream_cdc.actor' drop table '[BigQuery_PROJECT_ID].datastream_cdc.actor' alter table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' rename to 'actor' Verwenden Sie die Vorlage „Datastream zu BigQuery“, um einen Dataflow-Job neu zu erstellen.
Benutzerdefinierte Funktionen zum Bearbeiten von Ereignisdaten ausführen
Mit der Vorlage „Datastream zu BigQuery“ können Sie eine benutzerdefinierte JavaScript-Funktion ausführen. Legen Sie dazu zuerst eine Datei mit der Funktion an einem bestimmten Speicherort in Cloud Storage ab. Gehen Sie anschließend so vor:
- Verwenden Sie den Parameter
javascriptTextTransformGcsPath
in der Vorlage, um den Speicherort der Datei in Cloud Storage anzugeben, die Ihre benutzerdefinierte Funktion enthält. - Geben Sie mit dem Parameter
javascriptTextTransformFunctionName
den Namen der JavaScript-Funktion an, die Sie als benutzerdefinierte Funktion aufrufen möchten.
Sie können beispielsweise eine benutzerdefinierte Funktion ausführen, um gelöschte Einträge in den Tabellen des Replikat-Datasets in BigQuery beizubehalten. Dieser Vorgang wird als Soft Delete bezeichnet.
Erstellen Sie dazu eine Funktion, die den Wert der Spalte _metadata_deleted
in eine neue Spalte namens is_deleted
kopiert und dann den Wert der Spalte _metadata_deleted
auf false
zurücksetzt. Dadurch werden die Löschereignisse vom Dataflow-Job ignoriert und die gelöschten Datensätze beim Aktualisieren des Replikat-Datasets in BigQuery beibehalten.
Hier ist der Beispielcode für diese benutzerdefinierte Funktion:
/** * This function changes the behavior of the Datastream to * BigQuery template to allow soft deletes. * @param {string} messageString from DatastreamIO data * @return {string} same as an input message with an added property */ function transform(messageString) { // messageString is a JSON object as a string var messageJson = JSON.parse(messageString); // Moving the deleted flag to a new column will cause the pipeline to soft delete data. messageJson['is_deleted'] = messageJson['_metadata_deleted']; messageJson['_metadata_deleted'] = false; return JSON.stringify(messageJson); }
Zusammenführungshäufigkeit festlegen
Verwenden Sie den Parameter mergeFrequencyMinutes
der Vorlage „Datastream für BigQuery“, um die Zusammenführungshäufigkeit festzulegen. Das ist die Anzahl der Minuten zwischen den Zusammenführungen für eine bestimmte Tabelle im Replikat-Dataset in BigQuery. Während die Backfill-Daten eingefügt werden, empfehlen wir, die Zusammenführungshäufigkeit niedrig zu halten (12 oder 24 Stunden), um die Kosten im Rahmen zu halten.
Wenn Sie den Wert dieses Parameters beispielsweise auf 10 Minuten festlegen, führt Dataflow den Job, der die Vorlage verwendet, alle 10 Minuten aus. Bei der ersten Ausführung des Jobs kommt es jedoch zu einer Verzögerung von 5 Minuten. Wenn der Job in diesem Beispiel um 9:14 Uhr ausgeführt wird, erfolgt die erste Zusammenführung um 9:29 Uhr (10 Minuten für die Zusammenführung und 5 Minuten für die Verzögerung). Die zweite Zusammenführung erfolgt um 9:39 Uhr und alle nachfolgenden Zusammenführungen in 10-Minuten-Intervallen (9:49 Uhr, 9:59 Uhr, 10:09 Uhr usw.).
Wenn Sie die Zusammenführungshäufigkeit auf 60 Minuten festlegen, wird der Job stündlich ausgeführt, nach einer Verzögerung von 5 Minuten für die erste Ausführung des Jobs. Wenn der Job für 10:00 Uhr geplant ist, wird er aufgrund der 5-minütigen Verzögerung tatsächlich um 10:05 Uhr ausgeführt. Alle nachfolgenden Zusammenführungen erfolgen im Abstand von 60 Minuten (11:05 Uhr, 12:05 Uhr, 13:05 Uhr usw.).
Aus Kostengründen oder aus anderen Gründen können Sie eine Zusammenführung möglicherweise nicht so oft durchführen, wie es Ihren Geschäftsanforderungen entspricht. Möglicherweise sind die Daten nicht auf dem neuesten Stand. Wenn Sie auf die neuesten Daten zugreifen möchten, erstellen Sie in BigQuery eine Ansicht über den Tabellen der Staging- und Replikat-Datasets, in der die Zusammenführung nachgebildet wird. Diese Ansicht wird als eine logische Tabelle erstellt (sowohl für das Staging- als auch für das Replikations-Dataset). Wenn die Zusammenführungshäufigkeit niedrig ist und Sie schnelleren Zugriff auf die Daten benötigen, verwenden Sie die Datenansicht.