Von Google bereitgestellte Dataflow-Streamingvorlagen

Google bietet eine Reihe von Open-Source-Vorlagen für Dataflow.

Mit diesen Dataflow-Vorlagen können Sie umfangreiche Datenaufgaben wie Datenimport, Datenexport, Datensicherung, Datenwiederherstellung und Bulk-API-Vorgänge lösen, ohne eine dedizierte Entwicklungsumgebung zu verwenden. Die Vorlagen basieren auf Apache Beam und verwenden Dataflow, um die Daten zu transformieren.

Allgemeine Informationen zu Vorlagen finden Sie unter Dataflow-Vorlagen. Eine Liste aller von Google bereitgestellten Vorlagen finden Sie unter Erste Schritte mit von Google bereitgestellten Vorlagen.

In dieser Anleitung werden Streamingvorlagen beschrieben.

Pub/Sub-Abo für BigQuery

Die Vorlage "Pub/Sub-Abo für BigQuery" ist eine Streamingpipeline, die Nachrichten im JSON-Format aus einem Pub/Sub-Abo liest und in eine BigQuery-Tabelle schreibt. Sie können die Vorlage als schnelle Lösung verwenden, um Pub/Sub-Daten nach BigQuery zu verschieben. Die Vorlage liest Nachrichten im JSON-Format aus Pub/Sub und konvertiert sie in BigQuery-Elemente.

Voraussetzungen für diese Pipeline:

  • Das data-Feld mit Pub/Sub-Nachrichten muss das JSON-Format verwenden, das in diesem JSON-Leitfaden beschrieben wird. Beispielsweise können Nachrichten mit Werten im data-Feld, die als {"k1":"v1", "k2":"v2"} formatiert sind, in eine BigQuery-Tabelle mit zwei Spalten namens k1 und k2 mit einem Stringdatentyp eingefügt werden.
  • Die Ausgabetabelle muss vorhanden sein, bevor Sie die Pipeline ausführen. Das Tabellenschema muss mit den JSON-Eingabeobjekten übereinstimmen.

Vorlagenparameter

Parameter Beschreibung
inputSubscription Das Pub/Sub-Eingabeabo, aus dem gelesen werden soll, im Format projects/<project>/subscriptions/<subscription>.
outputTableSpec Der Speicherort der BigQuery-Ausgabetabelle im Format <my-project>:<my-dataset>.<my-table>.
outputDeadletterTable Die BigQuery-Tabelle im Format <my-project>:<my-dataset>.<my-table> für Nachrichten, die die Ausgabetabelle nicht erreicht haben. Wenn sie nicht vorhanden ist, wird sie während der Pipelineausführung erstellt. Wenn nicht angegeben, wird stattdessen OUTPUT_TABLE_SPEC_error_records verwendet.
javascriptTextTransformGcsPath (Optional) Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Optional) Der Name der benutzerdefinierten JavaScript-Funktion, die Sie verwenden möchten. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

Pub/Sub-Abo für BigQuery-Vorlage ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub Subscription to BigQuery template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: der Name Ihres Pub/Sub-Abos
  • DATASET: Ihr BigQuery-Dataset
  • TABLE_NAME: Ihr BigQuery-Tabellenname

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: der Name Ihres Pub/Sub-Abos
  • DATASET: Ihr BigQuery-Dataset
  • TABLE_NAME: Ihr BigQuery-Tabellenname

Pub/Sub-Thema für BigQuery

Die Vorlage "Pub/Sub-Thema für BigQuery" ist eine Streamingpipeline, die Nachrichten im JSON-Format aus einem Pub/Sub-Thema liest und in eine BigQuery-Tabelle schreibt. Sie können die Vorlage als schnelle Lösung verwenden, um Pub/Sub-Daten nach BigQuery zu verschieben. Die Vorlage liest Nachrichten im JSON-Format aus Pub/Sub und konvertiert sie in BigQuery-Elemente.

Voraussetzungen für diese Pipeline:

  • Das data-Feld mit Pub/Sub-Nachrichten muss das JSON-Format verwenden, das in diesem JSON-Leitfaden beschrieben wird. Beispielsweise können Nachrichten mit Werten im data-Feld, die als {"k1":"v1", "k2":"v2"} formatiert sind, in eine BigQuery-Tabelle mit zwei Spalten namens k1 und k2 mit einem Stringdatentyp eingefügt werden.
  • Die Ausgabetabelle muss vorhanden sein, bevor Sie die Pipeline ausführen. Das Tabellenschema muss mit den JSON-Eingabeobjekten übereinstimmen.

Vorlagenparameter

Parameter Beschreibung
inputTopic Das Pub/Sub-Eingabethema, aus dem gelesen werden soll, im Format projects/<project>/topics/<topic>.
outputTableSpec Der Speicherort der BigQuery-Ausgabetabelle im Format <my-project>:<my-dataset>.<my-table>.
outputDeadletterTable Die BigQuery-Tabelle für Nachrichten, die die Ausgabetabelle nicht erreicht haben. Sie sollte das Format <my-project>:<my-dataset>.<my-table> haben. Wenn sie nicht vorhanden ist, wird sie während der Pipelineausführung erstellt. Wenn nicht angegeben, wird stattdessen <outputTableSpec>_error_records verwendet.
javascriptTextTransformGcsPath (Optional) Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Optional) Der Name der benutzerdefinierten JavaScript-Funktion, die Sie verwenden möchten. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

Vorlage "Pub/Sub-Thema für BigQuery" ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub Topic to BigQuery template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • TOPIC_NAME: der Name Ihres Pub/Sub-Themas
  • DATASET: Ihr BigQuery-Dataset
  • TABLE_NAME: Ihr BigQuery-Tabellenname

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • TOPIC_NAME: der Name Ihres Pub/Sub-Themas
  • DATASET: Ihr BigQuery-Dataset
  • TABLE_NAME: Ihr BigQuery-Tabellenname

Pub/Sub Avro für BigQuery

Die Vorlage „Pub/Sub Avro für BigQuery“ ist eine Streamingpipeline, die Avro-Daten aus einem Pub/Sub-Abo in eine BigQuery-Tabelle schreibt. Alle Fehler beim Schreiben in die BigQuery-Tabelle werden in ein Pub/Sub-Thema für nicht verarbeitete Datensätze gestreamt.

Voraussetzungen für diese Pipeline

  • Das Pub/Sub-Eingabeabo muss vorhanden sein.
  • Die Schemadatei für die Avro-Einträge muss in Cloud Storage hinterlegt sein.
  • Das Pub/Sub-Thema für nicht verarbeitete Datensätze muss vorhanden sein.
  • Das BigQuery-Ausgabe-Dataset muss vorhanden sein.

Vorlagenparameter

Parameter Beschreibung
schemaPath Der Cloud Storage-Speicherort der Avro-Schemadatei. Beispiel: gs://path/to/my/schema.avsc
inputSubscription Das Pub/Sub-Eingabeabo, aus dem gelesen werden soll. z. B. projects/<project>/subscriptions/<subscription>.
outputTopic Das Pub/Sub-Thema, das für nicht verarbeitete Datensätze verwendet werden soll. z. B. projects/<project-id>/topics/<topic-name>.
outputTableSpec Ort der BigQuery-Ausgabetabelle. Beispiel: <my-project>:<my-dataset>.<my-table> Abhängig von der angegebenen createDisposition kann die Ausgabetabelle automatisch mit dem vom Nutzer angegebenen Avro-Schema erstellt werden.
writeDisposition (Optional) Die BigQuery-WriteDisposition. Beispiel: WRITE_APPEND, WRITE_EMPTY oder WRITE_TRUNCATE. Standardeinstellung: WRITE_APPEND
createDisposition (Optional) Die BigQuery-CreateDisposition. Beispiele: CREATE_IF_NEEDED, CREATE_NEVER Standardeinstellung: CREATE_IF_NEEDED

Vorlage „Pub/Sub Avro für BigQuery“ ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub Avro to BigQuery template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • SCHEMA_PATH: der Cloud Storage-Pfad zur Avro-Schemadatei (z. B. gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: der Name des Pub/Sub-Eingabeabos
  • BIGQUERY_TABLE: der Name der BigQuery-Ausgabetabelle
  • DEADLETTER_TOPIC: Das Pub/Sub-Thema, das für die Warteschlange für nicht verarbeitete Datensätze verwendet werden soll

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • SCHEMA_PATH: der Cloud Storage-Pfad zur Avro-Schemadatei (z. B. gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: der Name des Pub/Sub-Eingabeabos
  • BIGQUERY_TABLE: der Name der BigQuery-Ausgabetabelle
  • DEADLETTER_TOPIC: Das Pub/Sub-Thema, das für die Warteschlange für nicht verarbeitete Datensätze verwendet werden soll

Pub/Sub Proto für BigQuery

Die Vorlage „Pub/Sub Proto für BigQuery“ ist eine Streamingpipeline, die Proto-Daten aus einem Pub/Sub-Abo in eine BigQuery-Tabelle schreibt. Alle Fehler beim Schreiben in die BigQuery-Tabelle werden in ein Pub/Sub-Thema für nicht verarbeitete Datensätze gestreamt.

Sie können eine benutzerdefinierte JavaScript-Funktion (UDF) zum Transformieren von Daten bereitstellen. Fehler während der Ausführung der UDF können entweder an ein separates Pub/Sub-Thema oder an dasselbe nicht verarbeitete Thema wie die BigQuery-Fehler gesendet werden.

Voraussetzungen für diese Pipeline:

  • Das Pub/Sub-Eingabeabo muss vorhanden sein.
  • Die Schemadatei für die Proto-Einträge muss in Cloud Storage hinterlegt sein.
  • Das Pub/Sub-Ausgabethema muss vorhanden sein.
  • Das BigQuery-Ausgabe-Dataset muss vorhanden sein.
  • Wenn die BigQuery-Tabelle vorhanden ist, muss sie ein Schema haben, das mit den Proto-Daten unabhängig vom createDisposition-Wert übereinstimmt.

Vorlagenparameter

Parameter Beschreibung
protoSchemaPath Der Cloud Storage-Speicherort der eigenständigen Proto-Schemadatei. z. B. gs://path/to/my/file.pb. Diese Datei kann mit dem Flag --descriptor_set_out des Befehls protoc generiert werden. Das Flag --include_imports garantiert, dass die Datei unabhängig ist.
fullMessageName Der vollständige Proto-Nachrichtenname. Beispiel: package.name.MessageName, wobei package.name der Wert für die Anweisung package und nicht für die Anweisung java_package ist.
inputSubscription Das Pub/Sub-Eingabeabo, aus dem gelesen werden soll. z. B. projects/<project>/subscriptions/<subscription>.
outputTopic Das Pub/Sub-Thema, das für nicht verarbeitete Datensätze verwendet werden soll. z. B. projects/<project-id>/topics/<topic-name>.
outputTableSpec Ort der BigQuery-Ausgabetabelle. z. B. my-project:my_dataset.my_table. Abhängig von der angegebenen createDisposition kann die Ausgabetabelle automatisch mit der Eingabeschemadatei erstellt werden.
preserveProtoFieldNames (Optional) true, um den ursprünglichen Proto-Feldnamen in JSON beizubehalten. false, um weitere JSON-Standardnamen zu verwenden. Zum Beispiel würde false field_name in fieldName ändern. (Standard: false)
bigQueryTableSchemaPath (Optional) Cloud Storage-Pfad zum BigQuery-Schemapfad. z. B. gs://path/to/my/schema.json. Falls nicht angegeben ist, wird das Schema aus dem Proto-Schema abgeleitet.
javascriptTextTransformGcsPath (Optional) Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Optional) Der Name der benutzerdefinierten JavaScript-Funktion, die Sie verwenden möchten. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.
udfOutputTopic (Optional) Das Pub/Sub-Thema, in dem die UDF-Fehler gespeichert werden. z. B. projects/<project-id>/topics/<topic-name> Wenn nicht angegeben, werden UDF-Fehler an dasselbe Thema wie outputTopic gesendet.
writeDisposition (Optional) Die BigQuery-WriteDisposition. Beispiel: WRITE_APPEND, WRITE_EMPTY oder WRITE_TRUNCATE. Standard: WRITE_APPEND.
createDisposition (Optional) Die BigQuery-CreateDisposition. Beispiele: CREATE_IF_NEEDED, CREATE_NEVER Standard: CREATE_IF_NEEDED.

Vorlage Pub/Sub Proto für BigQuery ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub Proto to BigQuery template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
fullMessageName=PROTO_MESSAGE_NAME,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=UNPROCESSED_TOPIC
  

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • SCHEMA_PATH: der Cloud Storage-Pfad zur Avro-Schemadatei (z. B. gs://MyBucket/file.pb)
  • PROTO_MESSAGE_NAME: der Proto-Nachrichtenname (z. B. package.name.MessageName)
  • SUBSCRIPTION_NAME: der Name des Pub/Sub-Eingabeabos
  • BIGQUERY_TABLE: der Name der BigQuery-Ausgabetabelle
  • UNPROCESSED_TOPIC: Das Pub/Sub-Thema, das für die Warteschlange für nicht verarbeitete Datensätze verwendet werden soll

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "fullMessageName": "PROTO_MESSAGE_NAME",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "UNPROCESSED_TOPIC"
      }
   }
}
  

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • SCHEMA_PATH: der Cloud Storage-Pfad zur Avro-Schemadatei (z. B. gs://MyBucket/file.pb)
  • PROTO_MESSAGE_NAME: der Proto-Nachrichtenname (z. B. package.name.MessageName)
  • SUBSCRIPTION_NAME: der Name des Pub/Sub-Eingabeabos
  • BIGQUERY_TABLE: der Name der BigQuery-Ausgabetabelle
  • UNPROCESSED_TOPIC: Das Pub/Sub-Thema, das für die Warteschlange für nicht verarbeitete Datensätze verwendet werden soll

Pub/Sub zu Pub/Sub

Die Vorlage "Pub/Sub für Pub/Sub" ist eine Streamingpipeline, die Nachrichten aus einem Pub/Sub-Abo liest und in ein anderes Pub/Sub-Thema schreibt. Die Pipeline akzeptiert auch einen optionalen Nachrichtenattributschlüssel und einen Wert, die zum Filtern der Nachrichten verwendet werden können, die in das Pub/Sub-Thema geschrieben werden sollen. Sie können diese Vorlage verwenden, um Nachrichten mit einem optionalen Nachrichtenfilter von einem Pub/Sub-Abo in ein anderes Pub/Sub-Thema zu kopieren.

Voraussetzungen für diese Pipeline:

  • Das als Quelle dienende Pub/Sub-Abo muss vor der Ausführung vorhanden sein.
  • Das Quell-Pub/Sub-Abo muss ein Pull-Abo sein.
  • Das Pub/Sub-Thema, in das geschrieben werden soll, muss vor der Ausführung vorhanden sein.

Vorlagenparameter

Parameter Beschreibung
inputSubscription Pub/Sub-Abo, aus dem die Eingabe gelesen wird. z. B. projects/<project-id>/subscriptions/<subscription-name>.
outputTopic Cloud Pub/Sub-Thema, in das die Ausgabe geschrieben wird. z. B. projects/<project-id>/topics/<topic-name>.
filterKey (Optional) Filterereignisse nach Attributschlüssel. Wenn filterKey nicht festgelegt ist, werden keine Filter angewendet.
filterValue (Optional) Filterattributwert für den Fall, dass "filterKey" bereitgestellt wird. Standardmäßig ist für filterValue null festgelegt.

Vorlage "Pub/Sub zu Pub/Sub" ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub to Pub/Sub template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: der Name des Pub/Sub-Abos
  • TOPIC_NAME: der Name des Pub/Sub-Themas
  • FILTER_KEY: der Attributschlüssel, nach dem Ereignisse gefiltert werden Wenn kein Schlüssel angegeben ist, werden keine Filter angewendet.
  • FILTER_VALUE: Filterattributwert, der verwendet wird, wenn ein Ereignisfilterschlüssel angegeben ist. Akzeptiert einen gültigen Java-Regex-String als Ereignisfilterwert. Wenn ein Regex angegeben wird, muss der komplette Ausdruck übereinstimmen, damit die Nachricht gefiltert wird. Teilübereinstimmungen (z. B. Teilstrings) werden nicht gefiltert. Standardmäßig wird ein Null-Ereignisfilterwert verwendet.

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: der Name des Pub/Sub-Abos
  • TOPIC_NAME: der Name des Pub/Sub-Themas
  • FILTER_KEY: der Attributschlüssel, nach dem Ereignisse gefiltert werden Wenn kein Schlüssel angegeben ist, werden keine Filter angewendet.
  • FILTER_VALUE: Filterattributwert, der verwendet wird, wenn ein Ereignisfilterschlüssel angegeben ist. Akzeptiert einen gültigen Java-Regex-String als Ereignisfilterwert. Wenn ein Regex angegeben wird, muss der komplette Ausdruck übereinstimmen, damit die Nachricht gefiltert wird. Teilübereinstimmungen (z. B. Teilstrings) werden nicht gefiltert. Standardmäßig wird ein Null-Ereignisfilterwert verwendet.

Pub/Sub zu Splunk

Die Vorlage "Pub/Sub für Splunk" ist eine Streamingpipeline, die Nachrichten aus einem Pub/Sub-Abo liest und die Nutzlast der Nachricht über den HTTP Event Collector (HEC) von Splunk in Splunk schreibt. Der häufigste Anwendungsfall dieser Vorlage ist das Exportieren von Logs nach Splunk. Ein Beispiel für den zugrunde liegenden Workflow finden Sie unter Produktionsfähige Logexporte in Dataflow für Splunk bereitstellen.

Vor dem Schreiben in Splunk können Sie auch eine benutzerdefinierte JavaScript-Funktion auf die Nachrichtennutzlast anwenden. Alle Nachrichten, bei denen Verarbeitungsfehler auftreten, werden zur weiteren Fehlerbehebung und erneuten Verarbeitung an ein unverarbeitetes Thema in Pub/Sub weitergeleitet.

Als zusätzlichen Schutz für Ihr HEC-Token können Sie auch einen Cloud KMS-Schlüssel zusammen mit dem base64-codierten HEC-Tokenparameter übergeben, der mit dem Cloud KMS-Schlüssel verschlüsselt ist. Weitere Informationen zum Verschlüsseln des HEC-Tokenparameters finden Sie unter Cloud KMS API-Verschlüsselungsendpunkt.

Voraussetzungen für diese Pipeline:

  • Das als Quelle dienende Pub/Sub-Abo muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Das Pub/Sub-Thema für nicht verarbeitete Datensätze muss vorhanden sein, bevor die Pipeline ausgeführt wird.
  • Auf den Splunk-HEC-Endpunkt muss über das Dataflow-Worker-Netzwerk zugegriffen werden können.
  • Das Splunk-HEC-Token muss generiert und verfügbar sein.

Vorlagenparameter

Parameter Beschreibung
inputSubscription Das Pub/Sub-Abo, aus dem die Eingabe gelesen wird. z. B. projects/<project-id>/subscriptions/<subscription-name>.
token (Optional) Das Splunk-HEC-Authentifizierungstoken. Muss angegeben werden, wenn tokenSource auf PLAINTEXT oder KMS festgelegt ist.
url Die Splunk-HEC-URL. Diese muss von der VPC, in der die Pipeline ausgeführt wird, weitergeleitet werden können. Beispiel: https://splunk-hec-host:8088.
outputDeadletterTopic Das Pub/Sub-Thema als Weiterleitungsziel für nicht zustellbare Nachrichten. z. B. projects/<project-id>/topics/<topic-name>.
javascriptTextTransformGcsPath (Optional) Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Optional) Der Name der benutzerdefinierten JavaScript-Funktion, die Sie verwenden möchten. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.
batchCount (Optional) Die Batchgröße zum Senden mehrerer Ereignisse an Splunk. Standardeinstellung: 1 (keine Batchverarbeitung).
parallelism (Optional) Die maximale Anzahl an parallelen Anfragen. Standardeinstellung: 1 (keine Parallelität).
disableCertificateValidation (Optional) SSL-Zertifikatsvalidierung deaktivieren. Standardeinstellung: "false" (Validierung aktiviert). Bei "wahr" werden die Zertifikate nicht validiert (alle Zertifikate sind vertrauenswürdig) und der Parameter "rootCaCertificatePath" wird ignoriert.
includePubsubMessage (Optional) Schließen Sie die vollständige Pub/Sub-Nachricht in die Nutzlast ein. Standardeinstellung: "false" (nur das Datenelement ist in der Nutzlast enthalten).
tokenSource Quelle des Tokens. Entweder PLAINTEXT, KMS oder SECRET_MANAGER. Dieser Parameter muss angegeben werden, wenn Secret Manager verwendet wird. Wenn tokenSource auf KMS festgelegt ist, muss tokenKMSEncryptionKey und verschlüsselter token bereitgestellt werden. Wenn tokenSource auf SECRET_MANAGER festgelegt ist, muss tokenSecretId bereitgestellt werden. Wenn tokenSource auf PLAINTEXT festgelegt ist, muss token bereitgestellt werden.
tokenKMSEncryptionKey (Optional) Der Cloud KMS-Schlüssel zum Entschlüsseln des HEC-Tokenstrings. Dieser Parameter muss angegeben werden, wenn tokenSource auf KMS gesetzt ist. Wenn der Cloud KMS-Schlüssel bereitgestellt wird, muss der HEC-Tokenstring verschlüsselt übergeben werden.
tokenSecretId (Optional) Die Secret Manager-Secret-ID für das Token. Dieser Parameter muss angegeben werden, wenn tokenSource auf SECRET_MANAGER festgelegt ist. Er sollte folgendes Format haben: projects/<project-id>/secrets/<secret-name>/versions/<secret-version>.
rootCaCertificatePath (Optional) Die vollständige URL zum Stamm-CA-Zertifikat in Cloud Storage. z. B. gs://mybucket/mycerts/privateCA.crt. Das in Cloud Storage bereitgestellte Zertifikat muss DER-codiert sein und kann in binärer oder druckbarer Base64-Codierung bereitgestellt werden. Wenn das Zertifikat in Base64-Codierung bereitgestellt wird, muss es am Anfang durch -----BEGIN CERTIFICATE----- und am Ende durch -----END CERTIFICATE----- begrenzt werden. Wenn dieser Parameter angegeben wird, wird diese private CA-Zertifikatsdatei abgerufen und zum Vertrauensspeicher des Dataflow Workers hinzugefügt, um das SSL-Zertifikat des Splunk HEC-Endpunkts zu überprüfen. Wenn dieser Parameter nicht angegeben ist, wird der Standard-Vertrauensspeicher verwendet.
enableBatchLogs (Optional) Gibt an, ob Logs für Batches aktiviert werden sollen, die in Splunk geschrieben werden. Standardeinstellung: true.
enableGzipHttpCompression (Optional) Gibt an, ob HTTP-Anfragen, die an Splunk HEC gesendet werden, komprimiert werden sollen (gzip-Inhaltscodierung). Standardeinstellung: true.

Vorlage "Pub/Sub für Splunk" ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub to Splunk template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION,\
rootCaCertificatePath=ROOT_CA_CERTIFICATE_PATH

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME: der Name des Pub/Sub-Abos
  • TOKEN: das HTTP Event Collector-Token von Splunk
  • URL: der URL-Pfad für den HTTP Event Collector von Splunk (z. B. https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME: der Name des Pub/Sub-Themas
  • JAVASCRIPT_FUNCTION: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.

    Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

  • PATH_TO_JAVASCRIPT_UDF_FILE Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT: die Batchgröße zum Senden mehrerer Ereignisse an Splunk
  • PARALLELISM: die Anzahl der parallelen Anfragen, die zum Senden von Ereignissen an Splunk verwendet werden sollen
  • DISABLE_VALIDATION: true, wenn Sie die SSL-Zertifikatsvalidierung deaktivieren möchten
  • ROOT_CA_CERTIFICATE_PATH: Der Pfad zum Stamm-CA-Zertifikat in Cloud Storage (z. B. gs://your-bucket/privateCA.crt)

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION",
       "rootCaCertificatePath": "ROOT_CA_CERTIFICATE_PATH"
   }
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME: der Name des Pub/Sub-Abos
  • TOKEN: das HTTP Event Collector-Token von Splunk
  • URL: der URL-Pfad für den HTTP Event Collector von Splunk (z. B. https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME: der Name des Pub/Sub-Themas
  • JAVASCRIPT_FUNCTION: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.

    Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

  • PATH_TO_JAVASCRIPT_UDF_FILE Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT: die Batchgröße zum Senden mehrerer Ereignisse an Splunk
  • PARALLELISM: die Anzahl der parallelen Anfragen, die zum Senden von Ereignissen an Splunk verwendet werden sollen
  • DISABLE_VALIDATION: true, wenn Sie die SSL-Zertifikatsvalidierung deaktivieren möchten
  • ROOT_CA_CERTIFICATE_PATH: Der Pfad zum Stamm-CA-Zertifikat in Cloud Storage (z. B. gs://your-bucket/privateCA.crt)

Pub/Sub für Avro-Dateien in Cloud Storage

Die Vorlage „Pub/Sub für Avro-Dateien in Cloud Storage“ ist eine Streamingpipeline, die Daten aus einem Pub/Sub-Thema liest und Avro-Dateien in einen angegebenen Cloud Storage-Bucket schreibt.

Voraussetzungen für diese Pipeline:

  • Das Pub/Sub-Eingabethema muss vor der Ausführung der Pipeline vorhanden sein.

Vorlagenparameter

Parameter Beschreibung
inputTopic Cloud Pub/Sub-Thema, das zur Nachrichtenaufnahme abonniert werden soll. Der Themenname muss das Format projects/<project-id>/topics/<topic-name> haben.
outputDirectory Ausgabeverzeichnis, in dem die Avro-Dateien archiviert sind. Muss am Ende / enthalten. Beispiel: gs://example-bucket/example-directory/
avroTempDirectory Verzeichnis für temporäre Avro-Dateien. Muss am Ende / enthalten. Beispiel: gs://example-bucket/example-directory/.
outputFilenamePrefix (Optional) Präfix für den Ausgabedateinamen der Avro-Dateien.
outputFilenameSuffix (Optional) Suffix für den Ausgabedateinamen der Avro-Dateien.
outputShardTemplate [Optional) Shard-Vorlage der Ausgabedatei. Sie wird als sich wiederholende Folge der Buchstaben S oder N angegeben. Beispiel: SSS-NNN. Diese werden entweder durch die Shard-Nummer oder durch die Gesamtzahl der Shards ersetzt. Wenn dieser Parameter nicht angegeben ist, ist das Standardvorlagenformat W-P-SS-of-NN.

Vorlage "Pub/Sub für Cloud Storage Avro" ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub to Avro Files on Cloud Storage template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
avroTempDirectory=gs://BUCKET_NAME/temp/

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • TOPIC_NAME: der Name des Pub/Sub-Themas
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
  • FILENAME_PREFIX: das gewünschte Präfix des Ausgabedateinamens
  • FILENAME_SUFFIX: das gewünschte Suffix des Ausgabedateinamens
  • SHARD_TEMPLATE: die gewünschte Shard-Ausgabevorlage

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE"
   }
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • TOPIC_NAME: der Name des Pub/Sub-Themas
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
  • FILENAME_PREFIX: das gewünschte Präfix des Ausgabedateinamens
  • FILENAME_SUFFIX: das gewünschte Suffix des Ausgabedateinamens
  • SHARD_TEMPLATE: die gewünschte Shard-Ausgabevorlage

Pub/Sub-Thema für Textdateien in Cloud Storage

Die Vorlage "Pub/Sub-Thema für Cloud Storage Text" ist eine Streamingpipeline, die Datensätze aus Pub/Sub liest und als eine Reihe von Cloud Storage-Dateien im Textformat speichert. Die Vorlage kann als schnelle Möglichkeit zum Speichern von Daten in Pub/Sub zur späteren Verwendung genutzt werden. Standardmäßig erstellt die Vorlage alle fünf Minuten eine neue Datei.

Voraussetzungen für diese Pipeline:

  • Das Pub/Sub-Thema muss vor der Ausführung vorhanden sein.
  • Die im Thema veröffentlichten Nachrichten müssen im Textformat vorliegen.
  • Die im Thema veröffentlichten Nachrichten dürfen keine Zeilenumbrüche enthalten. Beachten Sie, dass jede Pub/Sub-Nachricht in der Ausgabedatei als einzelne Zeile gespeichert wird.

Vorlagenparameter

Parameter Beschreibung
inputTopic Das Pub/Sub-Thema, aus dem die Eingabe gelesen werden soll. Der Themenname muss das Format projects/<project-id>/topics/<topic-name> haben.
outputDirectory Das Pfad- und Dateinamenpräfix zum Schreiben von Ausgabedateien. Beispiel: gs://bucket-name/path/. Dieser Wert muss mit einem Schrägstrich enden.
outputFilenamePrefix Das Präfix für die Namen der einzelnen Dateien im Fenstermodus. z. B. output-.
outputFilenameSuffix Das Suffix für die Namen der einzelnen Dateien im Fenstermodus, normalerweise eine Dateiendung wie .txt oder .csv.
outputShardTemplate Die Shard-Vorlage definiert den dynamischen Teil aller Namen der Dateien im Fenstermodus. Standardmäßig verwendet die Pipeline einen einzelnen Shard für die Ausgabe in das Dateisystem in jedem Fenster. Das bedeutet, dass alle Daten in einer einzigen Datei pro Fenster ausgegeben werden. Für outputShardTemplate wird standardmäßig W-P-SS-of-NN verwendet. Dabei ist W der Datumsbereich des Fensters, P die Bereichsinformation, S die Shard-Nummer und N die Anzahl der Shards. Bei einer einzelnen Datei ist der Abschnitt SS-of-NN der outputShardTemplate immer 00-of-01.

Vorlage "Pub/Sub-Thema für Textdateien in Cloud Storage" ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub to Text Files on Cloud Storage template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • TOPIC_NAME: der Name Ihres Pub/Sub-Themas
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • TOPIC_NAME: der Name Ihres Pub/Sub-Themas
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets

Pub/Sub-Thema oder -Abo für Textdateien in Cloud Storage

Die Vorlage "Pub/Sub-Thema oder -Abo für Cloud Storage Text" ist eine Streamingpipeline, die Datensätze aus Pub/Sub liest und als eine Reihe von Cloud Storage-Dateien im Textformat speichert. Die Vorlage kann als schnelle Möglichkeit zum Speichern von Daten in Pub/Sub zur späteren Verwendung genutzt werden. Standardmäßig erstellt die Vorlage alle fünf Minuten eine neue Datei.

Voraussetzungen für diese Pipeline:

  • Das Pub/Sub-Thema oder -Abo muss vor der Ausführung vorhanden sein.
  • Die im Thema veröffentlichten Nachrichten müssen im Textformat vorliegen.
  • Die im Thema veröffentlichten Nachrichten dürfen keine Zeilenumbrüche enthalten. Beachten Sie, dass jede Pub/Sub-Nachricht in der Ausgabedatei als einzelne Zeile gespeichert wird.

Vorlagenparameter

Parameter Beschreibung
inputTopic Das Pub/Sub-Thema, aus dem die Eingabe gelesen werden soll. Der Themenname muss das Format projects/<project-id>/topics/<topic-name> haben. Wenn dieser Parameter angegeben wird, sollte inputSubscription nicht angegeben werden.
inputSubscription Das Pub/Sub-Abo, aus dem die Eingabe gelesen werden soll. Der Aboname muss das Format projects/<project-id>/subscription/<subscription-name> haben. Wenn dieser Parameter angegeben wird, sollte inputTopic nicht angegeben werden.
outputDirectory Das Pfad- und Dateinamenpräfix zum Schreiben von Ausgabedateien. Beispiel: gs://bucket-name/path/. Dieser Wert muss mit einem Schrägstrich enden.
outputFilenamePrefix Das Präfix für die Namen der einzelnen Dateien im Fenstermodus. z. B. output-.
outputFilenameSuffix Das Suffix für die Namen der einzelnen Dateien im Fenstermodus, normalerweise eine Dateiendung wie .txt oder .csv.
outputShardTemplate Die Shard-Vorlage definiert den dynamischen Teil aller Namen der Dateien im Fenstermodus. Standardmäßig verwendet die Pipeline einen einzelnen Shard für die Ausgabe in das Dateisystem in jedem Fenster. Das bedeutet, dass alle Daten in einer einzigen Datei pro Fenster ausgegeben werden. Für outputShardTemplate wird standardmäßig W-P-SS-of-NN verwendet. Dabei ist W der Datumsbereich des Fensters, P die Bereichsinformation, S die Shard-Nummer und N die Anzahl der Shards. Bei einer einzelnen Datei ist der Abschnitt SS-of-NN der outputShardTemplate immer 00-of-01.
windowDuration (Optional) Die Fensterdauer ist das Intervall, in dem Daten in das Ausgabeverzeichnis geschrieben werden. Konfigurieren Sie die Dauer anhand des Durchsatzes der Pipeline. Beispielsweise kann ein höherer Durchsatz kleinere Fenstergrößen erfordern, damit die Daten in den Speicher passen. Die Standardeinstellung ist „5m”, mit mindestens 1 s. Zulässige Formate sind: [int]s (für Sekunden, Beispiel: 5s), [int]m (für Minuten, Beispiel: 12m), [int]h (für Stunden, Beispiel: 2h).

Vorlage "Pub/Sub-Thema oder -Abo für Textdateien in Cloud Storage" ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud beta dataflow flex-template jobs run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region REGION_NAME \
    --template-file-gcs-location gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • SUBSCRIPTION_NAME: der Name Ihres Pub/Sub-Abos
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
    },
    "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex",
  }
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • SUBSCRIPTION_NAME: der Name Ihres Pub/Sub-Abos
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets

Pub/Sub für MongoDB

Die Vorlage „Pub/Sub für MongoDB“ ist eine Streamingpipeline, die JSON-codierte Nachrichten aus einem Pub/Sub-Abo liest und in MongoDB als Dokumente schreibt. Bei Bedarf unterstützt diese Pipeline zusätzliche Transformationen, die über eine benutzerdefinierte JavaScript-Funktion (UDF) eingebunden werden können. Alle Fehler sind aufgrund von nicht übereinstimmenden Schemata, nicht korrekt formatiertem JSON oder während der Ausführung von Transformationen in einer BigQuery-Tabelle für nicht verarbeitete Nachrichten zusammen mit der Eingabenachricht aufgetreten. Falls noch keine Tabelle für nicht verarbeitete Datensätze vorhanden ist, wird diese Tabelle von der Pipeline automatisch erstellt.

Voraussetzungen für diese Pipeline:

  • Das Pub/Sub-Abo muss vorhanden sein und die Nachrichten müssen in einem gültigen JSON-Format codiert sein.
  • Der MongoDB-Cluster muss vorhanden und über die Dataflow-Worker-Maschinen zugänglich sein.

Vorlagenparameter

Parameter Beschreibung
inputSubscription Name des Pub/Sub-Abos. Beispiel: projects/my-project-id/subscriptions/my-subscription-id
mongoDBUri Durch Kommas getrennte Liste von MongoDB-Servern. Beispiel: 192.285.234.12:27017,192.287.123.11:27017
database Datenbank in MongoDB zum Speichern der Sammlung. Beispiel: my-db.
collection Name der Sammlung in der MongoDB-Datenbank. Beispiel: my-collection.
deadletterTable BigQuery-Tabelle, die aus Fehlern resultierende Nachrichten speichert (nicht übereinstimmendes Schema, fehlerhaft formatierte JSON-Dateien usw.). Beispiel: project-id:dataset-name.table-name.
javascriptTextTransformGcsPath (Optional) Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Optional) Der Name der benutzerdefinierten JavaScript-Funktion, die Sie verwenden möchten. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.
batchSize (Optional) Batchgröße für die Aufnahme von Dokumentenbatches in MongoDB. Standardeinstellung: 1000.
batchSizeBytes (Optional) Batchgröße in Byte. Standardeinstellung: 5242880.
maxConnectionIdleTime (Optional) Maximale zulässige Leerlaufzeit in Sekunden, bis eine Zeitüberschreitung der Verbindung auftritt. Standardeinstellung: 60000.
sslEnabled (Optional) Boolescher Wert, der angibt, ob für die Verbindung zu MongoDB SSL aktiviert ist. Standardeinstellung: true.
ignoreSSLCertificate (Optional) Boolescher Wert, der angibt, ob das SSL-Zertifikat ignoriert werden soll. Standardeinstellung: true.
withOrdered (Optional) Boolescher Wert, mit dem geordnete Bulk-Aufnahmen in MongoDB aktiviert werden. Standardeinstellung: true.
withSSLInvalidHostNameAllowed (Optional) Boolescher Wert, der angibt, ob ein ungültiger Hostname für die SSL-Verbindung zulässig ist. Standardeinstellung: true.

Vorlage „Pub/Sub für MongoDB“ ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub to MongoDB template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • INPUT_SUBSCRIPTION: das Pub/Sub-Abo (z. B. projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: die MongoDB-Serveradressen (z. B. 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: der Name der MongoDB-Datenbank (z. B. users)
  • COLLECTION: der Name der MongoDB-Sammlung (z. B. profiles)
  • UNPROCESSED_TABLE: der Name der BigQuery-Tabelle (z. B. your-project:your-dataset.your-table-name)

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • INPUT_SUBSCRIPTION: das Pub/Sub-Abo (z. B. projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: die MongoDB-Serveradressen (z. B. 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: der Name der MongoDB-Datenbank (z. B. users)
  • COLLECTION: der Name der MongoDB-Sammlung (z. B. profiles)
  • UNPROCESSED_TABLE: der Name der BigQuery-Tabelle (z. B. your-project:your-dataset.your-table-name)

Pub/Sub für Elasticsearch

Die Vorlage „Pub/Sub für Elasticsearch“ ist eine Streamingpipeline, die Nachrichten aus einem Pub/Sub-Abo liest, eine benutzerdefinierte Funktion (User-defined Function, UDF) ausführt und sie als Dokumente in Elasticsearch schreibt. Die Dataflow-Vorlage verwendet die Datenstreams-Funktion von Elasticsearch, um Zeitachsendaten über mehrere Indexe zu speichern, wobei Sie eine einzige benannte Ressource für Anfragen erhalten. Datenstreams eignen sich gut für Logs, Messwerte, Traces und andere kontinuierlich generierte Daten, die in Pub/Sub gespeichert sind.

Voraussetzungen für diese Pipeline

  • Das Quell-Pub/Sub-Abo muss vorhanden sein und die Nachrichten müssen in einem gültigen JSON-Format codiert sein.
  • Ein öffentlich erreichbarer Elasticsearch-Host auf einer GCP-Instanz oder in Elastic Cloud mit Elasticsearch Version 7.0 oder höher. Weitere Informationen finden Sie unter Google Cloud-Integration für Elastic.
  • Ein Pub/Sub-Thema für die Fehlerausgabe

Vorlagenparameter

Parameter Beschreibung
inputSubscription Das Cloud Pub/Sub-Abo, das verwendet werden soll. Der Name muss das Format projects/<project-id>/subscriptions/<subscription-name> haben.
connectionUrl Elasticsearch-URL im Format https://hostname:[port] oder geben Sie die CloudID an, wenn Elastic Cloud verwendet wird.
apiKey Base64-codierter API-Schlüssel für die Authentifizierung.
errorOutputTopic Pub/Sub-Ausgabe-Thema für die Veröffentlichung fehlgeschlagener Datensätze im Format projects/<project-id>/topics/<topic-name>
dataset (Optional) Der Typ von über Pub/Sub gesendete Logs, für die wir ein sofort einsatzfähiges Dashboard haben. Bekannte Werte für Logtypen sind "audit", "vpcflow" und "firewall". Standardeinstellung: pubsub.
namespace (Optional) Eine beliebige Gruppierung, z. B. eine Umgebung (dev, prod oder qa), ein Team oder eine strategische Geschäftseinheit. Standardeinstellung: default.
batchSize (Optional) Batchgröße in der Anzahl an Dokumenten. Standardeinstellung: 1000.
batchSizeBytes (Optional) Batchgröße in der Anzahl an Byte. Standardeinstellung: 5242880 (5 MB).
maxRetryAttempts (Optional) Maximale Wiederholungsversuche, muss > 0 sein. Standardeinstellung: no retries.
maxRetryDuration (Optional) Maximale Wiederholungsdauer in Millisekunden, muss > 0 sein. Standardeinstellung: no retries.
javascriptTextTransformGcsPath (Optional) Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Optional) Der Name der benutzerdefinierten JavaScript-Funktion, die Sie verwenden möchten. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.
propertyAsIndex (Optional) Eine Eigenschaft im indexierten Dokument, deren Wert angibt, dass _index-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen (hat Vorrang vor einer_index-UDF). Standardwert: none.
propertyAsId (Optional) Eine Eigenschaft im indexierten Dokument, deren Wert angibt, dass _id-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen (hat Vorrang vor einer_id-UDF). Standardwert: none.
javaScriptIndexFnGcsPath (Optional) Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die angibt, dass _index-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none.
javaScriptIndexFnName (Optional) UDF-JavaScript-Funktionsname für eine Funktion, die angibt, dass _index-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none.
javaScriptIdFnGcsPath (Optional) Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die angibt, dass _id-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none.
javaScriptIdFnName (Optional) UDF-JavaScript-Funktionsname für eine Funktion, die angibt, dass _id-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none.
javaScriptTypeFnGcsPath (Optional) Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die angibt, dass _type-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none.
javaScriptTypeFnName (Optional) UDF-JavaScript-Funktionsname für eine Funktion, die angibt, dass _type-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none.
javaScriptIsDeleteFnGcsPath (Optional) Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die bestimmt, ob das Dokument gelöscht statt eingefügt oder aktualisiert werden soll. Die Funktion sollte den Stringwert "true" oder "false" zurückgeben. Standardwert: none.
javaScriptIsDeleteFnName (Optional) UDF-JavaScript-Funktionsname für eine Funktion, die bestimmt, ob das Dokument gelöscht statt eingefügt oder aktualisiert werden soll. Die Funktion sollte den Stringwert "true" oder "false" zurückgeben. Standardwert: none.
usePartialUpdate (Optional) Gibt an, ob Teilaktualisierungen (Aktualisieren statt Erstellen oder Indexieren, Teildokumente sind zulässig) in Elasticsearch-Anfragen verwendet werden sollen. Standardeinstellung: false.
bulkInsertMethod (Optional) Gibt an, ob INDEX (Indexieren, Upserts sind zulässig) oder CREATE (Erstellen, Fehler bei doppelter _id) in Bulk-Anfragen von Elasticsearch verwendet werden soll. Standardeinstellung: CREATE.

Vorlage „Pub/Sub für Elasticsearch“ ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub to Elasticsearch template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • ERROR_OUTPUT_TOPIC: das Pub/Sub-Thema für die Fehlerausgabe
  • SUBSCRIPTION_NAME: der Name Ihres Pub/Sub-Abos
  • CONNECTION_URL: die Elasticsearch-URL
  • DATASET: Ihr Logtyp
  • NAMESPACE: Ihr Namespace für das Dataset
  • APIKEY: der base64-codierte API-Schlüssel für die Authentifizierung

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch",
   }
}
  

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • ERROR_OUTPUT_TOPIC: das Pub/Sub-Thema für die Fehlerausgabe
  • SUBSCRIPTION_NAME: der Name Ihres Pub/Sub-Abos
  • CONNECTION_URL: die Elasticsearch-URL
  • DATASET: Ihr Logtyp
  • NAMESPACE: Ihr Namespace für das Dataset
  • APIKEY: der base64-codierte API-Schlüssel für die Authentifizierung

Datastream zu Cloud Spanner

Die Vorlage "Datastream zu Cloud Spanner" ist eine Streamingpipeline, die Datastream-Ereignisse aus einem Cloud Storage-Bucket liest und in eine Cloud Spanner-Datenbank schreibt. Sie ist für die Datenmigration von Datastream-Quellen zu Cloud Spanner vorgesehen.

Alle für die Migration erforderlichen Tabellen müssen vor der Ausführung der Vorlage in der Cloud Spanner-Zieldatenbank vorhanden sein. Daher muss die Schemamigration von einer Quelldatenbank zum Cloud Spanner-Ziel abgeschlossen sein, bevor Sie Daten migrieren können. Daten können in den Tabellen vor der Migration vorhanden sein. Diese Vorlage leitet keine Änderungen des Datastream-Schemas an die Cloud Spanner-Datenbank weiter.

Die Datenkonsistenz wird erst am Ende der Migration garantiert, wenn alle Daten in Cloud Spanner geschrieben wurden. Zum Speichern von Reihenfolgeinformationen für jeden in Cloud Spanner geschriebenen Datensatz erstellt diese Vorlage eine zusätzliche Tabelle (sogenannte Schattentabelle) für jede Tabelle in der Cloud Spanner-Datenbank. Dadurch wird die Konsistenz am Ende der Migration sichergestellt. Die Schattentabellen werden nach der Migration nicht gelöscht und können am Ende der Migration zur Validierung verwendet werden.

Alle Fehler, die während des Vorgangs auftreten, z. B. nicht übereinstimmende Schemas, fehlerhafte JSON-Dateien oder Fehler, die sich aus der Ausführung von Transformationen ergeben, werden in einer Fehlerwarteschlange aufgezeichnet. Die Fehlerwarteschlange ist ein Cloud Storage-Ordner, in dem alle Datastream-Ereignisse gespeichert werden, bei denen Fehler aufgetreten sind, zusammen mit der Fehlerursache im Textformat. Die Fehler können vorübergehend oder dauerhaft sein und in den entsprechenden Cloud Storage-Ordnern in der Fehlerwarteschlange gespeichert werden. Bei diesen vorübergehenden Fehler erfolgt automatisch eine Wiederholung, bei dauerhaften Fehlern dagegen nicht. Bei dauerhaften Fehlern haben Sie die Möglichkeit, Korrekturen an den Änderungsereignissen vorzunehmen und diese in den Bucket für Wiederholungen zu verschieben, während die Vorlage ausgeführt wird.

Voraussetzungen für diese Pipeline:

  • Ein Datastream-Stream mit dem Status Wird ausgeführt oder Nicht gestartet.
  • Ein Cloud Storage-Bucket, in dem Datastream-Ereignisse repliziert werden.
  • Eine Cloud Spanner-Datenbank mit vorhandenen Tabellen. Diese Tabellen können leer sein oder Daten enthalten.

Vorlagenparameter

Parameter Beschreibung
inputFilePattern Der Speicherort für Datastream-Dateien in Cloud Storage, die repliziert werden sollen. In der Regel ist dies der Stammpfad für einen Stream.
streamName Der Name oder die Vorlage für den Stream, der für Schemainformationen und den Quelltyp abgefragt werden soll.
instanceId Die Cloud Spanner-Instanz, in der die Änderungen repliziert werden.
databaseId Die Cloud Spanner-Datenbank, in der die Änderungen repliziert werden.
projectId Die Cloud Spanner-Projekt-ID.
deadLetterQueueDirectory (Optional) Dies ist der Dateipfad zum Speichern der Fehlerwarteschlangenausgabe. Der Standardwert ist ein Verzeichnis unter dem temporären Speicherort des Dataflow-Jobs.
inputFileFormat Optional: Das Format der von Datastream generierten Ausgabedatei. Beispiel: avro,json Standardeinstellung: avro.
shadowTablePrefix (Optional) Das Präfix zum Benennen von Schattentabellen. Standardeinstellung: shadow_.

Vorlage "Datastream zu Cloud Spanner" ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Cloud Datastream to Spanner template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • GCS_FILE_PATH ist der Cloud Storage-Pfad, der zum Speichern von Datastream-Ereignissen verwendet wird. Beispiel: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE ist Ihre Cloud Spanner-Instanz.
  • CLOUDSPANNER_DATABASE ist Ihre Cloud Spanner-Datenbank
  • DLQ ist der Cloud Storage-Pfad für das Fehlerwarteschlangenverzeichnis.

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • GCS_FILE_PATH ist der Cloud Storage-Pfad, der zum Speichern von Datastream-Ereignissen verwendet wird. Beispiel: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE ist Ihre Cloud Spanner-Instanz.
  • CLOUDSPANNER_DATABASE ist Ihre Cloud Spanner-Datenbank
  • DLQ ist der Cloud Storage-Pfad für das Fehlerwarteschlangenverzeichnis.

Textdateien in Cloud Storage für BigQuery (Stream)

Die Pipeline „Textdateien in Cloud Storage für BigQuery“ ist eine Streamingpipeline, mit der Sie in Cloud Storage gespeicherte Textdateien streamen, diese mit einer von Ihnen bereitgestellten benutzerdefinierten JavaScript-Funktion (User Defined Function, UDF) transformieren und das Ergebnis in BigQuery anhängen können.

Die Pipeline wird auf unbestimmte Zeit ausgeführt und muss manuell über eine Cancel-Anweisung und kein Drain beendet werden, aufgrund ihrer Verwendung der Watch Transformation, die eine splittable DoFn ist, die den Draining nicht unterstützt.

Voraussetzungen für diese Pipeline:

  • Erstellen Sie eine JSON-Datei, die das Schema Ihrer Ausgabetabelle in BigQuery beschreibt.

    Stellen Sie ein JSON-Array der obersten Ebene mit dem Namen fields bereit, dessen Inhalt dem Muster {"name": "COLUMN_NAME", "type": "DATA_TYPE"} folgt. Beispiel:

    {
      "fields": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING",
          "mode": "REQUIRED"
        },
        {
          "name": "coffee",
          "type": "STRING",
          "mode": "REQUIRED"
        }
      ]
    }
    
  • Erstellen Sie eine JavaScript-Datei (.js) mit Ihrer UDF, die die Logik für die Transformation der Textzeilen bereitstellt. Beachten Sie, dass Ihre Funktion einen JSON-String zurückgeben muss.

    Diese Funktion teilt beispielsweise jede Zeile einer CSV-Datei auf und gibt nach der Transformation der Werte einen JSON-String zurück.

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

Vorlagenparameter

Parameter Beschreibung
javascriptTextTransformGcsPath Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js.
JSONPath Der Cloud Storage-Speicherort Ihrer BigQuery-Schemadatei im JSON-Format. Beispiel: gs://path/to/my/schema.json.
javascriptTextTransformFunctionName ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.
outputTable Die vollständig qualifizierte BigQuery-Tabelle. Beispiel: my-project:dataset.table.
inputFilePattern Der Cloud Storage-Speicherort des Textes, den Sie verarbeiten möchten. Beispiel: gs://my-bucket/my-files/text.txt
bigQueryLoadingTemporaryDirectory Das temporäre Verzeichnis für den BigQuery-Ladevorgang. Beispiel: gs://my-bucket/my-files/temp_dir
outputDeadletterTable Tabelle für Nachrichten, die die Ausgabetabelle nicht erreicht haben. Beispiel: my-project:dataset.my-unprocessed-table. Wenn sie nicht vorhanden ist, wird sie während der Pipelineausführung erstellt. Wenn nicht angegeben, wird stattdessen <outputTableSpec>_error_records verwendet.

Vorlage "Cloud Storage Text für BigQuery (Stream)" ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Text Files on Cloud Storage to BigQuery template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • JAVASCRIPT_FUNCTION: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.

    Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: der Cloud Storage-Pfad zur JSON-Datei, die die Schemadefinition enthält
  • PATH_TO_JAVASCRIPT_UDF_FILE Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: der Cloud Storage-Pfad zu Ihrem Text-Dataset
  • BIGQUERY_TABLE: Ihr BigQuery-Tabellenname
  • BIGQUERY_UNPROCESSED_TABLE: der Name Ihrer BigQuery-Tabelle für nicht verarbeitete Nachrichten
  • PATH_TO_TEMP_DIR_ON_GCS: der Cloud Storage-Pfad zum temporären Verzeichnis

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • JAVASCRIPT_FUNCTION: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.

    Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: der Cloud Storage-Pfad zur JSON-Datei, die die Schemadefinition enthält
  • PATH_TO_JAVASCRIPT_UDF_FILE Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: der Cloud Storage-Pfad zu Ihrem Text-Dataset
  • BIGQUERY_TABLE: Ihr BigQuery-Tabellenname
  • BIGQUERY_UNPROCESSED_TABLE: der Name Ihrer BigQuery-Tabelle für nicht verarbeitete Nachrichten
  • PATH_TO_TEMP_DIR_ON_GCS: der Cloud Storage-Pfad zum temporären Verzeichnis

Textdateien in Cloud Storage für Pub/Sub (Stream)

Diese Vorlage erstellt eine Streamingpipeline, die kontinuierlich nach neuen Textdateien sucht, die in Cloud Storage hochgeladen wurden, jede Datei Zeile für Zeile liest und Strings in einem Pub/Sub-Thema veröffentlicht. Die Vorlage veröffentlicht Datensätze aus einer JSON-Datei mit Zeilenumbruch oder einer CSV-Datei zur Echtzeitverarbeitung in einem Pub/Sub-Thema. Sie können diese Vorlage verwenden, um Daten in Pub/Sub wiederzugeben.

Die Pipeline wird auf unbestimmte Zeit ausgeführt und muss über eine „Cancel”-Anweisung und nicht durch einen „Drain” beendet werden. Die Verwendung der „Watch”-Transformation, bei der es sich um ein SplittableDoFn handelt, die kein Draining unterstützt.

Derzeit ist das Abfrageintervall fest auf zehn Sekunden festgelegt. Diese Vorlage legt keinen Zeitstempel für die einzelnen Datensätze fest, sodass die Ereigniszeit der Veröffentlichungszeit während der Ausführung entspricht. Wenn Ihre Pipeline für die Verarbeitung eine korrekte Ereigniszeit benötigt, sollten Sie diese Pipeline nicht verwenden.

Voraussetzungen für diese Pipeline:

  • Die Eingabedateien müssen im JSON-Format mit Zeilenumbruch oder im CSV-Format vorliegen. Datensätze, die sich über mehrere Zeilen in den Quelldateien erstrecken, können zu Problemen in den nachgelagerten Prozessen führen, da jede Zeile in den Dateien als eine Nachricht an Pub/Sub veröffentlicht wird.
  • Das Pub/Sub-Thema muss vor der Ausführung vorhanden sein.
  • Die Pipeline wird unbefristet ausgeführt und muss manuell beendet werden.

Vorlagenparameter

Parameter Beschreibung
inputFilePattern Das Muster der Eingabedatei, aus der gelesen werden soll. Beispiel: gs://bucket-name/files/*.json oder gs://bucket-name/path/*.csv.
outputTopic Das Pub/Sub-Eingabethema, in das geschrieben werden soll. Der Name muss das Format projects/<project-id>/topics/<topic-name> haben.

Vorlage „Textdateien in Cloud Storage für Pub/Sub (Stream)” ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Text Files on Cloud Storage to Pub/Sub (Stream) template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME\
    --staging-location STAGING_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • TOPIC_NAME: der Name Ihres Pub/Sub-Themas
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
  • FILE_PATTERN: das Glob-Dateimuster, aus dem im Cloud Storage-Bucket gelesen werden soll (z. B. path/*.csv)

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • TOPIC_NAME: der Name Ihres Pub/Sub-Themas
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
  • FILE_PATTERN: das Glob-Dateimuster, aus dem im Cloud Storage-Bucket gelesen werden soll (z. B. path/*.csv)

Datenmaskierung/Tokenisierung aus Cloud Storage für BigQuery (mit Cloud DLP)

Die Vorlage "Datenmaskierung/Tokenisierung aus Cloud Storage für BigQuery (mit Cloud DLP)" ist eine Streamingpipeline, die CSV-Dateien aus einem Cloud Storage-Bucket liest und die Cloud Data Loss Prevention API (Cloud DLP) für die De- Identifikation aufruft und die de-identifizierten Daten in die angegebene BigQuery-Tabelle schreibt. Diese Vorlage unterstützt sowohl die Verwendung einer Inspektionsvorlage als auch einer De-Identifikationsvorlage für Cloud DLP. Auf diese Weise können Nutzer nach potenziell vertraulichen Informationen suchen und deren Identifizierung aufheben sowie die Identifizierung von strukturierten Daten aufheben, in denen Spalten für die De-Identifikation angegeben sind und kein Prüfung erforderlich ist. Beachten Sie auch, dass diese Vorlage keinen regionalen Pfad für den Speicherort der De-Identifikationsvorlage unterstützt. Es wird nur ein globaler Pfad unterstützt.

Voraussetzungen für diese Pipeline:

  • Die Eingabedaten für die Tokenisierung müssen vorhanden sein.
  • Die Cloud DLP-Vorlagen müssen vorhanden sein, zum Beispiel "DeidentifyTemplate" und "InspectTemplate". Weitere Informationen finden Sie unter Cloud DLP-Vorlagen.
  • Das BigQuery-Dataset muss vorhanden sein.

Vorlagenparameter

Parameter Beschreibung
inputFilePattern Die CSV-Dateien, aus denen Eingabedatensätze gelesen werden sollen. Platzhalter werden ebenfalls akzeptiert. Beispiel: gs://mybucket/my_csv_filename.csv oder gs://mybucket/file-*.csv.
dlpProjectId ID des Cloud DLP-Projekts, das Inhaber der Cloud DLP API-Ressource ist. Dieses Cloud DLP-Projekt kann dasselbe Projekt sein, das auch Inhaber der Cloud DLP-Vorlagen ist, oder ein separates Projekt. Beispiel: my_dlp_api_project.
deidentifyTemplateName De-Identifikationsvorlage für Cloud DLP nach dem Muster projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId} zur Verwendung für API-Anfragen. Beispiel: projects/my_project/deidentifyTemplates/100.
datasetName BigQuery-Dataset zum Senden von tokenisierten Ergebnissen.
batchSize Block-/Batchgröße zum Senden von Daten zur Prüfung und/oder Aufhebung der Tokenisierung. Im Fall einer CSV-Datei gibt batchSize die Anzahl der Zeilen in einem Batch an. Nutzer müssen die Batchgröße anhand der Größe der Datensätze und der Größe der Datei bestimmen. Für die Cloud DLP API gilt eine maximale Nutzlastgröße von 524 KB pro API-Aufruf.
inspectTemplateName (Optional) Cloud DLP-Inspektionsvorlage nach dem Muster projects/{template_project_id}/identifyTemplates/{idTemplateId} zur Verwendung für API-Anfragen. Beispiel: projects/my_project/identifyTemplates/100.

Vorlage "Datenmaskierung/Tokenisierung aus Cloud Storage für BigQuery (mit Cloud DLP)" ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputFilePattern=INPUT_DATA,\
datasetName=DATASET_NAME,\
batchSize=BATCH_SIZE_VALUE,\
dlpProjectId=DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER

Dabei gilt:

  • DLP_API_PROJECT_ID: Ihre Cloud DLP API-Projekt-ID
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • INPUT_DATA ist der Pfad zur Eingabedatei
  • DEIDENTIFY_TEMPLATE ist die Nummer der Cloud DLP-De-Identifikationsvorlage
  • DATASET_NAME: der Name des BigQuery-Datasets
  • INSPECT_TEMPLATE_NUMBER ist die Nummer der Cloud DLP-Prüfungsvorlage
  • BATCH_SIZE_VALUE ist die Batchgröße (Anzahl der Zeilen pro API für CSV-Dateien)

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
      "inputFilePattern":INPUT_DATA,
      "datasetName": "DATASET_NAME",
      "batchSize": "BATCH_SIZE_VALUE",
      "dlpProjectId": "DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE",
      "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER"
   }
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • DLP_API_PROJECT_ID: Ihre Cloud DLP API-Projekt-ID
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • INPUT_DATA ist der Pfad zur Eingabedatei
  • DEIDENTIFY_TEMPLATE ist die Nummer der Cloud DLP-De-Identifikationsvorlage
  • DATASET_NAME: der Name des BigQuery-Datasets
  • INSPECT_TEMPLATE_NUMBER ist die Nummer der Cloud DLP-Prüfungsvorlage
  • BATCH_SIZE_VALUE ist die Batchgröße (Anzahl der Zeilen pro API für CSV-Dateien)

Change Data Capture von MySQL für BigQuery mit Debezium und Pub/Sub (Stream)

Die Vorlage "Change Data Capture von MySQL für BigQuery mit Debezium und Pub/Sub" ist eine Streamingpipeline, die Pub/Sub-Nachrichten mit Änderungsdaten aus einer MySQL-Datenbank liest und die Datensätze in BigQuery schreibt. Ein Debezium-Connector erfasst Änderungen an der MySQL-Datenbank und veröffentlicht die geänderten Daten in Pub/Sub. Die Vorlage liest dann die Pub/Sub-Nachrichten und schreibt sie in BigQuery.

Über diese Vorlage können Sie MySQL-Datenbanken und BigQuery-Tabellen miteinander synchronisieren. Die Pipeline schreibt die geänderten Daten in eine BigQuery-Staging-Tabelle und aktualisiert in regelmäßigen Abständen eine BigQuery-Tabelle zu Replikation der MySQL-Datenbank.

Voraussetzungen für diese Pipeline:

  • Der Debezium-Connector muss bereitgestellt sein.
  • Die Pub/Sub-Nachrichten müssen in einer Beam Row serialisiert sein.

Vorlagenparameter

Parameter Beschreibung
inputSubscriptions Die durch Kommas getrennte Liste mit Pub/Sub-Eingabeabos, aus denen gelesen werden soll, im Format <subscription>,<subscription>, ...
changeLogDataset Das BigQuery-Dataset zum Speichern der Staging-Tabellen im Format <my-dataset>.
replicaDataset Der Speicherort des BigQuery-Datasets zum Speichern der Replikattabellen im Format <my-dataset>.
updateFrequencySecs (Optional) Das Intervall, in dem die Pipeline die BigQuery-Tabelle zur Replikation der MySQL-Datenbank aktualisiert.

Vorlage "Change Data Capture von MySQL für BigQuery mit Debezium und Pub/Sub (Stream)" ausführen

Führen Sie die folgenden Schritte aus, um diese Vorlage auszuführen:

  1. Klonen Sie das DataflowTemplates-Repository auf Ihren lokalen Computer.
  2. Wechseln Sie zum Verzeichnis v2/cdc-parent.
  3. Achten Sie darauf, dass der Debezium-Connector bereitgestellt ist.
  4. Führen Sie mit Maven die Dataflow-Vorlage aus.
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
        --inputSubscriptions=SUBSCRIPTIONS \
        --updateFrequencySecs=300 \
        --changeLogDataset=CHANGELOG_DATASET \
        --replicaDataset=REPLICA_DATASET \
        --project=PROJECT_ID \
        --region=REGION_NAME"
      

    Dabei gilt:

    • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
    • SUBSCRIPTIONS: Ihre durch Kommas getrennte Liste von Pub/Sub-Abonamen.
    • CHANGELOG_DATASET: Ihr BigQuery-Dataset für Änderungslogdaten.
    • REPLICA_DATASET: Ihr BigQuery-Dataset für Replikattabellen.

Apache Kafka für BigQuery

Die Vorlage „Apache Kafka für BigQuery“ ist eine Streamingpipeline, die Textdaten aus Apache Kafka schreibt, eine benutzerdefinierte Funktion (User-defined Function, UDF) ausführt und die resultierenden Datensätze in BigQuery ausgibt. Alle Fehler, die bei der Transformation der Daten, der Ausführung der UDF oder beim Schreiben in die Ausgabetabelle auftreten, werden in eine separate Fehlertabelle in BigQuery geschrieben. Wenn die Fehlertabelle vor der Ausführung nicht vorhanden ist, wird sie erstellt.

Voraussetzungen für diese Pipeline

  • Die BigQuery-Ausgabetabelle muss vorhanden sein.
  • Der Apache Kafka-Broker-Server muss ausgeführt werden und über die Dataflow-Worker-Maschinen erreichbar sein.
  • Die Apache Kafka-Themen müssen vorhanden sein und die Nachrichten müssen in einem gültigen JSON-Format codiert sein.

Vorlagenparameter

Parameter Beschreibung
outputTableSpec Der Speicherort der BigQuery-Ausgabetabelle, in den die Apache Kafka-Nachrichten geschrieben werden sollen, im Format my-project:dataset.table.
inputTopics Die Apache Kafka-Eingabethemen, aus denen eine durch Kommas getrennte Liste gelesen werden soll. Beispiel: messages
bootstrapServers Die Hostadresse der ausgeführten Apache Kafka-Broker-Server in einer durch Kommas getrennten Liste, jede Hostadresse im Format 35.70.252.199:9092.
javascriptTextTransformGcsPath (Optional) Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Optional) Der Name der benutzerdefinierten JavaScript-Funktion, die Sie verwenden möchten. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.
outputDeadletterTable (Optional) Die BigQuery-Tabelle im Format my-project:dataset.my-deadletter-table für Nachrichten, die die Ausgabetabelle nicht erreicht haben. Wenn sie nicht vorhanden ist, wird die Tabelle während der Pipelineausführung erstellt. Wenn nicht angegeben, wird stattdessen <outputTableSpec>_error_records verwendet.

Vorlage „Apache Kafka für BigQuery“ ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Kafka to BigQuery template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • BIGQUERY_TABLE: Ihr BigQuery-Tabellenname
  • KAFKA_TOPICS ist die Apache Kafafa-Themenliste. Wenn mehrere Themen bereitgestellt werden, folgen Sie der Anleitung zum Maskieren von Kommas.
  • PATH_TO_JAVASCRIPT_UDF_FILE Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.

    Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

  • KAFKA_SERVER_ADDRESSES ist die IP-Adressliste des Apache Kafka-Brokers. Jede IP-Adresse sollte die Portnummer haben, von der aus der Server zugänglich ist. Beispiel: 35.70.252.199:9092 Wenn mehrere Adressen angegeben werden, folgen Sie der Anleitung zum Maskieren von Kommas.

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery",
   }
}
  

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • BIGQUERY_TABLE: Ihr BigQuery-Tabellenname
  • KAFKA_TOPICS ist die Apache Kafafa-Themenliste. Wenn mehrere Themen bereitgestellt werden, folgen Sie der Anleitung zum Maskieren von Kommas.
  • PATH_TO_JAVASCRIPT_UDF_FILE Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.

    Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

  • KAFKA_SERVER_ADDRESSES ist die IP-Adressliste des Apache Kafka-Brokers. Jede IP-Adresse sollte die Portnummer haben, von der aus der Server zugänglich ist. Beispiel: 35.70.252.199:9092 Wenn mehrere Adressen angegeben werden, folgen Sie der Anleitung zum Maskieren von Kommas.

Weitere Informationen finden Sie unter Mit Dataflow Daten von Kafka in BigQuery schreiben.

Datastream zu BigQuery (Stream)

Die Vorlage "Datastream zu BigQuery" ist eine Streamingpipeline, die Datastream-Daten liest und in BigQuery repliziert. Die Vorlage liest Daten aus Cloud Storage mithilfe von Pub/Sub-Benachrichtigungen und repliziert sie in eine nach der Zeit partitionierte BigQuery-Staging-Tabelle. Nach der Replikation führt die Vorlage einen MERGE-Vorgang in BigQuery aus, um alle CDC-Änderungen (Change Data Capture) in ein Replikat der Quelltabelle einzufügen bzw. dort zu aktualisieren.

Die Vorlage verarbeitet das Erstellen und Aktualisieren der BigQuery-Tabellen, die von der Replikation verwaltet werden. Wenn eine Datendefinitionssprache (DDL) erforderlich ist, extrahiert ein Callback an Datastream das Quelltabellenschema und übersetzt es in BigQuery-Datentypen. Unterstützte Vorgänge umfassen Folgendes:

  • Neue Tabellen werden beim Einfügen von Daten erstellt.
  • Den BigQuery-Tabellen werden neue Spalten mit Null-Anfangswerten hinzugefügt.
  • Verworfene Spalten werden in BigQuery ignoriert und zukünftige Werte sind null.
  • Umbenannte Spalten werden BigQuery als neue Spalten hinzugefügt.
  • Typänderungen werden nicht an BigQuery weitergegeben.

Voraussetzungen für diese Pipeline:

  • Ein Datastream-Stream, der bereits Daten repliziert oder dafür bereit ist.
  • Cloud Storage Pub/Sub-Benachrichtigungen sind für die Datastream-Daten aktiviert.
  • BigQuery-Ziel-Datasets werden erstellt und dem Compute Engine-Dienstkonto wurde Administratorzugriff darauf gewährt.
  • In der Quelltabelle ist ein Primärschlüssel erforderlich, damit die Ziel-Replikattabelle erstellt werden kann.

Vorlagenparameter

Parameter Beschreibung
inputFilePattern Der Speicherort für Datastream-Dateien in Cloud Storage, die repliziert werden sollen. Dieser Dateispeicherort ist normalerweise der Stammpfad für den Stream.
gcsPubSubSubscription Das Pub/Sub-Abo mit Datastream-Dateibenachrichtigungen, z. B. projects/my-project-id/subscriptions/my-subscription-id.
inputFileFormat Das Format der von Datastream generierten Ausgabedatei. Beispiel: avro,json Standardeinstellung: avro.
outputStagingDatasetTemplate Der Name eines vorhandenen Datasets, das Staging-Tabellen enthält. Sie können die Vorlage {_metadata_dataset} als Platzhalter einfügen, der durch den Namen Ihres Quell-Datasets/-Schemas ersetzt wird (z. B. {_metadata_dataset}_log).
outputDatasetTemplate Der Name eines vorhandenen Datasets, das Replikattabellen enthält. Sie können die Vorlage {_metadata_dataset} als Platzhalter einfügen, der durch den Namen Ihres Quell-Datasets/-Schemas ersetzt wird (z. B. {_metadata_dataset}).
deadLetterQueueDirectory Der Dateipfad zum Speichern nicht verarbeiteter Nachrichten mit einer Begründung für die fehlerhafte Verarbeitung. Der Standardwert ist ein Verzeichnis unter dem temporären Speicherort des Dataflow-Jobs. Der Standardwert ist unter den meisten Bedingungen ausreichend.
outputStagingTableNameTemplate (Optional) Die Vorlage für den Namen der Staging-Tabellen. Der Standardwert ist {_metadata_table}_log. Wenn Sie mehrere Schemas replizieren, wird {_metadata_schema}_{_metadata_table}_log empfohlen.
outputTableNameTemplate (Optional) Die Vorlage für den Namen der Replikattabellen. Standardeinstellung: {_metadata_table}. Wenn Sie mehrere Schemas replizieren, wird {_metadata_schema}_{_metadata_table} empfohlen.
outputProjectId (Optional) Projekt für BigQuery-Datasets, in das Daten ausgegeben werden sollen. Der Standardwert für diesen Parameter ist das Projekt, in dem die Dataflow-Pipeline ausgeführt wird.
streamName (Optional) Der Name oder die Vorlage für den Stream, der nach Schemainformationen abgefragt wird. Standardeinstellung: {_metadata_stream}.
mergeFrequencyMinutes (Optional) Die Anzahl der Minuten zwischen den Zusammenführungen für eine bestimmte Tabelle. Standardeinstellung: 5.
dlqRetryMinutes (Optional) Die Anzahl der Minuten zwischen DLQ-Wiederholungen (Dead Letter Queue). Standardeinstellung: 10.
javascriptTextTransformGcsPath (Optional) Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Optional) Der Name der benutzerdefinierten JavaScript-Funktion, die Sie verwenden möchten. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

Vorlage "Datastream zu BigQuery" ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Datastream to BigQuery template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
outputStagingDatasetTemplate=BIGQUERY_DATASET,\
outputDatasetTemplate=BIGQUERY_DATASET,\
outputStagingTableNameTemplate=BIGQUERY_TABLE,\
outputTableNameTemplate=BIGQUERY_TABLE_log
  

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH ist der Cloud Storage-Pfad zu Datastream-Daten. Beispiel: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME ist das Pub/Sub-Abo, aus dem geänderte Dateien gelesen werden sollen. Beispiel: projects/my-project-id/subscriptions/my-subscription-id
  • BIGQUERY_DATASET ist der Name Ihres BigQuery-Datasets.
  • BIGQUERY_TABLE ist Ihre BigQuery-Tabellenvorlage, z. B. {_metadata_schema}_{_metadata_table}_log.

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
          "outputDatasetTemplate": "BIGQUERY_DATASET",
          "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
          "outputTableNameTemplate": "BIGQUERY_TABLE_log"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery",
   }
}
  

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH ist der Cloud Storage-Pfad zu Datastream-Daten. Beispiel: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME ist das Pub/Sub-Abo, aus dem geänderte Dateien gelesen werden sollen. Beispiel: projects/my-project-id/subscriptions/my-subscription-id
  • BIGQUERY_DATASET ist der Name Ihres BigQuery-Datasets.
  • BIGQUERY_TABLE ist Ihre BigQuery-Tabellenvorlage, z. B. {_metadata_schema}_{_metadata_table}_log.

Datastream zu MySQL oder PostgreSQL (Stream)

Die Vorlage "Datastream to SQL" ist eine Streaming-Pipeline, die Datastream-Daten liest und in jede MySQL- oder PostgreSQL-Datenbank repliziert. Die Vorlage liest Daten aus Cloud Storage mithilfe von Pub/Sub-Benachrichtigungen und repliziert diese Daten in SQL-Replikattabellen.

Die Vorlage unterstützt die Datendefinitionssprache (DDL) nicht und erwartet, dass alle Tabellen bereits in der Datenbank vorhanden sind. Die Replikation verwendet zustandsorientierte Transformationen in Dataflow, um veraltete Daten zu filtern und für die Konsistenz von Daten zu sorgen. Wenn beispielsweise eine neuere Version einer Zeile bereits verarbeitet wurde, wird eine später ankommende Version dieser Zeile ignoriert. Die ausgeführte Datenbearbeitungssprache (DML) versucht, die Ziel- oder Quelldaten so gut wie möglich zu replizieren. Für die ausgeführten DML-Anweisungen gelten die folgenden Regeln:

  • Wenn ein Primärschlüssel vorhanden ist, verwenden Einfügungs- und Aktualisierungsvorgänge eine Upsert-Syntax (d. h. INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE).
  • Wenn Primärschlüssel vorhanden sind, werden Löschvorgänge als Lösch-DML repliziert.
  • Wenn kein Primärschlüssel vorhanden ist, werden sowohl Einfüge- als auch Aktualisierungsvorgänge in die Tabelle eingefügt.
  • Wenn keine Primärschlüssel vorhanden sind, werden Löschvorgänge ignoriert.

Wenn Sie die Oracle-zu-Postgres-Dienstprogramme verwenden, fügen Sie ROWID in SQL als Primärschlüssel hinzu, wenn keine vorhanden sind.

Voraussetzungen für diese Pipeline:

  • Ein Datastream-Stream, der bereits Daten repliziert oder dafür bereit ist.
  • Cloud Storage Pub/Sub-Benachrichtigungen sind für die Datastream-Daten aktiviert.
  • Eine PostgreSQL-Datenbank wurde mit dem erforderlichen Schema konfiguriert.
  • Der Netzwerkzugriff zwischen Dataflow-Workern und PostgreSQL ist eingerichtet.

Vorlagenparameter

Parameter Beschreibung
inputFilePattern Der Speicherort für Datastream-Dateien in Cloud Storage, die repliziert werden sollen. Dieser Dateispeicherort ist normalerweise der Stammpfad für den Stream.
gcsPubSubSubscription Das Pub/Sub-Abo mit Datastream-Dateibenachrichtigungen, z. B. projects/my-project-id/subscriptions/my-subscription-id.
inputFileFormat Das Format der von Datastream generierten Ausgabedatei. Beispiel: avro,json Standardeinstellung: avro.
databaseHost Der SQL-Host, auf dem eine Verbindung hergestellt werden soll.
databaseUser Der SQL-Nutzer mit allen erforderlichen Berechtigungen zum Schreiben in alle Tabellen in der Replikation.
databasePassword Das Passwort für den jeweiligen SQL-Nutzer.
databasePort (Optional) Der SQL-Datenbankport, zu dem eine Verbindung hergestellt werden soll. Standardeinstellung: 5432.
databaseName (Optional) Der Name der SQL-Datenbank, zu der eine Verbindung hergestellt werden soll. Standardeinstellung: "postgres".
streamName (Optional) Der Name oder die Vorlage für den Stream, der nach Schemainformationen abgefragt wird. Standardeinstellung: {_metadata_stream}.

Vorlage "Datastream zu SQL" ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Cloud Datastream to SQL template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH ist der Cloud Storage-Pfad zu Datastream-Daten. Beispiel: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME ist das Pub/Sub-Abo, aus dem geänderte Dateien gelesen werden sollen. Beispiel: projects/my-project-id/subscriptions/my-subscription-id
  • DATABASE_HOST: Ihre SQL-Host-IP-Adresse
  • DATABASE_USER: Ihr SQL-Nutzer
  • DATABASE_PASSWORD: Ihr SQL-Passwort

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_SQL",
   }
}
  

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH ist der Cloud Storage-Pfad zu Datastream-Daten. Beispiel: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME ist das Pub/Sub-Abo, aus dem geänderte Dateien gelesen werden sollen. Beispiel: projects/my-project-id/subscriptions/my-subscription-id
  • DATABASE_HOST: Ihre SQL-Host-IP-Adresse
  • DATABASE_USER: Ihr SQL-Nutzer
  • DATABASE_PASSWORD: Ihr SQL-Passwort

Pub/Sub für Java Database Connectivity (JDBC)

Die Vorlage „Pub/Sub für Java Database Connectivity (JDBC)“ ist eine Streamingpipeline, die Daten aus einem bereits vorhandenen Cloud Pub/Sub-Abo als JSON-Strings aufnimmt und die resultierenden Datensätze in JDBC schreibt.

Voraussetzungen für diese Pipeline:

  • Das als Cloud Pub/Sub-Abo muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Die JDBC-Quelle muss vorhanden sein, bevor die Pipeline ausgeführt wird.
  • Das Cloud Pub/Sub-Output-Thema für unzustellbare Nachrichten muss vor der Pipelineausführung vorhanden sein.

Vorlagenparameter

Parameter Beschreibung
driverClassName Der Name der JDBC-Treiberklasse. z. B. com.mysql.jdbc.Driver.
connectionUrl Der URL-String für die JDBC-Verbindung. Beispiel: jdbc:mysql://some-host:3306/sampledb. Kann als String übergeben werden, der Base64-codiert und dann mit einem Cloud KMS-Schlüssel verschlüsselt wird.
driverJars Durch Kommas getrennte Cloud Storage-Pfade für JDBC-Treiber. z. B. gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar.
username (Optional) Der Nutzername, der für die JDBC-Verbindung verwendet werden soll. Kann als Base64-codierter String übergeben werden, der mit einem Cloud KMS-Schlüssel verschlüsselt ist.
password (Optional) Das Passwort für die JDBC-Verbindung. Kann als Base64-codierter String übergeben werden, der mit einem Cloud KMS-Schlüssel verschlüsselt ist.
connectionProperties [Optional] Attributstring für die JDBC-Verbindung. Format des Strings muss [propertyName=property;]* sein. Beispiel: unicode=true;characterEncoding=UTF-8.
statement Für die Datenbank auszuführende Anweisung. Die Anweisung muss die Spaltennamen der Tabelle in beliebiger Reihenfolge angeben. Nur die Werte der angegebenen Spaltennamen werden aus der JSON-Datei gelesen und zur Anweisung hinzugefügt. Beispiel: INSERT INTO tableName (column1, column2) VALUES (?,?)
inputSubscription Das Pub/Sub-Eingabeabo, aus dem gelesen werden soll, im Format projects/<project>/subscriptions/<subscription>.
outputDeadletterTopic Das Pub/Sub-Thema als Weiterleitungsziel für nicht zustellbare Nachrichten. z. B. projects/<project-id>/topics/<topic-name>.
KMSEncryptionKey [Optional] Cloud KMS-Verschlüsselungsschlüssel zur Entschlüsselung des Nutzernamens, Passworts und Verbindungsstrings. Wenn der Cloud KMS-Schlüssel übergeben wird, müssen der Nutzername, das Passwort und der Verbindungsstring verschlüsselt übergeben werden.
extraFilesToStage Durch Kommas getrennte Cloud Storage-Pfade oder Secret Manager-Secrets für Dateien, die im Worker bereitgestellt werden sollen. Diese Dateien werden im Verzeichnis /extra_files in jedem Worker gespeichert. Beispiel: gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id>.

Vorlage „Pub/Sub to Java Database Connectivity (JDBC)“ ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub to JDBC template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_to_Jdbc \
    --region REGION_NAME \
    --parameters \
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
driverJars=DRIVER_PATHS,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
connectionProperties=CONNECTION_PROPERTIES,\
statement=SQL_STATEMENT,\
inputSubscription=INPUT_SUBSCRIPTION,\
outputDeadletterTopic=OUTPUT_DEADLETTER_TOPIC,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • DRIVER_CLASS_NAME: Der Name der Treiberklasse
  • JDBC_CONNECTION_URL: Die JDBC-Verbindungs-URL
  • DRIVER_PATHS: Die kommagetrennten Cloud Storage-Pfade der JDBC-Treiber
  • CONNECTION_USERNAME: Der Nutzername der JDBC-Verbindung
  • CONNECTION_PASSWORD: Das JDBC-Verbindungspasswort
  • CONNECTION_PROPERTIES: JDBC-Verbindungsattribute, falls erforderlich
  • SQL_STATEMENT: Die SQL-Anweisung, die für die Datenbank ausgeführt werden soll.
  • INPUT_SUBSCRIPTION: Das Pub/Sub-Eingabeabo, aus dem gelesen werden soll.
  • OUTPUT_DEADLETTER_TOPIC: Das Pub/Sub als Weiterleitungsziel für nicht zustellbare Nachrichten
  • KMS_ENCRYPTION_KEY: Der Cloud KMS-Verschlüsselungsschlüssel

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_Jdbc
{
   "jobName": "JOB_NAME",
   "parameters": {
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "JDBC_CONNECTION_URL",
       "driverJars": "DRIVER_PATHS",
       "username": "CONNECTION_USERNAME",
       "password": "CONNECTION_PASSWORD",
       "connectionProperties": "CONNECTION_PROPERTIES",
       "statement": "SQL_STATEMENT",
       "inputSubscription": "INPUT_SUBSCRIPTION",
       "outputDeadletterTopic": "OUTPUT_DEADLETTER_TOPIC",
       "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
   },
   "environment": { "zone": "us-central1-f" },
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • DRIVER_CLASS_NAME: Der Name der Treiberklasse
  • JDBC_CONNECTION_URL: Die JDBC-Verbindungs-URL
  • DRIVER_PATHS: Die kommagetrennten Cloud Storage-Pfade der JDBC-Treiber
  • CONNECTION_USERNAME: Der Nutzername der JDBC-Verbindung
  • CONNECTION_PASSWORD: Das JDBC-Verbindungspasswort
  • CONNECTION_PROPERTIES: JDBC-Verbindungsattribute, falls erforderlich
  • SQL_STATEMENT: Die SQL-Anweisung, die für die Datenbank ausgeführt werden soll.
  • INPUT_SUBSCRIPTION: Das Pub/Sub-Eingabeabo, aus dem gelesen werden soll.
  • OUTPUT_DEADLETTER_TOPIC: Das Pub/Sub als Weiterleitungsziel für nicht zustellbare Nachrichten
  • KMS_ENCRYPTION_KEY: Der Cloud KMS-Verschlüsselungsschlüssel

Cloud Spanner-Änderungsstreams für Cloud Storage

Die Vorlage „Cloud Spanner-Änderungsstreams für Cloud Storage“ ist eine Streamingpipeline, die Spanner-Datenänderungsdatensätze streamt und sie mit Dataflow Runner V2 in einen Cloud Storage-Bucket schreibt.

Die Pipeline gruppiert Spanner-Streamdatensätze anhand ihres Zeitstempels, wobei jedes Fenster eine Zeitdauer darstellt, deren Länge Sie mit dieser Vorlage konfigurieren können. Alle Datensätze mit Zeitstempeln, die zum Fenster gehören, befinden sich auch wirklich im Fenster. Es können keine verspäteten Ansagen vorhanden sein. Sie können auch mehrere Ausgabe-Shards definieren. Die Pipeline erstellt eine Cloud Storage-Ausgabedatei pro Fenster und Shard. Innerhalb einer Ausgabedatei sind die Datensätze ungeordnet. Ausgabedateien können je nach Nutzerkonfiguration im JSON- oder AVRO-Format geschrieben werden.

Beachten Sie, dass Sie die Netzwerklatenz und die Netzwerktransportkosten minimieren können. Führen Sie dazu den Dataflow-Job in derselben Region aus, in der sich auch Ihre Cloud Spanner-Instanz oder Ihr Cloud Storage-Bucket befindet. Wenn Sie Quellen und Senken sowie Speicherorte für Staging-Dateien und temporäre Dateien verwenden, die sich außerhalb der Region Ihres Jobs befinden, werden Ihre Daten möglicherweise regionenübergreifend gesendet. Weitere Informationen finden Sie unter Regionale Dataflow-Endpunkte.

Weitere Informationen zu Änderungsstreams, zum Erstellen von Dataflow-Pipelines für Änderungsstreams und Best Practices

Voraussetzungen für diese Pipeline:

  • Die Cloud Spanner-Instanz muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Die Cloud Spanner-Datenbank muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Die Cloud Spanner-Metadateninstanz muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Die Cloud Spanner-Metadatendatenbank muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Der Cloud Spanner-Änderungsstream muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Der Cloud Storage-Ausgabe-Bucket muss vorhanden sein, bevor Sie die Pipeline ausführen.

Vorlagenparameter

Parameter Beschreibung
spannerInstanceId Die Cloud Spanner-Instanz-ID, aus der Änderungsstreamdaten gelesen werden.
spannerDatabase Die Cloud Spanner-Datenbank, aus der Änderungsstreamdaten gelesen werden sollen.
spannerMetadataInstanceId Die Cloud Spanner-Instanz-ID, die für die Metadatentabelle des Connectors für Änderungsstreams verwendet werden soll.
spannerMetadataDatabase Die Cloud Spanner-Datenbank, die für die Metadatentabelle des Connectors für Änderungsstreams verwendet werden soll.
spannerChangeStreamName Der Name des Cloud Spanner-Änderungsstreams, aus dem gelesen werden soll.
gcsOutputDirectory Der Speicherort der Datei für Änderungsstreams in Cloud Storage im Format „gs://${BUCKET}/${ROOT_PATH}/“.
outputFilenamePrefix (Optional) Das Dateinamenpräfix der Dateien, in die geschrieben werden soll. Das Standarddateipräfix ist „output”.
spannerProjectId (Optional) Das Projekt, aus dem Änderungsstreams gelesen werden. Dies ist auch das Projekt, in dem die Metadatentabelle des Änderungsstream-Connectors erstellt wird. Der Standardwert für diesen Parameter ist das Projekt, in dem die Dataflow-Pipeline ausgeführt wird.
startTimestamp (Optional) Die Start-DateTime (einschließlich), die zum Lesen von Änderungsstreams verwendet wird. Ex-2021-10-12T07:20:50.52Z. Die Standardeinstellung ist der Zeitstempel für den Start der Pipeline, d. h. die aktuelle Zeit.
endTimestamp (Optional) Die End-DateTime (einschließlich), die zum Lesen von Änderungsstreams verwendet wird. Ex-2021-10-12T07:20:50.52Z. Die Standardeinstellung ist eine unendliche Zeit in der Zukunft.
outputFileFormat (Optional) Das Format der Cloud Storage-Ausgabedatei. Zulässige Formate sind TEXT, AVRO. Der Standardwert ist AVRO.
windowDuration (Optional) Die Fensterdauer ist das Intervall, in dem Daten in das Ausgabeverzeichnis geschrieben werden. Konfigurieren Sie die Dauer anhand des Durchsatzes der Pipeline. Beispielsweise kann ein höherer Durchsatz kleinere Fenstergrößen erfordern, damit die Daten in den Speicher passen. Die Standardeinstellung ist „5m”, mit mindestens 1 s. Zulässige Formate sind: [int]s (für Sekunden, Beispiel: 5s), [int]m (für Minuten, Beispiel: 12m), [int]h (für Stunden, Beispiel: 2h).
rpcPriority (Optional) Die Anfragepriorität für Cloud Spanner-Aufrufe. Der Wert muss einer der folgenden sein:[HIGH,MEDIUM,LOW]. (Standardeinstellung: HIGH)
numShards (Optional) Die maximale Anzahl von Ausgabe-Shards, die beim Schreiben erzeugt werden. Der Standardwert ist 20. Eine höhere Anzahl von Shards erhöht den Durchsatz für das Schreiben in Cloud Storage, aber möglicherweise auch höhere Kosten für die Datenaggregation über Shards bei der Verarbeitung von Cloud Storage-Ausgabedateien.
spannerMetadataTableName (Optional) Der Name der zu verwendenden Connector-Metadatentabelle für Cloud Spanner-Änderungsstreams. Wenn nicht angegeben, wird während des Pipelineablaufs automatisch eine Metadatentabelle für Cloud Spanner-Änderungsstreams erstellt. Dieser Parameter muss beim Aktualisieren einer vorhandenen Pipeline angegeben werden und sollte nicht anderweitig angegeben werden.

Vorlage „Cloud Spanner-Änderungsstreams für Cloud Storage“ ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Cloud Spanner change streams to Google Cloud Storage template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud beta dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
gcsOutputDirectory=GCS_OUTPUT_DIRECTORY

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • SPANNER_INSTANCE_ID: ID der Cloud Spanner-Instanz
  • SPANNER_DATABASE: Cloud Spanner-Datenbank
  • SPANNER_METADATA_INSTANCE_ID: ID der Cloud Spanner-Metadateninstanz
  • SPANNER_METADATA_DATABASE: Cloud Spanner-Metadatendatenbank
  • SPANNER_CHANGE_STREAM: Cloud Spanner-Änderungsstream
  • GCS_OUTPUT_DIRECTORY: Dateispeicherort für die Ausgabe der Änderungsstreams

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage",
   }
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • SPANNER_INSTANCE_ID: ID der Cloud Spanner-Instanz
  • SPANNER_DATABASE: Cloud Spanner-Datenbank
  • SPANNER_METADATA_INSTANCE_ID: ID der Cloud Spanner-Metadateninstanz
  • SPANNER_METADATA_DATABASE: Cloud Spanner-Metadatendatenbank
  • SPANNER_CHANGE_STREAM: Cloud Spanner-Änderungsstream
  • GCS_OUTPUT_DIRECTORY: Dateispeicherort für die Ausgabe der Änderungsstreams

Cloud Spanner-Änderungsstreams für BigQuery

Die Vorlage „Cloud Spanner-Änderungsstreams für BigQuery“ ist eine Streamingpipeline, die Datenänderungsdatensätze von Cloud Spanner streamt und sie mithilfe von Dataflow Runner V2 in BigQuery-Tabellen schreibt.

Wenn die erforderlichen BigQuery-Tabellen nicht vorhanden sind, werden sie von der Pipeline erstellt. Andernfalls werden vorhandene BigQuery-Tabellen verwendet. Das Schema vorhandener BigQuery-Tabellen muss die entsprechenden nachverfolgten Spalten der Cloud Spanner-Tabellen und die zusätzlichen Metadatenspalten enthalten (siehe Beschreibung der Metadatenfelder in der folgenden Liste), die nicht explizit von der Option „ignoreFields” ignoriert werden. Jede neue BigQuery-Zeile enthält alle Spalten, die vom Änderungsstream aus der entsprechenden Zeile in Ihrer Cloud Spanner-Tabelle zum Zeitstempel des Änderungseintrags beobachtet werden.

Alle Spalten zum Beobachten von Änderungsstreams sind in jeder BigQuery-Tabellenzeile enthalten, unabhängig davon, ob sie durch eine Cloud Spanner-Transaktion geändert werden. Nicht beobachtete Spalten sind nicht in der BigQuery-Zeile enthalten. Alle Cloud Spanner-Änderungen, die kleiner als das Dataflow-Wasserzeichen sind, werden entweder erfolgreich auf die BigQuery-Tabellen angewendet oder in der Dead-Letter-Warteschlange zur Wiederholung gespeichert. BigQuery-Zeilen werden im Vergleich zur ursprünglichen Reihenfolge des Cloud Commit-Zeitstempels in der falschen Reihenfolge eingefügt.

Die folgenden Metadatenfelder werden zu BigQuery-Tabellen hinzugefügt:

  • _metadata_spanner_mod_type: Extrahiert aus dem Änderungsstream-Datensatz.
  • _metadata_spanner_table_name: Der Cloud Spanner-Tabellenname. Beachten Sie, dass dies nicht der Name der Metadatentabelle des Connectors ist.
  • _metadata_spanner_commit_timestamp: Extrahiert aus dem Änderungsstream-Datensatz.
  • _metadata_spanner_server_transaction_id: Aus dem Änderungsstream des Änderungsdatensatzes extrahiert.
  • _metadata_spanner_record_sequence: Extrahiert aus dem Änderungsstream-Datensatz.
  • _metadata_spanner_is_last_record_in_transaction_in_partition: Extrahiert aus dem Änderungsstream-Datensatz.
  • _metadata_spanner_number_of_records_in_transaction: Aus dem Änderungsdatensatz für die Änderungsstreams extrahiert.
  • _metadata_spanner_number_of_partitions_in_transaction: Aus dem Änderungsstream-Datensatz extrahiert.
  • _metadata_big_query_commit_timestamp: Der Commit-Zeitstempel, wann die Zeile in BigQuery eingefügt wurde.

Hinweis:

  • Diese Vorlage überträgt keine Schemaänderungen von Cloud Spanner an BigQuery. Da die Schemaänderung in Cloud Spanner wahrscheinlich zu einer Unterbrechung der Pipeline führt, müssen Sie die Pipeline nach der Schemaänderung möglicherweise neu erstellen.
  • Bei den Werterfassungstypen OLD_AND_NEW_VALUES und NEW_VALUES muss die Vorlage, wenn der Datensatz eine UPDATE-Änderung enthält, einen veralteten Lesevorgang in Cloud Spanner zum Commit-Zeitstempel des Datensatzes durchführen, um die unveränderten, aber überwachten Spalten abzurufen. Prüfen Sie, ob Sie die „version_retention_period“ in Ihrer Datenbank richtig konfiguriert haben, um veraltete Daten lesen zu können. Für den Werterfassungstyp NEW_ROW ist die Vorlage effizienter, da der Datensatz für die Datenänderung die gesamte neue Zeile erfasst, einschließlich der Spalten, die bei UPDATEs nicht aktualisiert werden, und die Vorlage keinen veralteten Lesevorgang durchführen muss.
  • Sie können die Netzwerklatenz und die Netzwerktransportkosten minimieren. Dazu führen Sie den Dataflow-Job in derselben Region aus, in der sich auch Ihre Cloud Spanner-Instanz oder BigQuery-Tabellen befinden. Wenn Sie Quellen und Senken sowie Speicherorte für Staging-Dateien und temporäre Dateien verwenden, die sich außerhalb der Region Ihres Jobs befinden, werden Ihre Daten möglicherweise regionenübergreifend gesendet. Weitere Informationen finden Sie unter Regionale Dataflow-Endpunkte.
  • Diese Vorlage unterstützt alle gültigen Cloud Spanner-Datentypen. Wenn der BigQuery-Typ jedoch genauer ist als der Cloud Spanner-Typ, kann während der Transformation ein Genauigkeitsverlust auftreten. Insbesondere:
    • Im Fall von JSON-Typen in Cloud Spanner wird die Reihenfolge der Mitglieder eines Objekts lexikografisch angeordnet. Es gibt jedoch keine Garantie dafür.
    • Cloud Spanner unterstützt den TIMESTAMP-Typ „Nanosekunden”, BigQuery unterstützt nur den TIMESTAMP-Typ „Mikrosekunden”.

Weitere Informationen zu Änderungsstreams, zum Erstellen von Dataflow-Pipelines für Änderungsstreams und Best Practices

Voraussetzungen für diese Pipeline:

  • Die Cloud Spanner-Instanz muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Die Cloud Spanner-Datenbank muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Die Cloud Spanner-Metadateninstanz muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Die Cloud Spanner-Metadatendatenbank muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Der Cloud Spanner-Änderungsstream muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Das BigQuery-Dataset muss vorhanden sein, bevor Sie die Pipeline ausführen.

Vorlagenparameter

Parameter Beschreibung
spannerInstanceId Die Cloud Spanner-Instanz, aus der Änderungsstreams gelesen werden sollen.
spannerDatabase Die Cloud Spanner-Datenbank, aus der Änderungsstreams gelesen werden sollen.
spannerMetadataInstanceId Die Cloud Spanner-Instanz, die für die Metadatentabelle des Connectors für Änderungsstreams verwendet werden soll.
spannerMetadataDatabase Die Cloud Spanner-Datenbank, die für die Metadatentabelle des Connectors für Änderungsstreams verwendet werden soll.
spannerChangeStreamName Der Name des Cloud Spanner-Änderungsstreams, aus dem gelesen werden soll.
bigQueryDataSet Das BigQuery-Dataset für die Ausgabe der Änderungsstreams.
spannerProjectId (Optional) Das Projekt, aus dem Änderungsstreams gelesen werden. Dies ist auch das Projekt, in dem die Metadatentabelle des Änderungsstream-Connectors erstellt wird. Der Standardwert für diesen Parameter ist das Projekt, in dem die Dataflow-Pipeline ausgeführt wird.
spannerMetadataTableName (Optional) Der Name der zu verwendenden Connector-Metadatentabelle für Cloud Spanner-Änderungsstreams. Wenn nicht angegeben, wird während des Pipelineablaufs automatisch eine Metadatentabelle für Cloud Spanner-Änderungsstreams erstellt. Dieser Parameter muss beim Aktualisieren einer vorhandenen Pipeline angegeben werden und sollte nicht anderweitig angegeben werden.
rpcPriority (Optional) Die Anfragepriorität für Cloud Spanner-Aufrufe. Der Wert muss einer der folgenden sein:[HIGH,MEDIUM,LOW]. (Standardeinstellung: HIGH)
startTimestamp (Optional) Die Start-DateTime (einschließlich), die zum Lesen von Änderungsstreams verwendet wird. Ex-2021-10-12T07:20:50.52Z. Die Standardeinstellung ist der Zeitstempel für den Start der Pipeline, d. h. die aktuelle Zeit.
endTimestamp (Optional) Die End-DateTime (einschließlich), die zum Lesen von Änderungsstreams verwendet wird. Ex-2021-10-12T07:20:50.52Z. Die Standardeinstellung ist eine unendliche Zeit in der Zukunft.
bigQueryProjectId (Optional) Das BigQuery-Projekt. Der Standardwert ist das Projekt für den Dataflow-Job.
bigQueryChangelogTableNameTemplate (Optional) Die Vorlage für den Namen der BigQuery-Änderungslogtabellen. Die Standardeinstellung ist {_metadata_spanner_table_name}_changelog.
deadLetterQueueDirectory (Optional) Der Dateipfad zum Speichern nicht verarbeiteter Einträge mit einer Begründung für die fehlerhafte Verarbeitung. Der Standardwert ist ein Verzeichnis unter dem temporären Speicherort des Dataflow-Jobs. Der Standardwert ist unter den meisten Bedingungen ausreichend.
dlqRetryMinutes (Optional) Die Anzahl der Minuten zwischen DLQ-Wiederholungen (Dead Letter Queue). Der Standardwert ist 10.
ignoreFields (Optional) Durch Kommas getrennte Liste von Feldern (Groß- und Kleinschreibung wird ignoriert). Dies können Felder von überwachten Tabellen oder Metadatenfelder sein, die von der Pipeline hinzugefügt werden. Ignorierte Felder werden nicht in BigQuery eingefügt.

Vorlage „Cloud Spanner-Änderungsstreams für BigQuery“ ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Cloud Spanner change streams to BigQuery template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud beta dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • SPANNER_INSTANCE_ID: ID der Cloud Spanner-Instanz
  • SPANNER_DATABASE: Cloud Spanner-Datenbank
  • SPANNER_METADATA_INSTANCE_ID: ID der Cloud Spanner-Metadateninstanz
  • SPANNER_METADATA_DATABASE: Cloud Spanner-Metadatendatenbank
  • SPANNER_CHANGE_STREAM: Cloud Spanner-Änderungsstream
  • BIGQUERY_DATASET: Das BigQuery-Dataset für die Ausgabe der Änderungsstreams.

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • SPANNER_INSTANCE_ID: ID der Cloud Spanner-Instanz
  • SPANNER_DATABASE: Cloud Spanner-Datenbank
  • SPANNER_METADATA_INSTANCE_ID: ID der Cloud Spanner-Metadateninstanz
  • SPANNER_METADATA_DATABASE: Cloud Spanner-Metadatendatenbank
  • SPANNER_CHANGE_STREAM: Cloud Spanner-Änderungsstream
  • BIGQUERY_DATASET: Das BigQuery-Dataset für die Ausgabe der Änderungsstreams.

Cloud Spanner-Änderungsstreams zu Pub/Sub

Die Cloud Spanner-Änderungsstreams zur Pub/Sub-Vorlage sind eine Streaming-Pipeline, die Cloud Spanner-Datenänderungsdatensätze streamt und sie mit Dataflow Runner V2 in Pub/Sub-Themen schreibt.

Um Ihre Daten in ein neues Pub/Sub-Thema auszugeben, müssen Sie zuerst das Thema erstellen. Nach der Erstellung generiert Pub/Sub automatisch ein Abo und hängt es an das neue Thema an. Wenn Sie versuchen, Daten an ein nicht vorhandenes Pub/Sub-Thema auszugeben, löst die Dataflow-Pipeline eine Ausnahme aus und die Pipeline bleibt hängen, da sie kontinuierlich versucht, eine Verbindung herzustellen.

Wenn das erforderliche Pub/Sub-Thema bereits vorhanden ist, können Sie Daten zu diesem Thema ausgeben.

Weitere Informationen finden Sie unter Informationen zu Änderungsstreams, Verbindungen von Änderungsstreams mit Dataflow erstellen und Best Practices für Änderungsstreams.

Voraussetzungen für diese Pipeline:

  • Die Cloud Spanner-Instanz muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Die Cloud Spanner-Datenbank muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Die Cloud Spanner-Metadateninstanz muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Die Cloud Spanner-Metadatendatenbank muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Der Cloud Spanner-Änderungsstream muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Das Pub/Sub-Thema muss vorhanden sein, bevor die Pipeline ausgeführt wird.

Vorlagenparameter

Parameter Beschreibung
spannerInstanceId Die Cloud Spanner-Instanz, aus der Änderungsstreams gelesen werden sollen.
spannerDatabase Die Cloud Spanner-Datenbank, aus der Änderungsstreams gelesen werden sollen.
spannerMetadataInstanceId Die Cloud Spanner-Instanz, die für die Metadatentabelle des Connectors für Änderungsstreams verwendet werden soll.
spannerMetadataDatabase Die Cloud Spanner-Datenbank, die für die Metadatentabelle des Connectors für Änderungsstreams verwendet werden soll.
spannerChangeStreamName Der Name des Cloud Spanner-Änderungsstreams, aus dem gelesen werden soll.
pubsubTopic Das Pub/Sub-Thema für die Ausgabe der Änderungsstreams.
spannerProjectId (Optional) Das Projekt, aus dem Änderungsstreams gelesen werden. Dies ist auch das Projekt, in dem die Metadatentabelle des Änderungsstream-Connectors erstellt wird. Der Standardwert für diesen Parameter ist das Projekt, in dem die Dataflow-Pipeline ausgeführt wird.
spannerMetadataTableName (Optional) Der Name der zu verwendenden Connector-Metadatentabelle für Cloud Spanner-Änderungsstreams. Wenn nicht angegeben, erstellt Cloud Spanner automatisch die Metadatentabelle des Stream-Connectors während der Pipeline-Ablaufänderung. Sie müssen diesen Parameter beim Aktualisieren einer vorhandenen Pipeline angeben. Verwenden Sie diesen Parameter nicht für andere Fälle.
rpcPriority (Optional) Die Anfragepriorität für Cloud Spanner-Aufrufe. Der Wert muss einer der folgenden sein: [HIGH,MEDIUM,LOW]. (Standardeinstellung: HIGH)
startTimestamp (Optional) Die Start-DateTime (einschließlich), die zum Lesen von Änderungsstreams verwendet wird. Beispiel: ex-2021-10-12T07:20:50.52Z. Die Standardeinstellung ist der Zeitstempel für den Start der Pipeline, d. h. die aktuelle Zeit.
endTimestamp (Optional) Die End-DateTime (einschließlich), die zum Lesen von Änderungsstreams verwendet wird. Beispiel: ex-2021-10-12T07:20:50.52Z. Die Standardeinstellung ist eine unendliche Zeit in der Zukunft.
outputFileFormat (Optional) Das Format der Ausgabe. Die Ausgabe wird in viele PubsubMessages zusammengefasst und an ein Pub/Sub-Thema gesendet. Zulässige Formate sind JSON und AVRO. Der Standardwert ist JSON.
pubsubAPI (Optional) Pub/Sub API zur Implementierung der Pipeline. Zulässige APIs sind pubsubio und native_client. Bei einer geringen Anzahl von Abfragen pro Sekunde hat native_client eine geringere Latenz. Bei einer großen Anzahl von Abfragen pro Sekunde bietet pubsubio eine bessere und stabilere Leistung. Der Standardwert ist pubsubio.

Cloud Spanner-Änderungsstreams zur Pub/Sub-Vorlage ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Cloud Spanner change streams to Pub/Sub template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

    gcloud beta dataflow flex-template run JOB_NAME \
        --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_PubSub \
        --region REGION_NAME \
        --parameters \
    spannerInstanceId=SPANNER_INSTANCE_ID,\
    spannerDatabase=SPANNER_DATABASE,\
    spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
    spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
    spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
    pubsubTopic=PUBSUB_TOPIC
    

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • SPANNER_INSTANCE_ID: ID der Cloud Spanner-Instanz
  • SPANNER_DATABASE: Cloud Spanner-Datenbank
  • SPANNER_METADATA_INSTANCE_ID: ID der Cloud Spanner-Metadateninstanz
  • SPANNER_METADATA_DATABASE: Cloud Spanner-Metadatendatenbank
  • SPANNER_CHANGE_STREAM: Cloud Spanner-Änderungsstream
  • PUBSUB_TOPIC: Pub/Sub-Thema für die Ausgabe der Änderungsstreams

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
    "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "spannerInstanceId": "SPANNER_INSTANCE_ID",
            "spannerDatabase": "SPANNER_DATABASE",
            "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
            "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
            "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
            "pubsubTopic": "PUBSUB_TOPIC"
        },
        "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_PubSub",
    }
  }
  

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • SPANNER_INSTANCE_ID: ID der Cloud Spanner-Instanz
  • SPANNER_DATABASE: Cloud Spanner-Datenbank
  • SPANNER_METADATA_INSTANCE_ID: ID der Cloud Spanner-Metadateninstanz
  • SPANNER_METADATA_DATABASE: Cloud Spanner-Metadatendatenbank
  • SPANNER_CHANGE_STREAM: Cloud Spanner-Änderungsstream
  • PUBSUB_TOPIC: Pub/Sub-Thema für die Ausgabe der Änderungsstreams

MongoDB für BigQuery (CDC)

Die Vorlage "MongoDB für BigQuery CDC (Change Data Capture)" ist eine Streamingpipeline, die mit MongoDB-Änderungsstreams zusammenarbeitet. Die Pipeline liest die per Push über einen MongoDB-Änderungsstream an Pub/Sub übertragenen JSON-Einträge und schreibt sie wie im Parameter userOption angegeben in BigQuery.

Voraussetzungen für diese Pipeline

  • Das BigQuery-Ziel-Dataset muss vorhanden sein.
  • Die MongoDB-Quellinstanz muss über die Dataflow-Worker-Maschinen zugänglich sein.
  • Der Änderungsstream, der Änderungen von MongoDB an Pub/Sub überträgt, sollte ausgeführt werden.

Vorlagenparameter

Parameter Beschreibung
mongoDbUri MongoDB-Verbindungs-URI im Format mongodb+srv://:@.
database Datenbank in MongoDB, aus der die Sammlung gelesen werden soll. Beispiel: my-db.
collection Name der Sammlung in der MongoDB-Datenbank. Beispiel: my-collection.
outputTableSpec BigQuery-Tabelle, in die Daten geschrieben werden sollen. Beispiel: bigquery-project:dataset.output_table.
userOption FLATTEN oder NONE. FLATTEN vereinfacht die Dokumente auf die erste Ebene. NONE speichert das gesamte Dokument als JSON-String.
inputTopic Das Pub/Sub-Eingabethema, aus dem gelesen werden soll, im Format projects/<project>/topics/<topic>.

Vorlage "MongoDB für BigQuery (CDC)" ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the MongoDB to BigQuery (CDC) template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/MongoDB_to_BigQuery_CDC \
    --parameters \
outputTableSpec=OUTPUT_TABLE_SPEC,\
mongoDbUri=MONGO_DB_URI,\
database=DATABASE,\
collection=COLLECTION,\
userOption=USER_OPTION,\
inputTopic=INPUT_TOPIC

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • OUTPUT_TABLE_SPEC: Der Name Ihrer BigQuery-Zieltabelle.
  • MONGO_DB_URI: Ihr MongoDB-URI.
  • DATABASE: Ihre MongoDB-Datenbank.
  • COLLECTION: Ihre MongoDB-Sammlung.
  • USER_OPTION: FLATTEN oder NONE.
  • INPUT_TOPIC: Ihr Pub/Sub-Eingabethema.

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "mongoDbUri": "MONGO_DB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "userOption": "USER_OPTION",
          "inputTopic": "INPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/MongoDB_to_BigQuery_CDC",
   }
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • OUTPUT_TABLE_SPEC: Der Name Ihrer BigQuery-Zieltabelle.
  • MONGO_DB_URI: Ihr MongoDB-URI.
  • DATABASE: Ihre MongoDB-Datenbank.
  • COLLECTION: Ihre MongoDB-Sammlung.
  • USER_OPTION: FLATTEN oder NONE.
  • INPUT_TOPIC: Ihr Pub/Sub-Eingabethema.