Die Vorlage "Datastream zu BigQuery" ist eine Streamingpipeline, die Datastream-Daten liest und in BigQuery repliziert. Die Vorlage liest Daten aus Cloud Storage mithilfe von Pub/Sub-Benachrichtigungen und repliziert sie in eine nach der Zeit partitionierte BigQuery-Staging-Tabelle. Nach der Replikation führt die Vorlage einen MERGE
-Vorgang in BigQuery aus, um alle CDC-Änderungen (Change Data Capture) in ein Replikat der Quelltabelle einzufügen bzw. dort zu aktualisieren.
Die Vorlage verarbeitet das Erstellen und Aktualisieren der BigQuery-Tabellen, die von der Replikation verwaltet werden. Wenn eine Datendefinitionssprache (DDL) erforderlich ist, extrahiert ein Callback an Datastream das Quelltabellenschema und übersetzt es in BigQuery-Datentypen. Unterstützte Vorgänge umfassen Folgendes:
- Neue Tabellen werden beim Einfügen von Daten erstellt.
- Den BigQuery-Tabellen werden neue Spalten mit Null-Anfangswerten hinzugefügt.
- Verworfene Spalten werden in BigQuery ignoriert und zukünftige Werte sind null.
- Umbenannte Spalten werden BigQuery als neue Spalten hinzugefügt.
- Typänderungen werden nicht an BigQuery weitergegeben.
Es wird empfohlen, diese Pipeline im Modus Mindestens einmal streamen auszuführen, da die Vorlage eine Deduplizierung durchführt, wenn sie Daten aus einer temporären BigQuery-Tabelle mit der Haupttabelle von BigQuery zusammenführt. Dieser Schritt in der Pipeline bedeutet, dass die Verwendung des Streamingmodus „Genau einmal“ keinen zusätzlichen Vorteil bietet.
Pipelineanforderungen
- Ein Datastream-Stream, der bereits Daten repliziert oder dafür bereit ist.
- Cloud Storage Pub/Sub-Benachrichtigungen sind für die Datastream-Daten aktiviert.
- BigQuery-Ziel-Datasets werden erstellt und dem Compute Engine-Dienstkonto wurde Administratorzugriff darauf gewährt.
- In der Quelltabelle ist ein Primärschlüssel erforderlich, damit die Ziel-Replikattabelle erstellt werden kann.
- Eine MySQL- oder Oracle-Quelldatenbank. PostgreSQL- und SQL Server-Datenbanken werden nicht unterstützt.
Vorlagenparameter
Erforderliche Parameter
- inputFilePattern: Der Dateispeicherort für die Datastream-Dateiausgabe in Cloud Storage im Format
gs://<BUCKET_NAME>/<ROOT_PATH>/
. - inputFileFormat: Das Format der von Datastream generierten Ausgabedateien. Zulässige Werte sind
avro
undjson
. Die Standardeinstellung istavro
. - gcsPubSubSubscription: Das Pub/Sub-Abo, das von Cloud Storage verwendet wird, um Dataflow über neue, zur Verarbeitung verfügbare Dateien zu informieren. Dabei wird folgendes Format verwendet:
projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>
. - outputStagingDatasetTemplate: Der Name des Datasets, das die Staging-Tabellen enthält. Dieser Parameter unterstützt Vorlagen, z. B.
{_metadata_dataset}_log
odermy_dataset_log
. Normalerweise ist dies der Name eines Datensatzes. Die Standardeinstellung ist{_metadata_dataset}
. - outputDatasetTemplate: Der Name des Datasets, das die Replikattabellen enthält. Dieser Parameter unterstützt Vorlagen, z. B.
{_metadata_dataset}
odermy_dataset
. Normalerweise ist dies der Name eines Datensatzes. Die Standardeinstellung ist{_metadata_dataset}
. - deadLetterQueueDirectory: Der Pfad, den Dataflow zum Schreiben der Ausgabe der Warteschlange für unzustellbare Nachrichten verwendet. Dieser Pfad darf sich nicht im selben Pfad wie die Datastream-Dateiausgabe befinden. Die Standardeinstellung ist
empty
.
Optionale Parameter
- streamName: Der Name oder die Vorlage für den Stream, der nach Schemainformationen abgefragt wird. Standardeinstellung: {_metadata_stream}. Der Standardwert ist in der Regel ausreichend.
- rfcStartDateTime: Das Startdatum, das zum Abrufen von Daten aus Cloud Storage verwendet werden soll (https://tools.ietf.org/html/rfc3339). Die Standardeinstellung ist:
1970-01-01T00:00:00.00Z
. - fileReadConcurrency: Die Anzahl der gleichzeitig zu lesenden DataStream-Dateien. Der Standardwert ist
10
. - outputProjectId: Die ID des Google Cloud-Projekts, das die BigQuery-Datasets enthält, in die Daten ausgegeben werden sollen. Der Standardwert für diesen Parameter ist das Projekt, in dem die Dataflow-Pipeline ausgeführt wird.
- outputStagingTableNameTemplate: Die Vorlage, die zum Benennen der Staging-Tabellen verwendet werden soll. Beispiel:
{_metadata_table}
. Die Standardeinstellung ist{_metadata_table}_log
. - outputTableNameTemplate: Die Vorlage, die für den Namen der Replikattabellen verwendet werden soll, z. B.
{_metadata_table}
. Die Standardeinstellung ist{_metadata_table}
. - ignoreFields: Durch Kommas getrennte Felder, die in BigQuery ignoriert werden sollen. Die Standardeinstellung ist
_metadata_stream,_metadata_schema,_metadata_table,_metadata_source,_metadata_tx_id,_metadata_dlq_reconsumed,_metadata_primary_keys,_metadata_error,_metadata_retry_count
. Beispiel:_metadata_stream,_metadata_schema
- mergeFrequencyMinutes: Die Anzahl der Minuten zwischen Zusammenführungen für eine bestimmte Tabelle. Die Standardeinstellung ist
5
. - dlqRetryMinutes: Die Anzahl der Minuten zwischen DLQ-Wiederholungsversuchen. Die Standardeinstellung ist
10
. - dataStreamRootUrl: Die Stamm-URL der Datastream API. Standardeinstellung: https://datastream.googleapis.com/.
- applyMerge: Gibt an, ob MERGE-Abfragen für den Job deaktiviert werden sollen. Die Standardeinstellung ist
true
. - mergeConcurrency: Die Anzahl der gleichzeitigen BigQuery-MERGE-Abfragen. Nur wirksam, wenn applyMerge auf true gesetzt ist. Die Standardeinstellung ist
30
. - partitionRetentionDays: Die Anzahl der Tage, über die hinweg Partitionen nach der Ausführung von BigQuery-Zusammenführungen aufbewahrt werden. Die Standardeinstellung ist
1
. - useStorageWriteApiAtLeastOnce: Dieser Parameter wird nur wirksam, wenn
Use BigQuery Storage Write API
aktiviert ist. Beitrue
wird für die Storage Write API eine „Mindestens einmal“-Semantik verwendet. Andernfalls wird die „Genau einmal“-Semantik verwendet. Die Standardeinstellung istfalse
. - javascriptTextTransformGcsPath: Der Cloud Storage-URI der .js-Datei, in der die zu verwendende benutzerdefinierte JavaScript-Funktion (UDF) definiert wird. Beispiel:
gs://my-bucket/my-udfs/my_file.js
. - javascriptTextTransformFunctionName: Der Name der benutzerdefinierten JavaScript-Funktion (UDF), die verwendet werden soll. Wenn Ihre JavaScript-Funktion beispielsweise
myTransform(inJson) { /*...do stuff...*/ }
ist, lautet der FunktionsnamemyTransform
. Beispiele für JavaScript-UDFs finden Sie unter „UDF-Beispiele“ (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). - javascriptTextTransformReloadIntervalMinutes: Gibt an, wie oft die UDF aktualisiert werden soll (in Minuten). Wenn der Wert größer als 0 ist, prüft Dataflow regelmäßig die UDF-Datei in Cloud Storage und lädt die UDF neu, wenn die Datei geändert wurde. Mit diesem Parameter können Sie die UDF aktualisieren, während die Pipeline ausgeführt wird, ohne den Job neu starten zu müssen. Wenn der Wert
0
ist, ist das Neuladen der UDF deaktiviert. Der Standardwert ist0
. - pythonTextTransformGcsPath: Das Cloud Storage-Pfadmuster für den Python-Code, der Ihre benutzerdefinierten Funktionen enthält. Beispiel:
gs://your-bucket/your-transforms/*.py
. - pythonRuntimeVersion: Die Laufzeitversion, die für diese Python-UDF verwendet werden soll.
- pythonTextTransformFunctionName: Der Name der Funktion, die aus Ihrer JavaScript-Datei aufgerufen werden soll. Verwenden Sie nur Buchstaben, Ziffern und Unterstriche. Beispiel:
transform_udf1
. - runtimeRetries: Die Anzahl der Ausführungsversuche einer Laufzeit, bevor der Vorgang fehlschlägt. Die Standardeinstellung ist 5.
- useStorageWriteApi: Wenn „true“, verwendet die Pipeline die BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). Der Standardwert ist
false
. Weitere Informationen finden Sie unter „Storage Write API verwenden“ (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams: Gibt bei Verwendung der Storage Write API die Anzahl der Schreibstreams an. Wenn
useStorageWriteApi
true
unduseStorageWriteApiAtLeastOnce
false
ist, müssen Sie diesen Parameter festlegen. Die Standardeinstellung ist 0. - storageWriteApiTriggeringFrequencySec: Wenn Sie die Storage Write API verwenden, wird die Triggerhäufigkeit in Sekunden angegeben. Wenn
useStorageWriteApi
true
unduseStorageWriteApiAtLeastOnce
false
ist, müssen Sie diesen Parameter festlegen.
Benutzerdefinierte Funktion
Optional können Sie diese Vorlage erweitern, indem Sie eine benutzerdefinierte Funktion (UDF) schreiben. Die Vorlage ruft die UDF für jedes Eingabeelement auf. Nutzlasten von Elementen werden als JSON-Strings serialisiert. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.
Funktionsspezifikation
UDFs haben die folgende Spezifikation:
Führen Sie die Vorlage aus.
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Datastream to BigQuery templateaus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Optional: Wählen Sie Mindestens einmal aus, um von der genau einmaligen Verarbeitung zum Mindestens einmal-Streamingmodus zu wechseln.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --enable-streaming-engine \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \ --parameters \ inputFilePattern=GCS_FILE_PATH,\ gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\ outputStagingDatasetTemplate=BIGQUERY_DATASET,\ outputDatasetTemplate=BIGQUERY_DATASET,\ outputStagingTableNameTemplate=BIGQUERY_TABLE,\ outputTableNameTemplate=BIGQUERY_TABLE_log
Ersetzen Sie Folgendes:
PROJECT_ID
: die Google Cloud -Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlREGION_NAME
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH
ist der Cloud Storage-Pfad zu Datastream-Daten. Beispiel:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
ist das Pub/Sub-Abo, aus dem geänderte Dateien gelesen werden sollen. Beispiel:projects/my-project-id/subscriptions/my-subscription-id
BIGQUERY_DATASET
ist der Name Ihres BigQuery-Datasets.BIGQUERY_TABLE
ist Ihre BigQuery-Tabellenvorlage, z. B.{_metadata_schema}_{_metadata_table}_log
.
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "GCS_FILE_PATH", "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME", "outputStagingDatasetTemplate": "BIGQUERY_DATASET", "outputDatasetTemplate": "BIGQUERY_DATASET", "outputStagingTableNameTemplate": "BIGQUERY_TABLE", "outputTableNameTemplate": "BIGQUERY_TABLE_log" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery", } }
Ersetzen Sie Folgendes:
PROJECT_ID
: die Google Cloud -Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlLOCATION
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH
ist der Cloud Storage-Pfad zu Datastream-Daten. Beispiel:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
ist das Pub/Sub-Abo, aus dem geänderte Dateien gelesen werden sollen. Beispiel:projects/my-project-id/subscriptions/my-subscription-id
BIGQUERY_DATASET
ist der Name Ihres BigQuery-Datasets.BIGQUERY_TABLE
ist Ihre BigQuery-Tabellenvorlage, z. B.{_metadata_schema}_{_metadata_table}_log
.