Von Google bereitgestellte Dataflow-Streamingvorlagen

Mit Sammlungen den Überblick behalten Sie können Inhalte basierend auf Ihren Einstellungen speichern und kategorisieren.

Google bietet eine Reihe von Open-Source-Vorlagen für Dataflow.

Mit diesen Dataflow-Vorlagen können Sie umfangreiche Datenaufgaben wie Datenimport, Datenexport, Datensicherung, Datenwiederherstellung und Bulk-API-Vorgänge lösen, ohne eine dedizierte Entwicklungsumgebung zu verwenden. Die Vorlagen basieren auf Apache Beam und verwenden Dataflow, um die Daten zu transformieren.

Allgemeine Informationen zu Vorlagen finden Sie unter Dataflow-Vorlagen. Eine Liste aller von Google bereitgestellten Vorlagen finden Sie unter Erste Schritte mit von Google bereitgestellten Vorlagen.

In dieser Anleitung werden Streamingvorlagen beschrieben.

Pub/Sub-Abo für BigQuery

Die Vorlage "Pub/Sub-Abo für BigQuery" ist eine Streamingpipeline, die Nachrichten im JSON-Format aus einem Pub/Sub-Abo liest und in eine BigQuery-Tabelle schreibt. Sie können die Vorlage als schnelle Lösung verwenden, um Pub/Sub-Daten nach BigQuery zu verschieben. Die Vorlage liest Nachrichten im JSON-Format aus Pub/Sub und konvertiert sie in BigQuery-Elemente.

Voraussetzungen für diese Pipeline:

  • Das data-Feld mit Pub/Sub-Nachrichten muss das JSON-Format verwenden, das in diesem JSON-Leitfaden beschrieben wird. Beispielsweise können Nachrichten mit Werten im data-Feld, die als {"k1":"v1", "k2":"v2"} formatiert sind, in eine BigQuery-Tabelle mit zwei Spalten namens k1 und k2 mit einem Stringdatentyp eingefügt werden.
  • Die Ausgabetabelle muss vorhanden sein, bevor Sie die Pipeline ausführen. Das Tabellenschema muss mit den JSON-Eingabeobjekten übereinstimmen.

Vorlagenparameter

Parameter Beschreibung
inputSubscription Das Pub/Sub-Eingabeabo, aus dem gelesen werden soll, im Format projects/<project>/subscriptions/<subscription>.
outputTableSpec Der Speicherort der BigQuery-Ausgabetabelle im Format <my-project>:<my-dataset>.<my-table>.
outputDeadletterTable Die BigQuery-Tabelle im Format <my-project>:<my-dataset>.<my-table> für Nachrichten, die die Ausgabetabelle nicht erreicht haben. Wenn sie nicht vorhanden ist, wird sie während der Pipelineausführung erstellt. Wenn nicht angegeben, wird stattdessen OUTPUT_TABLE_SPEC_error_records verwendet.
javascriptTextTransformGcsPath (Optional) Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Optional) Der Name der benutzerdefinierten JavaScript-Funktion, die Sie verwenden möchten. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

Pub/Sub-Abo für BigQuery-Vorlage ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub Subscription to BigQuery template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: der Name Ihres Pub/Sub-Abos
  • DATASET: Ihr BigQuery-Dataset
  • TABLE_NAME: Ihr BigQuery-Tabellenname

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: der Name Ihres Pub/Sub-Abos
  • DATASET: Ihr BigQuery-Dataset
  • TABLE_NAME: Ihr BigQuery-Tabellenname

Pub/Sub-Thema für BigQuery

Die Vorlage "Pub/Sub-Thema für BigQuery" ist eine Streamingpipeline, die Nachrichten im JSON-Format aus einem Pub/Sub-Thema liest und in eine BigQuery-Tabelle schreibt. Sie können die Vorlage als schnelle Lösung verwenden, um Pub/Sub-Daten nach BigQuery zu verschieben. Die Vorlage liest Nachrichten im JSON-Format aus Pub/Sub und konvertiert sie in BigQuery-Elemente.

Voraussetzungen für diese Pipeline:

  • Das data-Feld mit Pub/Sub-Nachrichten muss das JSON-Format verwenden, das in diesem JSON-Leitfaden beschrieben wird. Beispielsweise können Nachrichten mit Werten im data-Feld, die als {"k1":"v1", "k2":"v2"} formatiert sind, in eine BigQuery-Tabelle mit zwei Spalten namens k1 und k2 mit einem Stringdatentyp eingefügt werden.
  • Die Ausgabetabelle muss vorhanden sein, bevor Sie die Pipeline ausführen. Das Tabellenschema muss mit den JSON-Eingabeobjekten übereinstimmen.

Vorlagenparameter

Parameter Beschreibung
inputTopic Das Pub/Sub-Eingabethema, aus dem gelesen werden soll, im Format projects/<project>/topics/<topic>.
outputTableSpec Der Speicherort der BigQuery-Ausgabetabelle im Format <my-project>:<my-dataset>.<my-table>.
outputDeadletterTable Die BigQuery-Tabelle für Nachrichten, die die Ausgabetabelle nicht erreicht haben. Sie sollte das Format <my-project>:<my-dataset>.<my-table> haben. Wenn sie nicht vorhanden ist, wird sie während der Pipelineausführung erstellt. Wenn nicht angegeben, wird stattdessen <outputTableSpec>_error_records verwendet.
javascriptTextTransformGcsPath (Optional) Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Optional) Der Name der benutzerdefinierten JavaScript-Funktion, die Sie verwenden möchten. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

Vorlage "Pub/Sub-Thema für BigQuery" ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub Topic to BigQuery template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • TOPIC_NAME: der Name Ihres Pub/Sub-Themas
  • DATASET: Ihr BigQuery-Dataset
  • TABLE_NAME: Ihr BigQuery-Tabellenname

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • TOPIC_NAME: der Name Ihres Pub/Sub-Themas
  • DATASET: Ihr BigQuery-Dataset
  • TABLE_NAME: Ihr BigQuery-Tabellenname

Pub/Sub Avro für BigQuery

Die Vorlage „Pub/Sub Avro für BigQuery“ ist eine Streamingpipeline, die Avro-Daten aus einem Pub/Sub-Abo in eine BigQuery-Tabelle schreibt. Alle Fehler beim Schreiben in die BigQuery-Tabelle werden in ein Pub/Sub-Thema für nicht verarbeitete Datensätze gestreamt.

Voraussetzungen für diese Pipeline

  • Das Pub/Sub-Eingabeabo muss vorhanden sein.
  • Die Schemadatei für die Avro-Einträge muss in Cloud Storage hinterlegt sein.
  • Das Pub/Sub-Thema für nicht verarbeitete Datensätze muss vorhanden sein.
  • Das BigQuery-Ausgabe-Dataset muss vorhanden sein.

Vorlagenparameter

Parameter Beschreibung
schemaPath Der Cloud Storage-Speicherort der Avro-Schemadatei. Beispiel: gs://path/to/my/schema.avsc
inputSubscription Das Pub/Sub-Eingabeabo, aus dem gelesen werden soll. z. B. projects/<project>/subscriptions/<subscription>.
outputTopic Das Pub/Sub-Thema, das für nicht verarbeitete Datensätze verwendet werden soll. z. B. projects/<project-id>/topics/<topic-name>.
outputTableSpec Ort der BigQuery-Ausgabetabelle. Beispiel: <my-project>:<my-dataset>.<my-table> Abhängig von der angegebenen createDisposition kann die Ausgabetabelle automatisch mit dem vom Nutzer angegebenen Avro-Schema erstellt werden.
writeDisposition (Optional) Die BigQuery-WriteDisposition. Beispiel: WRITE_APPEND, WRITE_EMPTY oder WRITE_TRUNCATE. Standardeinstellung: WRITE_APPEND
createDisposition (Optional) Die BigQuery-CreateDisposition. Beispiele: CREATE_IF_NEEDED, CREATE_NEVER Standardeinstellung: CREATE_IF_NEEDED

Vorlage „Pub/Sub Avro für BigQuery“ ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub Avro to BigQuery template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • SCHEMA_PATH: der Cloud Storage-Pfad zur Avro-Schemadatei (z. B. gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: der Name des Pub/Sub-Eingabeabos
  • BIGQUERY_TABLE: der Name der BigQuery-Ausgabetabelle
  • DEADLETTER_TOPIC: Das Pub/Sub-Thema, das für die Warteschlange für nicht verarbeitete Datensätze verwendet werden soll

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • SCHEMA_PATH: der Cloud Storage-Pfad zur Avro-Schemadatei (z. B. gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: der Name des Pub/Sub-Eingabeabos
  • BIGQUERY_TABLE: der Name der BigQuery-Ausgabetabelle
  • DEADLETTER_TOPIC: Das Pub/Sub-Thema, das für die Warteschlange für nicht verarbeitete Datensätze verwendet werden soll

Pub/Sub Proto für BigQuery

Die Vorlage „Pub/Sub Proto für BigQuery“ ist eine Streamingpipeline, die Proto-Daten aus einem Pub/Sub-Abo in eine BigQuery-Tabelle schreibt. Alle Fehler beim Schreiben in die BigQuery-Tabelle werden in ein Pub/Sub-Thema für nicht verarbeitete Datensätze gestreamt.

Sie können eine benutzerdefinierte JavaScript-Funktion (UDF) zum Transformieren von Daten bereitstellen. Fehler während der Ausführung der UDF können entweder an ein separates Pub/Sub-Thema oder an dasselbe nicht verarbeitete Thema wie die BigQuery-Fehler gesendet werden.

Voraussetzungen für diese Pipeline:

  • Das Pub/Sub-Eingabeabo muss vorhanden sein.
  • Die Schemadatei für die Proto-Einträge muss in Cloud Storage hinterlegt sein.
  • Das Pub/Sub-Ausgabethema muss vorhanden sein.
  • Das BigQuery-Ausgabe-Dataset muss vorhanden sein.
  • Wenn die BigQuery-Tabelle vorhanden ist, muss sie ein Schema haben, das mit den Proto-Daten unabhängig vom createDisposition-Wert übereinstimmt.

Vorlagenparameter

Parameter Beschreibung
protoSchemaPath Der Cloud Storage-Speicherort der eigenständigen Proto-Schemadatei. z. B. gs://path/to/my/file.pb. Diese Datei kann mit dem Flag --descriptor_set_out des Befehls protoc generiert werden. Das Flag --include_imports garantiert, dass die Datei unabhängig ist.
fullMessageName Der vollständige Proto-Nachrichtenname. Beispiel: package.name.MessageName, wobei package.name der Wert für die Anweisung package und nicht für die Anweisung java_package ist.
inputSubscription Das Pub/Sub-Eingabeabo, aus dem gelesen werden soll. z. B. projects/<project>/subscriptions/<subscription>.
outputTopic Das Pub/Sub-Thema, das für nicht verarbeitete Datensätze verwendet werden soll. z. B. projects/<project-id>/topics/<topic-name>.
outputTableSpec Ort der BigQuery-Ausgabetabelle. z. B. my-project:my_dataset.my_table. Abhängig von der angegebenen createDisposition kann die Ausgabetabelle automatisch mit der Eingabeschemadatei erstellt werden.
preserveProtoFieldNames (Optional) true, um den ursprünglichen Proto-Feldnamen in JSON beizubehalten. false, um weitere JSON-Standardnamen zu verwenden. Zum Beispiel würde false field_name in fieldName ändern. (Standard: false)
bigQueryTableSchemaPath (Optional) Cloud Storage-Pfad zum BigQuery-Schemapfad. z. B. gs://path/to/my/schema.json. Falls nicht angegeben ist, wird das Schema aus dem Proto-Schema abgeleitet.
javascriptTextTransformGcsPath (Optional) Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Optional) Der Name der benutzerdefinierten JavaScript-Funktion, die Sie verwenden möchten. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.
udfOutputTopic (Optional) Das Pub/Sub-Thema, in dem die UDF-Fehler gespeichert werden. z. B. projects/<project-id>/topics/<topic-name> Wenn nicht angegeben, werden UDF-Fehler an dasselbe Thema wie outputTopic gesendet.
writeDisposition (Optional) Die BigQuery-WriteDisposition. Beispiel: WRITE_APPEND, WRITE_EMPTY oder WRITE_TRUNCATE. Standard: WRITE_APPEND.
createDisposition (Optional) Die BigQuery-CreateDisposition. Beispiele: CREATE_IF_NEEDED, CREATE_NEVER Standard: CREATE_IF_NEEDED.

Vorlage Pub/Sub Proto für BigQuery ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub Proto to BigQuery template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
fullMessageName=PROTO_MESSAGE_NAME,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=UNPROCESSED_TOPIC
  

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • SCHEMA_PATH: der Cloud Storage-Pfad zur Avro-Schemadatei (z. B. gs://MyBucket/file.pb)
  • PROTO_MESSAGE_NAME: der Proto-Nachrichtenname (z. B. package.name.MessageName)
  • SUBSCRIPTION_NAME: der Name des Pub/Sub-Eingabeabos
  • BIGQUERY_TABLE: der Name der BigQuery-Ausgabetabelle
  • UNPROCESSED_TOPIC: Das Pub/Sub-Thema, das für die Warteschlange für nicht verarbeitete Datensätze verwendet werden soll

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "fullMessageName": "PROTO_MESSAGE_NAME",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "UNPROCESSED_TOPIC"
      }
   }
}
  

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • SCHEMA_PATH: der Cloud Storage-Pfad zur Avro-Schemadatei (z. B. gs://MyBucket/file.pb)
  • PROTO_MESSAGE_NAME: der Proto-Nachrichtenname (z. B. package.name.MessageName)
  • SUBSCRIPTION_NAME: der Name des Pub/Sub-Eingabeabos
  • BIGQUERY_TABLE: der Name der BigQuery-Ausgabetabelle
  • UNPROCESSED_TOPIC: Das Pub/Sub-Thema, das für die Warteschlange für nicht verarbeitete Datensätze verwendet werden soll

Pub/Sub zu Pub/Sub

Die Vorlage "Pub/Sub für Pub/Sub" ist eine Streamingpipeline, die Nachrichten aus einem Pub/Sub-Abo liest und in ein anderes Pub/Sub-Thema schreibt. Die Pipeline akzeptiert auch einen optionalen Nachrichtenattributschlüssel und einen Wert, die zum Filtern der Nachrichten verwendet werden können, die in das Pub/Sub-Thema geschrieben werden sollen. Sie können diese Vorlage verwenden, um Nachrichten mit einem optionalen Nachrichtenfilter von einem Pub/Sub-Abo in ein anderes Pub/Sub-Thema zu kopieren.

Voraussetzungen für diese Pipeline:

  • Das als Quelle dienende Pub/Sub-Abo muss vor der Ausführung vorhanden sein.
  • Das Quell-Pub/Sub-Abo muss die Push-Zustellung verwenden.
  • Das Pub/Sub-Thema, in das geschrieben werden soll, muss vor der Ausführung vorhanden sein.

Vorlagenparameter

Parameter Beschreibung
inputSubscription Pub/Sub-Abo, aus dem die Eingabe gelesen wird. z. B. projects/<project-id>/subscriptions/<subscription-name>.
outputTopic Cloud Pub/Sub-Thema, in das die Ausgabe geschrieben wird. z. B. projects/<project-id>/topics/<topic-name>.
filterKey (Optional) Filterereignisse nach Attributschlüssel. Wenn filterKey nicht festgelegt ist, werden keine Filter angewendet.
filterValue (Optional) Filterattributwert für den Fall, dass "filterKey" bereitgestellt wird. Standardmäßig ist für filterValue null festgelegt.

Vorlage "Pub/Sub zu Pub/Sub" ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub to Pub/Sub template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: der Name des Pub/Sub-Abos
  • TOPIC_NAME: der Name des Pub/Sub-Themas
  • FILTER_KEY: der Attributschlüssel, nach dem Ereignisse gefiltert werden Wenn kein Schlüssel angegeben ist, werden keine Filter angewendet.
  • FILTER_VALUE: Filterattributwert, der verwendet wird, wenn ein Ereignisfilterschlüssel angegeben ist. Akzeptiert einen gültigen Java-Regex-String als Ereignisfilterwert. Wenn ein Regex angegeben wird, muss der komplette Ausdruck übereinstimmen, damit die Nachricht gefiltert wird. Teilübereinstimmungen (z. B. Teilstrings) werden nicht gefiltert. Standardmäßig wird ein Null-Ereignisfilterwert verwendet.

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: der Name des Pub/Sub-Abos
  • TOPIC_NAME: der Name des Pub/Sub-Themas
  • FILTER_KEY: der Attributschlüssel, nach dem Ereignisse gefiltert werden Wenn kein Schlüssel angegeben ist, werden keine Filter angewendet.
  • FILTER_VALUE: Filterattributwert, der verwendet wird, wenn ein Ereignisfilterschlüssel angegeben ist. Akzeptiert einen gültigen Java-Regex-String als Ereignisfilterwert. Wenn ein Regex angegeben wird, muss der komplette Ausdruck übereinstimmen, damit die Nachricht gefiltert wird. Teilübereinstimmungen (z. B. Teilstrings) werden nicht gefiltert. Standardmäßig wird ein Null-Ereignisfilterwert verwendet.

Pub/Sub zu Splunk

Die Vorlage "Pub/Sub für Splunk" ist eine Streamingpipeline, die Nachrichten aus einem Pub/Sub-Abo liest und die Nutzlast der Nachricht über den HTTP Event Collector (HEC) von Splunk in Splunk schreibt. Der häufigste Anwendungsfall dieser Vorlage ist das Exportieren von Logs nach Splunk. Ein Beispiel für den zugrunde liegenden Workflow finden Sie unter Produktionsfähige Logexporte in Dataflow für Splunk bereitstellen.

Vor dem Schreiben in Splunk können Sie auch eine benutzerdefinierte JavaScript-Funktion auf die Nachrichtennutzlast anwenden. Alle Nachrichten, bei denen Verarbeitungsfehler auftreten, werden zur weiteren Fehlerbehebung und erneuten Verarbeitung an ein unverarbeitetes Thema in Pub/Sub weitergeleitet.

Als zusätzlichen Schutz für Ihr HEC-Token können Sie auch einen Cloud KMS-Schlüssel zusammen mit dem base64-codierten HEC-Tokenparameter übergeben, der mit dem Cloud KMS-Schlüssel verschlüsselt ist. Weitere Informationen zum Verschlüsseln des HEC-Tokenparameters finden Sie unter Cloud KMS API-Verschlüsselungsendpunkt.

Voraussetzungen für diese Pipeline:

  • Das als Quelle dienende Pub/Sub-Abo muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Das Pub/Sub-Thema für nicht verarbeitete Datensätze muss vorhanden sein, bevor die Pipeline ausgeführt wird.
  • Auf den Splunk-HEC-Endpunkt muss über das Dataflow-Worker-Netzwerk zugegriffen werden können.
  • Das Splunk-HEC-Token muss generiert und verfügbar sein.

Vorlagenparameter

Parameter Beschreibung
inputSubscription Das Pub/Sub-Abo, aus dem die Eingabe gelesen wird. z. B. projects/<project-id>/subscriptions/<subscription-name>.
token (Optional) Das Splunk-HEC-Authentifizierungstoken. Muss angegeben werden, wenn tokenSource auf PLAINTEXT oder KMS festgelegt ist.
url Die Splunk-HEC-URL. Diese muss von der VPC, in der die Pipeline ausgeführt wird, weitergeleitet werden können. Beispiel: https://splunk-hec-host:8088.
outputDeadletterTopic Das Pub/Sub-Thema als Weiterleitungsziel für nicht zustellbare Nachrichten. z. B. projects/<project-id>/topics/<topic-name>.
javascriptTextTransformGcsPath (Optional) Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Optional) Der Name der benutzerdefinierten JavaScript-Funktion, die Sie verwenden möchten. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.
batchCount (Optional) Die Batchgröße zum Senden mehrerer Ereignisse an Splunk. Standardeinstellung: 1 (keine Batchverarbeitung).
parallelism (Optional) Die maximale Anzahl an parallelen Anfragen. Standardeinstellung: 1 (keine Parallelität).
disableCertificateValidation (Optional) SSL-Zertifikatsvalidierung deaktivieren. Standardeinstellung: "false" (Validierung aktiviert). Bei "wahr" werden die Zertifikate nicht validiert (alle Zertifikate sind vertrauenswürdig) und der Parameter "rootCaCertificatePath" wird ignoriert.
includePubsubMessage (Optional) Schließen Sie die vollständige Pub/Sub-Nachricht in die Nutzlast ein. Standardeinstellung: "false" (nur das Datenelement ist in der Nutzlast enthalten).
tokenSource Quelle des Tokens. Entweder PLAINTEXT, KMS oder SECRET_MANAGER. Dieser Parameter muss angegeben werden, wenn Secret Manager verwendet wird. Wenn tokenSource auf KMS festgelegt ist, muss tokenKMSEncryptionKey und verschlüsselter token bereitgestellt werden. Wenn tokenSource auf SECRET_MANAGER festgelegt ist, muss tokenSecretId bereitgestellt werden. Wenn tokenSource auf PLAINTEXT festgelegt ist, muss token bereitgestellt werden.
tokenKMSEncryptionKey (Optional) Der Cloud KMS-Schlüssel zum Entschlüsseln des HEC-Tokenstrings. Dieser Parameter muss angegeben werden, wenn tokenSource auf KMS gesetzt ist. Wenn der Cloud KMS-Schlüssel bereitgestellt wird, muss der HEC-Tokenstring verschlüsselt übergeben werden.
tokenSecretId (Optional) Die Secret Manager-Secret-ID für das Token. Dieser Parameter muss angegeben werden, wenn tokenSource auf SECRET_MANAGER festgelegt ist. Er sollte folgendes Format haben: projects/<project-id>/secrets/<secret-name>/versions/<secret-version>.
rootCaCertificatePath (Optional) Die vollständige URL zum Stamm-CA-Zertifikat in Cloud Storage. z. B. gs://mybucket/mycerts/privateCA.crt. Das in Cloud Storage bereitgestellte Zertifikat muss DER-codiert sein und kann in binärer oder druckbarer Base64-Codierung bereitgestellt werden. Wenn das Zertifikat in Base64-Codierung bereitgestellt wird, muss es am Anfang durch -----BEGIN CERTIFICATE----- und am Ende durch -----END CERTIFICATE----- begrenzt werden. Wenn dieser Parameter angegeben wird, wird diese private CA-Zertifikatsdatei abgerufen und zum Vertrauensspeicher des Dataflow Workers hinzugefügt, um das SSL-Zertifikat des Splunk HEC-Endpunkts zu überprüfen. Wenn dieser Parameter nicht angegeben ist, wird der Standard-Vertrauensspeicher verwendet.
enableBatchLogs (Optional) Gibt an, ob Logs für Batches aktiviert werden sollen, die in Splunk geschrieben werden. Standardeinstellung: true.
enableGzipHttpCompression (Optional) Gibt an, ob HTTP-Anfragen, die an Splunk HEC gesendet werden, komprimiert werden sollen (gzip-Inhaltscodierung). Standardeinstellung: true.

Vorlage "Pub/Sub für Splunk" ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub to Splunk template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION,\
rootCaCertificatePath=ROOT_CA_CERTIFICATE_PATH

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME: der Name des Pub/Sub-Abos
  • TOKEN: das HTTP Event Collector-Token von Splunk
  • URL: der URL-Pfad für den HTTP Event Collector von Splunk (z. B. https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME: der Name des Pub/Sub-Themas
  • JAVASCRIPT_FUNCTION: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.

    Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

  • PATH_TO_JAVASCRIPT_UDF_FILE Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT: die Batchgröße zum Senden mehrerer Ereignisse an Splunk
  • PARALLELISM: die Anzahl der parallelen Anfragen, die zum Senden von Ereignissen an Splunk verwendet werden sollen
  • DISABLE_VALIDATION: true, wenn Sie die SSL-Zertifikatsvalidierung deaktivieren möchten
  • ROOT_CA_CERTIFICATE_PATH: Der Pfad zum Stamm-CA-Zertifikat in Cloud Storage (z. B. gs://your-bucket/privateCA.crt)

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION",
       "rootCaCertificatePath": "ROOT_CA_CERTIFICATE_PATH"
   }
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME: der Name des Pub/Sub-Abos
  • TOKEN: das HTTP Event Collector-Token von Splunk
  • URL: der URL-Pfad für den HTTP Event Collector von Splunk (z. B. https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME: der Name des Pub/Sub-Themas
  • JAVASCRIPT_FUNCTION: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.

    Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

  • PATH_TO_JAVASCRIPT_UDF_FILE Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT: die Batchgröße zum Senden mehrerer Ereignisse an Splunk
  • PARALLELISM: die Anzahl der parallelen Anfragen, die zum Senden von Ereignissen an Splunk verwendet werden sollen
  • DISABLE_VALIDATION: true, wenn Sie die SSL-Zertifikatsvalidierung deaktivieren möchten
  • ROOT_CA_CERTIFICATE_PATH: Der Pfad zum Stamm-CA-Zertifikat in Cloud Storage (z. B. gs://your-bucket/privateCA.crt)

Pub/Sub für Avro-Dateien in Cloud Storage

Die Vorlage „Pub/Sub für Avro-Dateien in Cloud Storage“ ist eine Streamingpipeline, die Daten aus einem Pub/Sub-Thema liest und Avro-Dateien in einen angegebenen Cloud Storage-Bucket schreibt.

Voraussetzungen für diese Pipeline:

  • Das Pub/Sub-Eingabethema muss vor der Ausführung der Pipeline vorhanden sein.

Vorlagenparameter

Parameter Beschreibung
inputTopic Cloud Pub/Sub-Thema, das zur Nachrichtenaufnahme abonniert werden soll. Der Themenname muss das Format projects/<project-id>/topics/<topic-name> haben.
outputDirectory Ausgabeverzeichnis, in dem die Avro-Dateien archiviert sind. Muss am Ende / enthalten. Beispiel: gs://example-bucket/example-directory/
avroTempDirectory Verzeichnis für temporäre Avro-Dateien. Muss am Ende / enthalten. Beispiel: gs://example-bucket/example-directory/.
outputFilenamePrefix (Optional) Präfix für den Ausgabedateinamen der Avro-Dateien.
outputFilenameSuffix (Optional) Suffix für den Ausgabedateinamen der Avro-Dateien.
outputShardTemplate [Optional) Shard-Vorlage der Ausgabedatei. Sie wird als sich wiederholende Folge der Buchstaben S oder N angegeben. Beispiel: SSS-NNN. Diese werden entweder durch die Shard-Nummer oder durch die Gesamtzahl der Shards ersetzt. Wenn dieser Parameter nicht angegeben ist, ist das Standardvorlagenformat W-P-SS-of-NN.

Vorlage "Pub/Sub für Cloud Storage Avro" ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub to Avro Files on Cloud Storage template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
avroTempDirectory=gs://BUCKET_NAME/temp/

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • TOPIC_NAME: der Name des Pub/Sub-Themas
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
  • FILENAME_PREFIX: das gewünschte Präfix des Ausgabedateinamens
  • FILENAME_SUFFIX: das gewünschte Suffix des Ausgabedateinamens
  • SHARD_TEMPLATE: die gewünschte Shard-Ausgabevorlage

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE"
   }
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • TOPIC_NAME: der Name des Pub/Sub-Themas
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
  • FILENAME_PREFIX: das gewünschte Präfix des Ausgabedateinamens
  • FILENAME_SUFFIX: das gewünschte Suffix des Ausgabedateinamens
  • SHARD_TEMPLATE: die gewünschte Shard-Ausgabevorlage

Pub/Sub-Thema für Textdateien in Cloud Storage

Die Vorlage "Pub/Sub-Thema für Cloud Storage Text" ist eine Streamingpipeline, die Datensätze aus Pub/Sub liest und als eine Reihe von Cloud Storage-Dateien im Textformat speichert. Die Vorlage kann als schnelle Möglichkeit zum Speichern von Daten in Pub/Sub zur späteren Verwendung genutzt werden. Standardmäßig erstellt die Vorlage alle fünf Minuten eine neue Datei.

Voraussetzungen für diese Pipeline:

  • Das Pub/Sub-Thema muss vor der Ausführung vorhanden sein.
  • Die im Thema veröffentlichten Nachrichten müssen im Textformat vorliegen.
  • Die im Thema veröffentlichten Nachrichten dürfen keine Zeilenumbrüche enthalten. Beachten Sie, dass jede Pub/Sub-Nachricht in der Ausgabedatei als einzelne Zeile gespeichert wird.

Vorlagenparameter

Parameter Beschreibung
inputTopic Das Pub/Sub-Thema, aus dem die Eingabe gelesen werden soll. Der Themenname muss das Format projects/<project-id>/topics/<topic-name> haben.
outputDirectory Das Pfad- und Dateinamenpräfix zum Schreiben von Ausgabedateien. Beispiel: gs://bucket-name/path/. Dieser Wert muss mit einem Schrägstrich enden.
outputFilenamePrefix Das Präfix für die Namen der einzelnen Dateien im Fenstermodus. z. B. output-.
outputFilenameSuffix Das Suffix für die Namen der einzelnen Dateien im Fenstermodus, normalerweise eine Dateiendung wie .txt oder .csv.
outputShardTemplate Die Shard-Vorlage definiert den dynamischen Teil aller Namen der Dateien im Fenstermodus. Standardmäßig verwendet die Pipeline einen einzelnen Shard für die Ausgabe in das Dateisystem in jedem Fenster. Das bedeutet, dass alle Daten in einer einzigen Datei pro Fenster ausgegeben werden. Für outputShardTemplate wird standardmäßig W-P-SS-of-NN verwendet. Dabei ist W der Datumsbereich des Fensters, P die Bereichsinformation, S die Shard-Nummer und N die Anzahl der Shards. Bei einer einzelnen Datei ist der Abschnitt SS-of-NN der outputShardTemplate immer 00-of-01.

Vorlage "Pub/Sub-Thema für Textdateien in Cloud Storage" ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub to Text Files on Cloud Storage template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • TOPIC_NAME: der Name Ihres Pub/Sub-Themas
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • TOPIC_NAME: der Name Ihres Pub/Sub-Themas
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets

Pub/Sub-Thema oder -Abo für Textdateien in Cloud Storage

Die Vorlage "Pub/Sub-Thema oder -Abo für Cloud Storage Text" ist eine Streamingpipeline, die Datensätze aus Pub/Sub liest und als eine Reihe von Cloud Storage-Dateien im Textformat speichert. Die Vorlage kann als schnelle Möglichkeit zum Speichern von Daten in Pub/Sub zur späteren Verwendung genutzt werden. Standardmäßig erstellt die Vorlage alle fünf Minuten eine neue Datei.

Voraussetzungen für diese Pipeline:

  • Das Pub/Sub-Thema oder -Abo muss vor der Ausführung vorhanden sein.
  • Die im Thema veröffentlichten Nachrichten müssen im Textformat vorliegen.
  • Die im Thema veröffentlichten Nachrichten dürfen keine Zeilenumbrüche enthalten. Beachten Sie, dass jede Pub/Sub-Nachricht in der Ausgabedatei als einzelne Zeile gespeichert wird.

Vorlagenparameter

Parameter Beschreibung
inputTopic Das Pub/Sub-Thema, aus dem die Eingabe gelesen werden soll. Der Themenname muss das Format projects/<project-id>/topics/<topic-name> haben. Wenn dieser Parameter angegeben wird, sollte inputSubscription nicht angegeben werden.
inputSubscription Das Pub/Sub-Abo, aus dem die Eingabe gelesen werden soll. Der Aboname muss das Format projects/<project-id>/subscription/<subscription-name> haben. Wenn dieser Parameter angegeben wird, sollte inputTopic nicht angegeben werden.
outputDirectory Das Pfad- und Dateinamenpräfix zum Schreiben von Ausgabedateien. Beispiel: gs://bucket-name/path/. Dieser Wert muss mit einem Schrägstrich enden.
outputFilenamePrefix Das Präfix für die Namen der einzelnen Dateien im Fenstermodus. z. B. output-.
outputFilenameSuffix Das Suffix für die Namen der einzelnen Dateien im Fenstermodus, normalerweise eine Dateiendung wie .txt oder .csv.
outputShardTemplate Die Shard-Vorlage definiert den dynamischen Teil aller Namen der Dateien im Fenstermodus. Standardmäßig verwendet die Pipeline einen einzelnen Shard für die Ausgabe in das Dateisystem in jedem Fenster. Das bedeutet, dass alle Daten in einer einzigen Datei pro Fenster ausgegeben werden. Für outputShardTemplate wird standardmäßig W-P-SS-of-NN verwendet. Dabei ist W der Datumsbereich des Fensters, P die Bereichsinformation, S die Shard-Nummer und N die Anzahl der Shards. Bei einer einzelnen Datei ist der Abschnitt SS-of-NN der outputShardTemplate immer 00-of-01.
windowDuration (Optional) Die Fensterdauer ist das Intervall, in dem Daten in das Ausgabeverzeichnis geschrieben werden. Konfigurieren Sie die Dauer anhand des Durchsatzes der Pipeline. Beispielsweise kann ein höherer Durchsatz kleinere Fenstergrößen erfordern, damit die Daten in den Speicher passen. Die Standardeinstellung ist „5m”, mit mindestens 1 s. Zulässige Formate sind: [int]s (für Sekunden, Beispiel: 5s), [int]m (für Minuten, Beispiel: 12m), [int]h (für Stunden, Beispiel: 2h).

Vorlage "Pub/Sub-Thema oder -Abo für Textdateien in Cloud Storage" ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud beta dataflow flex-template jobs run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region REGION_NAME \
    --template-file-gcs-location gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • SUBSCRIPTION_NAME: der Name Ihres Pub/Sub-Abos
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
    },
    "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex",
  }
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • SUBSCRIPTION_NAME: der Name Ihres Pub/Sub-Abos
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets

Pub/Sub für MongoDB

Die Vorlage „Pub/Sub für MongoDB“ ist eine Streamingpipeline, die JSON-codierte Nachrichten aus einem Pub/Sub-Abo liest und in MongoDB als Dokumente schreibt. Bei Bedarf unterstützt diese Pipeline zusätzliche Transformationen, die über eine benutzerdefinierte JavaScript-Funktion (UDF) eingebunden werden können. Alle Fehler sind aufgrund von nicht übereinstimmenden Schemata, nicht korrekt formatiertem JSON oder während der Ausführung von Transformationen in einer BigQuery-Tabelle für nicht verarbeitete Nachrichten zusammen mit der Eingabenachricht aufgetreten. Falls noch keine Tabelle für nicht verarbeitete Datensätze vorhanden ist, wird diese Tabelle von der Pipeline automatisch erstellt.

Voraussetzungen für diese Pipeline:

  • Das Pub/Sub-Abo muss vorhanden sein und die Nachrichten müssen in einem gültigen JSON-Format codiert sein.
  • Der MongoDB-Cluster muss vorhanden und über die Dataflow-Worker-Maschinen zugänglich sein.

Vorlagenparameter

Parameter Beschreibung
inputSubscription Name des Pub/Sub-Abos. Beispiel: projects/my-project-id/subscriptions/my-subscription-id
mongoDBUri Durch Kommas getrennte Liste von MongoDB-Servern. Beispiel: 192.285.234.12:27017,192.287.123.11:27017
database Datenbank in MongoDB zum Speichern der Sammlung. Beispiel: my-db.
collection Name der Sammlung in der MongoDB-Datenbank. Beispiel: my-collection.
deadletterTable BigQuery-Tabelle, die aus Fehlern resultierende Nachrichten speichert (nicht übereinstimmendes Schema, fehlerhaft formatierte JSON-Dateien usw.). Beispiel: project-id:dataset-name.table-name.
javascriptTextTransformGcsPath (Optional) Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Optional) Der Name der benutzerdefinierten JavaScript-Funktion, die Sie verwenden möchten. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.
batchSize (Optional) Batchgröße für die Aufnahme von Dokumentenbatches in MongoDB. Standardeinstellung: 1000.
batchSizeBytes (Optional) Batchgröße in Byte. Standardeinstellung: 5242880.
maxConnectionIdleTime (Optional) Maximale zulässige Leerlaufzeit in Sekunden, bis eine Zeitüberschreitung der Verbindung auftritt. Standardeinstellung: 60000.
sslEnabled (Optional) Boolescher Wert, der angibt, ob für die Verbindung zu MongoDB SSL aktiviert ist. Standardeinstellung: true.
ignoreSSLCertificate (Optional) Boolescher Wert, der angibt, ob das SSL-Zertifikat ignoriert werden soll. Standardeinstellung: true.
withOrdered (Optional) Boolescher Wert, mit dem geordnete Bulk-Aufnahmen in MongoDB aktiviert werden. Standardeinstellung: true.
withSSLInvalidHostNameAllowed (Optional) Boolescher Wert, der angibt, ob ein ungültiger Hostname für die SSL-Verbindung zulässig ist. Standardeinstellung: true.

Vorlage „Pub/Sub für MongoDB“ ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub to MongoDB template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • INPUT_SUBSCRIPTION: das Pub/Sub-Abo (z. B. projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: die MongoDB-Serveradressen (z. B. 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: der Name der MongoDB-Datenbank (z. B. users)
  • COLLECTION: der Name der MongoDB-Sammlung (z. B. profiles)
  • UNPROCESSED_TABLE: der Name der BigQuery-Tabelle (z. B. your-project:your-dataset.your-table-name)

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • INPUT_SUBSCRIPTION: das Pub/Sub-Abo (z. B. projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: die MongoDB-Serveradressen (z. B. 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: der Name der MongoDB-Datenbank (z. B. users)
  • COLLECTION: der Name der MongoDB-Sammlung (z. B. profiles)
  • UNPROCESSED_TABLE: der Name der BigQuery-Tabelle (z. B. your-project:your-dataset.your-table-name)

Pub/Sub für Elasticsearch

Die Vorlage „Pub/Sub für Elasticsearch“ ist eine Streamingpipeline, die Nachrichten aus einem Pub/Sub-Abo liest, eine benutzerdefinierte Funktion (User-defined Function, UDF) ausführt und sie als Dokumente in Elasticsearch schreibt. Die Dataflow-Vorlage verwendet die Datenstreams-Funktion von Elasticsearch, um Zeitachsendaten über mehrere Indexe zu speichern, wobei Sie eine einzige benannte Ressource für Anfragen erhalten. Datenstreams eignen sich gut für Logs, Messwerte, Traces und andere kontinuierlich generierte Daten, die in Pub/Sub gespeichert sind.

Voraussetzungen für diese Pipeline

  • Das Quell-Pub/Sub-Abo muss vorhanden sein und die Nachrichten müssen in einem gültigen JSON-Format codiert sein.
  • Ein öffentlich erreichbarer Elasticsearch-Host auf einer GCP-Instanz oder in Elastic Cloud mit Elasticsearch Version 7.0 oder höher. Weitere Informationen finden Sie unter Google Cloud-Integration für Elastic.
  • Ein Pub/Sub-Thema für die Fehlerausgabe

Vorlagenparameter

Parameter Beschreibung
inputSubscription Das Cloud Pub/Sub-Abo, das verwendet werden soll. Der Name muss das Format projects/<project-id>/subscriptions/<subscription-name> haben.
connectionUrl Elasticsearch-URL im Format https://hostname:[port] oder geben Sie die CloudID an, wenn Elastic Cloud verwendet wird.
apiKey Base64-codierter API-Schlüssel für die Authentifizierung.
errorOutputTopic Pub/Sub-Ausgabe-Thema für die Veröffentlichung fehlgeschlagener Datensätze im Format projects/<project-id>/topics/<topic-name>
dataset (Optional) Der Typ von über Pub/Sub gesendete Logs, für die wir ein sofort einsatzfähiges Dashboard haben. Bekannte Werte für Logtypen sind "audit", "vpcflow" und "firewall". Standardeinstellung: pubsub.
namespace (Optional) Eine beliebige Gruppierung, z. B. eine Umgebung (dev, prod oder qa), ein Team oder eine strategische Geschäftseinheit. Standardeinstellung: default.
batchSize (Optional) Batchgröße in der Anzahl an Dokumenten. Standardeinstellung: 1000.
batchSizeBytes (Optional) Batchgröße in der Anzahl an Byte. Standardeinstellung: 5242880 (5 MB).
maxRetryAttempts (Optional) Maximale Wiederholungsversuche, muss > 0 sein. Standardeinstellung: no retries.
maxRetryDuration (Optional) Maximale Wiederholungsdauer in Millisekunden, muss > 0 sein. Standardeinstellung: no retries.
javascriptTextTransformGcsPath (Optional) Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Optional) Der Name der benutzerdefinierten JavaScript-Funktion, die Sie verwenden möchten. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.
propertyAsIndex (Optional) Eine Eigenschaft im indexierten Dokument, deren Wert angibt, dass _index-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen (hat Vorrang vor einer_index-UDF). Standardwert: none.
propertyAsId (Optional) Eine Eigenschaft im indexierten Dokument, deren Wert angibt, dass _id-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen (hat Vorrang vor einer_id-UDF). Standardwert: none.
javaScriptIndexFnGcsPath (Optional) Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die angibt, dass _index-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none.
javaScriptIndexFnName (Optional) UDF-JavaScript-Funktionsname für eine Funktion, die angibt, dass _index-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none.
javaScriptIdFnGcsPath (Optional) Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die angibt, dass _id-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none.
javaScriptIdFnName (Optional) UDF-JavaScript-Funktionsname für eine Funktion, die angibt, dass _id-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none.
javaScriptTypeFnGcsPath (Optional) Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die angibt, dass _type-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none.
javaScriptTypeFnName (Optional) UDF-JavaScript-Funktionsname für eine Funktion, die angibt, dass _type-Metadaten im Dokument in der Bulk-Anfrage enthalten sein sollen. Standardwert: none.
javaScriptIsDeleteFnGcsPath (Optional) Der Cloud Storage-Pfad zur JavaScript-UDF-Quelle für eine Funktion, die bestimmt, ob das Dokument gelöscht statt eingefügt oder aktualisiert werden soll. Die Funktion sollte den Stringwert "true" oder "false" zurückgeben. Standardwert: none.
javaScriptIsDeleteFnName (Optional) UDF-JavaScript-Funktionsname für eine Funktion, die bestimmt, ob das Dokument gelöscht statt eingefügt oder aktualisiert werden soll. Die Funktion sollte den Stringwert "true" oder "false" zurückgeben. Standardwert: none.
usePartialUpdate (Optional) Gibt an, ob Teilaktualisierungen (Aktualisieren statt Erstellen oder Indexieren, Teildokumente sind zulässig) in Elasticsearch-Anfragen verwendet werden sollen. Standardeinstellung: false.
bulkInsertMethod (Optional) Gibt an, ob INDEX (Indexieren, Upserts sind zulässig) oder CREATE (Erstellen, Fehler bei doppelter _id) in Bulk-Anfragen von Elasticsearch verwendet werden soll. Standardeinstellung: CREATE.

Vorlage „Pub/Sub für Elasticsearch“ ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub to Elasticsearch template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • ERROR_OUTPUT_TOPIC: das Pub/Sub-Thema für die Fehlerausgabe
  • SUBSCRIPTION_NAME: der Name Ihres Pub/Sub-Abos
  • CONNECTION_URL: die Elasticsearch-URL
  • DATASET: Ihr Logtyp
  • NAMESPACE: Ihr Namespace für das Dataset
  • APIKEY: der base64-codierte API-Schlüssel für die Authentifizierung

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch",
   }
}
  

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • ERROR_OUTPUT_TOPIC: das Pub/Sub-Thema für die Fehlerausgabe
  • SUBSCRIPTION_NAME: der Name Ihres Pub/Sub-Abos
  • CONNECTION_URL: die Elasticsearch-URL
  • DATASET: Ihr Logtyp
  • NAMESPACE: Ihr Namespace für das Dataset
  • APIKEY: der base64-codierte API-Schlüssel für die Authentifizierung

Datastream zu Cloud Spanner

Die Vorlage "Datastream zu Cloud Spanner" ist eine Streamingpipeline, die Datastream-Ereignisse aus einem Cloud Storage-Bucket liest und in eine Cloud Spanner-Datenbank schreibt. Sie ist für die Datenmigration von Datastream-Quellen zu Cloud Spanner vorgesehen.

Alle für die Migration erforderlichen Tabellen müssen vor der Ausführung der Vorlage in der Cloud Spanner-Zieldatenbank vorhanden sein. Daher muss die Schemamigration von einer Quelldatenbank zum Cloud Spanner-Ziel abgeschlossen sein, bevor Sie Daten migrieren können. Daten können in den Tabellen vor der Migration vorhanden sein. Diese Vorlage leitet keine Änderungen des Datastream-Schemas an die Cloud Spanner-Datenbank weiter.

Die Datenkonsistenz wird erst am Ende der Migration garantiert, wenn alle Daten in Cloud Spanner geschrieben wurden. Zum Speichern von Reihenfolgeinformationen für jeden in Cloud Spanner geschriebenen Datensatz erstellt diese Vorlage eine zusätzliche Tabelle (sogenannte Schattentabelle) für jede Tabelle in der Cloud Spanner-Datenbank. Dadurch wird die Konsistenz am Ende der Migration sichergestellt. Die Schattentabellen werden nach der Migration nicht gelöscht und können am Ende der Migration zur Validierung verwendet werden.

Alle Fehler, die während des Vorgangs auftreten, z. B. nicht übereinstimmende Schemas, fehlerhafte JSON-Dateien oder Fehler, die sich aus der Ausführung von Transformationen ergeben, werden in einer Fehlerwarteschlange aufgezeichnet. Die Fehlerwarteschlange ist ein Cloud Storage-Ordner, in dem alle Datastream-Ereignisse gespeichert werden, bei denen Fehler aufgetreten sind, zusammen mit der Fehlerursache im Textformat. Die Fehler können vorübergehend oder dauerhaft sein und in den entsprechenden Cloud Storage-Ordnern in der Fehlerwarteschlange gespeichert werden. Bei diesen vorübergehenden Fehler erfolgt automatisch eine Wiederholung, bei dauerhaften Fehlern dagegen nicht. Bei dauerhaften Fehlern haben Sie die Möglichkeit, Korrekturen an den Änderungsereignissen vorzunehmen und diese in den Bucket für Wiederholungen zu verschieben, während die Vorlage ausgeführt wird.

Voraussetzungen für diese Pipeline:

  • Ein Datastream-Stream mit dem Status Wird ausgeführt oder Nicht gestartet.
  • Ein Cloud Storage-Bucket, in dem Datastream-Ereignisse repliziert werden.
  • Eine Cloud Spanner-Datenbank mit vorhandenen Tabellen. Diese Tabellen können leer sein oder Daten enthalten.

Vorlagenparameter

Parameter Beschreibung
inputFilePattern Der Speicherort für Datastream-Dateien in Cloud Storage, die repliziert werden sollen. In der Regel ist dies der Stammpfad für einen Stream.
streamName Der Name oder die Vorlage für den Stream, der für Schemainformationen und den Quelltyp abgefragt werden soll.
instanceId Die Cloud Spanner-Instanz, in der die Änderungen repliziert werden.
databaseId Die Cloud Spanner-Datenbank, in der die Änderungen repliziert werden.
projectId Die Cloud Spanner-Projekt-ID.
deadLetterQueueDirectory (Optional) Dies ist der Dateipfad zum Speichern der Fehlerwarteschlangenausgabe. Der Standardwert ist ein Verzeichnis unter dem temporären Speicherort des Dataflow-Jobs.
inputFileFormat Optional: Das Format der von Datastream generierten Ausgabedatei. Beispiel: avro,json Standardeinstellung: avro.
shadowTablePrefix (Optional) Das Präfix zum Benennen von Schattentabellen. Standardeinstellung: shadow_.

Vorlage "Datastream zu Cloud Spanner" ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Cloud Datastream to Spanner template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • GCS_FILE_PATH ist der Cloud Storage-Pfad, der zum Speichern von Datastream-Ereignissen verwendet wird. Beispiel: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE ist Ihre Cloud Spanner-Instanz.
  • CLOUDSPANNER_DATABASE ist Ihre Cloud Spanner-Datenbank
  • DLQ ist der Cloud Storage-Pfad für das Fehlerwarteschlangenverzeichnis.

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • GCS_FILE_PATH ist der Cloud Storage-Pfad, der zum Speichern von Datastream-Ereignissen verwendet wird. Beispiel: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE ist Ihre Cloud Spanner-Instanz.
  • CLOUDSPANNER_DATABASE ist Ihre Cloud Spanner-Datenbank
  • DLQ ist der Cloud Storage-Pfad für das Fehlerwarteschlangenverzeichnis.

Textdateien in Cloud Storage für BigQuery (Stream)

Die Pipeline „Textdateien in Cloud Storage für BigQuery“ ist eine Streamingpipeline, mit der Sie in Cloud Storage gespeicherte Textdateien streamen, diese mit einer von Ihnen bereitgestellten benutzerdefinierten JavaScript-Funktion (User Defined Function, UDF) transformieren und das Ergebnis in BigQuery anhängen können.

Die Pipeline wird auf unbestimmte Zeit ausgeführt und muss manuell über eine Cancel-Anweisung und kein Drain beendet werden, aufgrund ihrer Verwendung der Watch Transformation, die eine splittable DoFn ist, die den Draining nicht unterstützt.

Voraussetzungen für diese Pipeline:

  • Erstellen Sie eine JSON-Datei, die das Schema Ihrer Ausgabetabelle in BigQuery beschreibt.

    Stellen Sie ein JSON-Array der obersten Ebene mit dem Namen fields bereit, dessen Inhalt dem Muster {"name": "COLUMN_NAME", "type": "DATA_TYPE"} folgt. Beispiel:

    {
      "fields": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING",
          "mode": "REQUIRED"
        },
        {
          "name": "coffee",
          "type": "STRING",
          "mode": "REQUIRED"
        }
      ]
    }
    
  • Erstellen Sie eine JavaScript-Datei (.