Vorlage „Datastream zu BigQuery (Stream)“

Die Vorlage "Datastream zu BigQuery" ist eine Streamingpipeline, die Datastream-Daten liest und in BigQuery repliziert. Die Vorlage liest Daten aus Cloud Storage mithilfe von Pub/Sub-Benachrichtigungen und repliziert sie in eine nach der Zeit partitionierte BigQuery-Staging-Tabelle. Nach der Replikation führt die Vorlage einen MERGE-Vorgang in BigQuery aus, um alle CDC-Änderungen (Change Data Capture) in ein Replikat der Quelltabelle einzufügen bzw. dort zu aktualisieren.

Die Vorlage verarbeitet das Erstellen und Aktualisieren der BigQuery-Tabellen, die von der Replikation verwaltet werden. Wenn eine Datendefinitionssprache (DDL) erforderlich ist, extrahiert ein Callback an Datastream das Quelltabellenschema und übersetzt es in BigQuery-Datentypen. Unterstützte Vorgänge umfassen Folgendes:

  • Neue Tabellen werden beim Einfügen von Daten erstellt.
  • Den BigQuery-Tabellen werden neue Spalten mit Null-Anfangswerten hinzugefügt.
  • Verworfene Spalten werden in BigQuery ignoriert und zukünftige Werte sind null.
  • Umbenannte Spalten werden BigQuery als neue Spalten hinzugefügt.
  • Typänderungen werden nicht an BigQuery weitergegeben.

Es wird empfohlen, diese Pipeline im Modus Mindestens einmal streamen auszuführen, da die Vorlage eine Deduplizierung durchführt, wenn sie Daten aus einer temporären BigQuery-Tabelle mit der Haupttabelle von BigQuery zusammenführt. Dieser Schritt in der Pipeline bedeutet, dass es keinen zusätzlichen Vorteil durch die Verwendung des „Genau einmal“-Streamingmodus bietet.

Pipelineanforderungen

  • Ein Datastream-Stream, der bereits Daten repliziert oder dafür bereit ist.
  • Cloud Storage Pub/Sub-Benachrichtigungen sind für die Datastream-Daten aktiviert.
  • BigQuery-Ziel-Datasets werden erstellt und dem Compute Engine-Dienstkonto wurde Administratorzugriff darauf gewährt.
  • In der Quelltabelle ist ein Primärschlüssel erforderlich, damit die Ziel-Replikattabelle erstellt werden kann.
  • Eine MySQL- oder Oracle-Quelldatenbank. PostgreSQL-Datenbanken werden nicht unterstützt.

Vorlagenparameter

Parameter Beschreibung
inputFilePattern Der Speicherort für Datastream-Dateien in Cloud Storage, die repliziert werden sollen. Dieser Dateispeicherort ist normalerweise der Stammpfad für den Stream.
gcsPubSubSubscription Das Pub/Sub-Abo mit Datastream-Dateibenachrichtigungen, z. B. projects/my-project-id/subscriptions/my-subscription-id.
inputFileFormat Das Format der von Datastream generierten Ausgabedatei. Beispiel: avro,json Standardeinstellung: avro.
outputStagingDatasetTemplate Der Name eines vorhandenen Datasets, das Staging-Tabellen enthält. Sie können die Vorlage {_metadata_dataset} als Platzhalter einfügen, der durch den Namen Ihres Quell-Datasets/-Schemas ersetzt wird (z. B. {_metadata_dataset}_log).
outputDatasetTemplate Der Name eines vorhandenen Datasets, das Replikattabellen enthält. Sie können die Vorlage {_metadata_dataset} als Platzhalter einfügen, der durch den Namen Ihres Quell-Datasets/-Schemas ersetzt wird (z. B. {_metadata_dataset}).
deadLetterQueueDirectory Der Dateipfad zum Speichern nicht verarbeiteter Nachrichten mit einer Begründung für die fehlerhafte Verarbeitung. Der Standardwert ist ein Verzeichnis unter dem temporären Speicherort des Dataflow-Jobs. Der Standardwert ist unter den meisten Bedingungen ausreichend.
outputStagingTableNameTemplate Optional: Die Vorlage für den Namen der Staging-Tabellen. Der Standardwert ist {_metadata_table}_log. Wenn Sie mehrere Schemas replizieren, wird {_metadata_schema}_{_metadata_table}_log empfohlen.
outputTableNameTemplate Optional: Die Vorlage für den Namen der Replikattabellen. Standardeinstellung: {_metadata_table}. Wenn Sie mehrere Schemas replizieren, wird {_metadata_schema}_{_metadata_table} empfohlen.
outputProjectId Optional: Projekt für BigQuery-Datasets, in das Daten ausgegeben werden sollen. Der Standardwert für diesen Parameter ist das Projekt, in dem die Dataflow-Pipeline ausgeführt wird.
streamName Optional: Der Name oder die Vorlage für den Stream, der nach Schemainformationen abgefragt wird. Standardeinstellung: {_metadata_stream}.
mergeFrequencyMinutes Optional: Die Anzahl der Minuten zwischen den Zusammenführungen für eine bestimmte Tabelle. Standardeinstellung: 5.
dlqRetryMinutes Optional: Die Anzahl der Minuten zwischen DLQ-Wiederholungen (Dead Letter Queue). Standardeinstellung: 10.
javascriptTextTransformGcsPath Optional: Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName Optional: Der Name der benutzerdefinierten JavaScript-Funktion, die Sie verwenden möchten. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.
useStorageWriteApi Optional: Wenn true, verwendet die Pipeline die BigQuery Storage Write API. Der Standardwert ist false. Weitere Informationen finden Sie unter BigQuery Storage Write API verwenden.
useStorageWriteApiAtLeastOnce Optional: Gibt bei Verwendung der Storage Write API die Schreibsemantik an. Wenn Sie "Mindestens einmal"-Semantik verwenden, legen Sie diesen Parameter auf true fest. Wenn Sie die "Genau einmal"-Semantik verwenden möchten, legen Sie den Parameter auf false fest. Dieser Parameter gilt nur, wenn useStorageWriteApi true ist. Der Standardwert ist false.
numStorageWriteApiStreams Optional: Gibt bei Verwendung der Storage Write API die Anzahl der Schreibstreams an. Wenn useStorageWriteApi true und useStorageWriteApiAtLeastOnce false ist, müssen Sie diesen Parameter festlegen.
storageWriteApiTriggeringFrequencySec Optional: Wenn Sie die Storage Write API verwenden, wird die Triggerhäufigkeit in Sekunden angegeben. Wenn useStorageWriteApi true und useStorageWriteApiAtLeastOnce false ist, müssen Sie diesen Parameter festlegen.
applyMerge Optional: Gibt an, ob die Vorlage nach dem Replizieren von Daten in die Staging-Tabelle eine MERGE-Anweisung in BigQuery ausführt. Standard: true.
fileReadConcurrency Optional: Die Anzahl der Datastream-Dateien, die gleichzeitig gelesen werden sollen. Der Standardwert ist 10.
mergeConcurrency Optional: Die Anzahl der gleichzeitigen MERGE-Anweisungen von BigQuery. Standard: 30.
partitionRetentionDays Optional: Die Anzahl der Tage, die für die Partitionsaufbewahrung verwendet werden sollen, wenn BigQuery-MERGE-Anweisungen ausgeführt werden. Der Standardwert ist 1.
rfcStartDateTime Optional: Die Startzeit zum Lesen von Dateien aus Cloud Storage als RFC 3339-Datums-/Uhrzeitwert. Standard: 1970-01-01T00:00:00.00Z.

Benutzerdefinierte Funktion

Optional können Sie diese Vorlage erweitern, indem Sie eine benutzerdefinierte Funktion (UDF) schreiben. Die Vorlage ruft die UDF für jedes Eingabeelement auf. Nutzlasten von Elementen werden als JSON-Strings serialisiert. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.

Funktionsspezifikation

UDFs haben die folgende Spezifikation:

  • Eingabe: Die CDC-Daten, serialisiert als JSON-String.
  • Ausgabe: Ein JSON-String, der mit dem Schema der BigQuery-Zieltabelle übereinstimmt.
  • Führen Sie die Vorlage aus.

    Console

    1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
    2. Zur Seite "Job aus Vorlage erstellen“
    3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
    4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

      Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

    5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Datastream to BigQuery templateaus.
    6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
    7. Optional: Wählen Sie Mindestens einmal aus, um von der genau einmaligen Verarbeitung zum Mindestens einmal-Streamingmodus zu wechseln.
    8. Klicken Sie auf Job ausführen.

    gcloud

    Führen Sie die Vorlage in der Shell oder im Terminal aus:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --enable-streaming-engine \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters \
    inputFilePattern=GCS_FILE_PATH,\
    gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
    outputStagingDatasetTemplate=BIGQUERY_DATASET,\
    outputDatasetTemplate=BIGQUERY_DATASET,\
    outputStagingTableNameTemplate=BIGQUERY_TABLE,\
    outputTableNameTemplate=BIGQUERY_TABLE_log
      

    Ersetzen Sie Folgendes:

    • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
    • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
    • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH ist der Cloud Storage-Pfad zu Datastream-Daten. Beispiel: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME ist das Pub/Sub-Abo, aus dem geänderte Dateien gelesen werden sollen. Beispiel: projects/my-project-id/subscriptions/my-subscription-id
    • BIGQUERY_DATASET ist der Name Ihres BigQuery-Datasets.
    • BIGQUERY_TABLE ist Ihre BigQuery-Tabellenvorlage, z. B. {_metadata_schema}_{_metadata_table}_log.

    API

    Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
    
              "inputFilePattern": "GCS_FILE_PATH",
              "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
              "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
              "outputDatasetTemplate": "BIGQUERY_DATASET",
              "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
              "outputTableNameTemplate": "BIGQUERY_TABLE_log"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery",
       }
    }
      

    Ersetzen Sie Folgendes:

    • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
    • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
    • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH ist der Cloud Storage-Pfad zu Datastream-Daten. Beispiel: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME ist das Pub/Sub-Abo, aus dem geänderte Dateien gelesen werden sollen. Beispiel: projects/my-project-id/subscriptions/my-subscription-id
    • BIGQUERY_DATASET ist der Name Ihres BigQuery-Datasets.
    • BIGQUERY_TABLE ist Ihre BigQuery-Tabellenvorlage, z. B. {_metadata_schema}_{_metadata_table}_log.

    Nächste Schritte