Von Google bereitgestellte Streamingvorlagen

Google bietet eine Reihe von Open-Source-Vorlagen für Dataflow. Allgemeine Informationen zu Vorlagen finden Sie auf der Seite Dataflow-Vorlagen. Eine Liste aller von Google bereitgestellten Vorlagen finden Sie auf der Seite Erste Schritte mit von Google bereitgestellten Vorlagen.

Auf dieser Seite werden Streamingvorlagen beschrieben:

Pub/Sub-Abo für BigQuery

Die Vorlage "Pub/Sub-Abo für BigQuery" ist eine Streamingpipeline, die Nachrichten im JSON-Format aus einem Pub/Sub-Abo liest und in eine BigQuery-Tabelle schreibt. Sie können die Vorlage als schnelle Lösung verwenden, um Pub/Sub-Daten nach BigQuery zu verschieben. Die Vorlage liest Nachrichten im JSON-Format aus Pub/Sub und konvertiert sie in BigQuery-Elemente.

Voraussetzungen für diese Pipeline:

  • Die Pub/Sub-Nachrichten müssen im JSON-Format vorliegen, wie hier beschrieben. Beispielsweise können Nachrichten, die als {"k1":"v1", "k2":"v2"} formatiert sind, in eine BigQuery-Tabelle mit zwei Spalten namens k1 und k2 mit dem Datentyp "String" eingefügt werden.
  • Die Ausgabetabelle muss vorhanden sein, bevor Sie die Pipeline ausführen.

Vorlagenparameter

Parameter Beschreibung
inputSubscription Das Pub/Sub-Eingabeabo, aus dem gelesen werden soll, im Format projects/<project>/subscriptions/<subscription>.
outputTableSpec Der Speicherort der BigQuery-Ausgabetabelle im Format <my-project>:<my-dataset>.<my-table>.
outputDeadletterTable Die BigQuery-Tabelle im Format <my-project>:<my-dataset>.<my-table> für Nachrichten, die die Ausgabetabelle nicht erreicht haben. Wenn sie nicht vorhanden ist, wird sie während der Pipelineausführung erstellt. Wenn nicht angegeben, wird stattdessen <outputTableSpec>_error_records verwendet.

Pub/Sub-Abo für BigQuery-Vorlage ausführen

KONSOLE

Über die Google Cloud Console ausführen
  1. Rufen Sie in der Cloud Console die Seite "Dataflow" auf.
  2. Zur Seite "Dataflow"
  3. Klicken Sie auf Job aus Vorlage erstellen.
  4. Grafik: Schaltfläche &quot;Job aus Vorlage erstellen&quot; in der Cloud Platform Console
  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage the Pub/Sub Subscription to BigQuery template aus.
  6. Geben Sie einen Jobnamen in das Feld Jobname ein.
  7. Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein.
  8. Klicken Sie auf Job ausführen.

GCLOUD

Über das gcloud-Befehlszeilentool ausführen

Hinweis: Wenn Sie Vorlagen mit dem gcloud-Befehlszeilentool ausführen möchten, benötigen Sie Cloud SDK 138.0.0 oder eine höhere Version.

Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

Dabei gilt:

  • PROJECT_ID: Ihre Projekt-ID
  • JOB_NAME: ein Jobname Ihrer Wahl
  • REGION: der regionale Endpunkt (z. B. us-west1)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: der Name Ihres Pub/Sub-Abos
  • DATASET: Ihr BigQuery-Dataset
  • TABLE_NAME: Ihr BigQuery-Tabellenname
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

API

Über die REST API ausführen

Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

Zum Ausführen dieser Vorlage mithilfe einer REST API-Anfrage senden Sie eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.

Dabei gilt:

  • PROJECT_ID: Ihre Projekt-ID
  • JOB_NAME: ein Jobname Ihrer Wahl
  • REGION: der regionale Endpunkt (z. B. us-west1)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: der Name Ihres Pub/Sub-Abos
  • DATASET: Ihr BigQuery-Dataset
  • TABLE_NAME: Ihr BigQuery-Tabellenname
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

Pub/Sub-Thema für BigQuery

Die Vorlage "Pub/Sub-Thema für BigQuery" ist eine Streamingpipeline, die Nachrichten im JSON-Format aus einem Pub/Sub-Thema liest und in eine BigQuery-Tabelle schreibt. Sie können die Vorlage als schnelle Lösung verwenden, um Pub/Sub-Daten nach BigQuery zu verschieben. Die Vorlage liest Nachrichten im JSON-Format aus Pub/Sub und konvertiert sie in BigQuery-Elemente.

Voraussetzungen für diese Pipeline:

  • Die Pub/Sub-Nachrichten müssen im JSON-Format vorliegen, wie hier beschrieben. Beispielsweise können Nachrichten, die als {"k1":"v1", "k2":"v2"} formatiert sind, in eine BigQuery-Tabelle mit zwei Spalten namens k1 und k2 mit dem Datentyp "String" eingefügt werden.
  • Die Ausgabetabelle muss vor der Pipelineausführung vorhanden sein.

Vorlagenparameter

Parameter Beschreibung
inputTopic Das Pub/Sub-Eingabethema, aus dem gelesen werden soll, im Format projects/<project>/topics/<topic>.
outputTableSpec Der Speicherort der BigQuery-Ausgabetabelle im Format <my-project>:<my-dataset>.<my-table>.
outputDeadletterTable Die BigQuery-Tabelle für Nachrichten, die die Ausgabetabelle nicht erreicht haben. Sie sollte das Format <my-project>:<my-dataset>.<my-table> haben. Wenn sie nicht vorhanden ist, wird sie während der Pipelineausführung erstellt. Wenn nicht angegeben, wird stattdessen <outputTableSpec>_error_records verwendet.

Vorlage "Pub/Sub-Thema für BigQuery" ausführen

KONSOLE

Über die Google Cloud Console ausführen
  1. Rufen Sie in der Cloud Console die Seite "Dataflow" auf.
  2. Zur Seite "Dataflow"
  3. Klicken Sie auf Job aus Vorlage erstellen.
  4. Grafik: Schaltfläche &quot;Job aus Vorlage erstellen&quot; in der Cloud Platform Console
  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage the Pub/Sub Topic to BigQuery template aus.
  6. Geben Sie einen Jobnamen in das Feld Jobname ein.
  7. Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein.
  8. Klicken Sie auf Job ausführen.

GCLOUD

Über das gcloud-Befehlszeilentool ausführen

Hinweis: Wenn Sie Vorlagen mit dem gcloud-Befehlszeilentool ausführen möchten, benötigen Sie Cloud SDK 138.0.0 oder eine höhere Version.

Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

Dabei gilt:

  • PROJECT_ID: Ihre Projekt-ID
  • JOB_NAME ist ein Jobname Ihrer Wahl
  • REGION: der regionale Endpunkt (z. B. us-west1)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • TOPIC_NAME: der Name Ihres Pub/Sub-Themas
  • DATASET: Ihr BigQuery-Dataset
  • TABLE_NAME: Ihr BigQuery-Tabellenname
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

API

Über die REST API ausführen

Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

Zum Ausführen dieser Vorlage mithilfe einer REST API-Anfrage senden Sie eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.

Dabei gilt:

  • PROJECT_ID: Ihre Projekt-ID
  • JOB_NAME ist ein Jobname Ihrer Wahl
  • REGION: der regionale Endpunkt (z. B. us-west1)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • TOPIC_NAME: der Name Ihres Pub/Sub-Themas
  • DATASET: Ihr BigQuery-Dataset
  • TABLE_NAME: Ihr BigQuery-Tabellenname
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

Pub/Sub Avro für BigQuery

Die Vorlage „Pub/Sub Avro für BigQuery“ ist eine Streamingpipeline, die Avro-Daten aus einem Pub/Sub-Abo in eine BigQuery-Tabelle schreibt. Alle Fehler beim Schreiben in die BigQuery-Tabelle werden in ein Pub/Sub-Thema für nicht verarbeitete Datensätze gestreamt.

Voraussetzungen für diese Pipeline

  • Das Pub/Sub-Eingabeabo muss vorhanden sein.
  • Die Schemadatei für die Avro-Einträge muss in Cloud Storage hinterlegt sein.
  • Das Pub/Sub-Thema für nicht verarbeitete Datensätze muss vorhanden sein.
  • Das BigQuery-Ausgabe-Dataset muss vorhanden sein.

Vorlagenparameter

Parameter Beschreibung
schemaPath Der Cloud Storage-Speicherort der Avro-Schemadatei. Beispiel: gs://path/to/my/schema.avsc
inputSubscription Das Pub/Sub-Eingabeabo, aus dem gelesen werden soll. z. B. projects/<project>/subscriptions/<subscription>.
outputTopic Das Pub/Sub-Thema, das für nicht verarbeitete Datensätze verwendet werden soll. z. B. projects/<project-id>/topics/<topic-name>.
outputTableSpec Ort der BigQuery-Ausgabetabelle. Beispiel: <my-project>:<my-dataset>.<my-table> Abhängig von der angegebenen createDisposition kann die Ausgabetabelle automatisch mit dem vom Nutzer angegebenen Avro-Schema erstellt werden.
writeDisposition (Optional) Die BigQuery-WriteDisposition. Beispiel: WRITE_APPEND, WRITE_EMPTY oder WRITE_TRUNCATE. Standardeinstellung: WRITE_APPEND
createDisposition (Optional) Die BigQuery-CreateDisposition. Beispiele: CREATE_IF_NEEDED, CREATE_NEVER Standardeinstellung: CREATE_IF_NEEDED

Vorlage „Pub/Sub Avro für BigQuery“ ausführen

KONSOLE

Über die Google Cloud Console ausführen
  1. Rufen Sie in der Cloud Console die Seite "Dataflow" auf.
  2. Zur Seite "Dataflow"
  3. Klicken Sie auf Job aus Vorlage erstellen.
  4. Grafik: Schaltfläche &quot;Job aus Vorlage erstellen&quot; in der Cloud Platform Console
  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage the Pub/Sub Avro to BigQuery template aus.
  6. Geben Sie einen Jobnamen in das Feld Jobname ein.
  7. Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein.
  8. Klicken Sie auf Job ausführen.

GCLOUD

Über das gcloud-Befehlszeilentool ausführen

Hinweis: Wenn Sie Vorlagen mit dem gcloud-Befehlszeilentool ausführen möchten, benötigen Sie das Cloud SDK 284.0.0 oder eine höhere Version.

Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

Dabei gilt:

  • JOB_NAME ist ein Jobname Ihrer Wahl
  • REGION_NAME: der Name der Dataflow-Region (z. B. us-central1)
  • SCHEMA_PATH: der Cloud Storage-Pfad zur Avro-Schemadatei (z. B. gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: der Name des Pub/Sub-Eingabeabos
  • BIGQUERY_TABLE: der Name der BigQuery-Ausgabetabelle
  • DEADLETTER_TOPIC: Das Pub/Sub-Thema, das für die Warteschlange für nicht verarbeitete Datensätze verwendet werden soll
gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

API

Über die REST API ausführen

Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

Zum Ausführen dieser Vorlage mithilfe einer REST API-Anfrage senden Sie eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.

Dabei gilt:

  • JOB_NAME ist ein Jobname Ihrer Wahl
  • LOCATION: der Name der Dataflow-Region (z. B. us-central1)
  • SCHEMA_PATH: der Cloud Storage-Pfad zur Avro-Schemadatei (z. B. gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: der Name des Pub/Sub-Eingabeabos
  • BIGQUERY_TABLE: der Name der BigQuery-Ausgabetabelle
  • DEADLETTER_TOPIC: Das Pub/Sub-Thema, das für die Warteschlange für nicht verarbeitete Datensätze verwendet werden soll
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Pub/Sub zu Pub/Sub

Die Vorlage "Pub/Sub für Pub/Sub" ist eine Streamingpipeline, die Nachrichten aus einem Pub/Sub-Abo liest und in ein anderes Pub/Sub-Thema schreibt. Die Pipeline akzeptiert auch einen optionalen Nachrichtenattributschlüssel und einen Wert, die zum Filtern der Nachrichten verwendet werden können, die in das Pub/Sub-Thema geschrieben werden sollen. Sie können diese Vorlage verwenden, um Nachrichten mit einem optionalen Nachrichtenfilter von einem Pub/Sub-Abo in ein anderes Pub/Sub-Thema zu kopieren.

Voraussetzungen für diese Pipeline:

  • Das als Quelle dienende Pub/Sub-Abo muss vor der Ausführung vorhanden sein.
  • Das Pub/Sub-Thema, in das geschrieben werden soll, muss vor der Ausführung vorhanden sein.

Vorlagenparameter

Parameter Beschreibung
inputSubscription Pub/Sub-Abo, aus dem die Eingabe gelesen wird. Beispiel: projects/<project-id>/subscriptions/<subscription-name>.
outputTopic Cloud Pub/Sub-Thema, in das die Ausgabe geschrieben wird. Beispiel: projects/<project-id>/topics/<topic-name>.
filterKey [Optional] Filterereignisse nach Attributschlüssel. Wenn filterKey nicht festgelegt ist, werden keine Filter angewendet.
filterValue [Optional] Filterattributwert für den Fall, dass "filterKey" bereitgestellt wird. Standardmäßig ist für filterValue null festgelegt.

Vorlage "Pub/Sub für Pub/Sub" ausführen

KONSOLE

Über die Google Cloud Console ausführen
  1. Rufen Sie in der Cloud Console die Seite "Dataflow" auf.
  2. Zur Seite "Dataflow"
  3. Klicken Sie auf Job aus Vorlage erstellen.
  4. Grafik: Schaltfläche &quot;Job aus Vorlage erstellen&quot; in der Cloud Platform Console
  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage the Pub/Sub to Pub/Sub template aus.
  6. Geben Sie einen Jobnamen in das Feld Jobname ein.
  7. Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein.
  8. Klicken Sie auf Job ausführen.

GCLOUD

Über das gcloud-Befehlszeilentool ausführen

Hinweis: Wenn Sie Vorlagen mit dem gcloud-Befehlszeilentool ausführen möchten, benötigen Sie Cloud SDK 138.0.0 oder eine höhere Version.

Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

Dabei gilt:

  • PROJECT_ID: Ihre Projekt-ID
  • JOB_NAME ist ein Jobname Ihrer Wahl
  • REGION: der regionale Endpunkt (z. B. us-west1)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: der Name des Pub/Sub-Abos
  • TOPIC_NAME: der Name des Pub/Sub-Themas
  • FILTER_KEY: der Attributschlüssel, nach dem Ereignisse gefiltert werden Wenn kein Schlüssel angegeben ist, werden keine Filter angewendet.
  • FILTER_VALUE: Filterattributwert, der verwendet wird, wenn ein Ereignisfilterschlüssel angegeben ist. Akzeptiert einen gültigen Java-Regex-String als Ereignisfilterwert. Wenn ein Regex angegeben wird, muss der komplette Ausdruck übereinstimmen, damit die Nachricht gefiltert wird. Teilübereinstimmungen (z. B. Teilstrings) werden nicht gefiltert. Standardmäßig wird ein Null-Ereignisfilterwert verwendet.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

API

Über die REST API ausführen

Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

Zum Ausführen dieser Vorlage mithilfe einer REST API-Anfrage senden Sie eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.

Dabei gilt:

  • PROJECT_ID: Ihre Projekt-ID
  • JOB_NAME ist ein Jobname Ihrer Wahl
  • REGION: der regionale Endpunkt (z. B. us-west1)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: der Name des Pub/Sub-Abos
  • TOPIC_NAME: der Name des Pub/Sub-Themas
  • FILTER_KEY: der Attributschlüssel, nach dem Ereignisse gefiltert werden Wenn kein Schlüssel angegeben ist, werden keine Filter angewendet.
  • FILTER_VALUE: Filterattributwert, der verwendet wird, wenn ein Ereignisfilterschlüssel angegeben ist. Akzeptiert einen gültigen Java-Regex-String als Ereignisfilterwert. Wenn ein Regex angegeben wird, muss der komplette Ausdruck übereinstimmen, damit die Nachricht gefiltert wird. Teilübereinstimmungen (z. B. Teilstrings) werden nicht gefiltert. Standardmäßig wird ein Null-Ereignisfilterwert verwendet.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Pub/Sub zu Splunk

Die Vorlage "Pub/Sub für Splunk" ist eine Streamingpipeline, die Nachrichten aus einem Pub/Sub-Abo liest und die Nutzlast der Nachricht über den HTTP Event Collector (HEC) von Splunk in Splunk schreibt. Vor dem Schreiben in Splunk können Sie auch eine benutzerdefinierte JavaScript-Funktion auf die Nachrichtennutzlast anwenden. Alle Nachrichten, bei denen Verarbeitungsfehler auftreten, werden zur weiteren Fehlerbehebung und erneuten Verarbeitung an ein Pub/Sub-Thema für nicht verarbeitete Datensätze weitergeleitet.

Als zusätzlichen Schutz für Ihr HEC-Token können Sie auch einen Cloud KMS-Schlüssel zusammen mit dem base64-codierten HEC-Tokenparameter übergeben, der mit dem Cloud KMS-Schlüssel verschlüsselt ist. Weitere Informationen zum Verschlüsseln des HEC-Tokenparameters finden Sie unter Cloud KMS API-Verschlüsselungsendpunkt.

Voraussetzungen für diese Pipeline:

  • Das als Quelle dienende Pub/Sub-Abo muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Das Pub/Sub-Thema für nicht verarbeitete Datensätze muss vorhanden sein, bevor die Pipeline ausgeführt wird.
  • Auf den Splunk-HEC-Endpunkt muss über das Dataflow-Worker-Netzwerk zugegriffen werden können.
  • Das Splunk-HEC-Token muss generiert und verfügbar sein.

Vorlagenparameter

Parameter Beschreibung
inputSubscription Das Pub/Sub-Abo, aus dem die Eingabe gelesen wird. Beispiel: projects/<project-id>/subscriptions/<subscription-name>.
token Das Splunk-HEC-Authentifizierungstoken. Dieser base64-codierte String kann zur zusätzlichen Sicherheit mit einem Cloud KMS-Schlüssel verschlüsselt werden.
url Die Splunk-HEC-URL. Diese muss von der VPC, in der die Pipeline ausgeführt wird, weitergeleitet werden können. Beispiel: https://splunk-hec-host:8088.
outputDeadletterTopic Das Pub/Sub-Thema als Weiterleitungsziel für nicht zustellbare Nachrichten. Beispiel: projects/<project-id>/topics/<topic-name>.
javascriptTextTransformGcsPath [Optional] Der Cloud Storage-Pfad, der Ihren gesamten JavaScript-Code enthält. Beispiel: gs://mybucket/mytransforms/*.js.
javascriptTextTransformFunctionName [Optional] Der Name der aufzurufenden JavaScript-Funktion. Wenn Ihre JavaScript-Funktion beispielsweise function myTransform(inJson) { ...dostuff...} ist, lautet der Funktionsname myTransform.
batchCount [Optional] Die Batchgröße zum Senden mehrerer Ereignisse an Splunk. Standardeinstellung: 1 (keine Batchverarbeitung).
parallelism [Optional] Die maximale Anzahl an parallelen Anfragen. Standardeinstellung: 1 (keine Parallelität).
disableCertificateValidation [Optional] SSL-Zertifikatsvalidierung deaktivieren. Standardeinstellung: "false" (Validierung aktiviert).
includePubsubMessage [Optional] Schließen Sie die vollständige Pub/Sub-Nachricht in die Nutzlast ein. Standardeinstellung: "false" (nur das Datenelement ist in der Nutzlast enthalten).
tokenKMSEncryptionKey [Optional] Der Cloud KMS-Schlüssel zum Entschlüsseln des HEC-Tokenstrings. Wenn der Cloud KMS-Schlüssel bereitgestellt wird, muss der HEC-Tokenstring verschlüsselt übergeben werden.

Vorlage "Pub/Sub für Splunk" ausführen

KONSOLE

Über die Google Cloud Console ausführen
  1. Rufen Sie in der Cloud Console die Seite "Dataflow" auf.
  2. Zur Seite "Dataflow"
  3. Klicken Sie auf Job aus Vorlage erstellen.
  4. Grafik: Schaltfläche &quot;Job aus Vorlage erstellen&quot; in der Cloud Platform Console
  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage the Pub/Sub to Splunk template aus.
  6. Geben Sie einen Jobnamen in das Feld Jobname ein.
  7. Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein.
  8. Klicken Sie auf Job ausführen.

GCLOUD

Über das gcloud-Befehlszeilentool ausführen

Hinweis: Wenn Sie Vorlagen mit dem gcloud-Befehlszeilentool ausführen möchten, benötigen Sie Cloud SDK 138.0.0 oder eine höhere Version.

Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

Dabei gilt:

  • PROJECT_ID: Ihre Projekt-ID
  • JOB_NAME ist ein Jobname Ihrer Wahl
  • REGION: der regionale Endpunkt (z. B. us-west1)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME: der Name des Pub/Sub-Abos
  • TOKEN: das HTTP Event Collector-Token von Splunk
  • URL: der URL-Pfad für den HTTP Event Collector von Splunk (z. B. https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME: der Name des Pub/Sub-Themas
  • JAVASCRIPT_FUNCTION: der Name Ihrer JavaScript-Funktion
  • PATH_TO_JAVASCRIPT_UDF_FILE: der Cloud Storage-Pfad zur .js-Datei, die Ihren JavaScript-Code enthält (z. B. gs://your-bucket/your-function.js)
  • BATCH_COUNT: die Batchgröße zum Senden mehrerer Ereignisse an Splunk
  • PARALLELISM: die Anzahl der parallelen Anfragen, die zum Senden von Ereignissen an Splunk verwendet werden sollen
  • DISABLE_VALIDATION: true, wenn Sie die SSL-Zertifikatsvalidierung deaktivieren möchten
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION

API

Über die REST API ausführen

Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

Zum Ausführen dieser Vorlage mithilfe einer REST API-Anfrage senden Sie eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.

Dabei gilt:

  • PROJECT_ID: Ihre Projekt-ID
  • JOB_NAME ist ein Jobname Ihrer Wahl
  • REGION: der regionale Endpunkt (z. B. us-west1)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME: der Name des Pub/Sub-Abos
  • TOKEN: das HTTP Event Collector-Token von Splunk
  • URL: der URL-Pfad für den HTTP Event Collector von Splunk (z. B. https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME: der Name des Pub/Sub-Themas
  • JAVASCRIPT_FUNCTION: der Name Ihrer JavaScript-Funktion
  • PATH_TO_JAVASCRIPT_UDF_FILE: der Cloud Storage-Pfad zur .js-Datei, die Ihren JavaScript-Code enthält (z. B. gs://your-bucket/your-function.js)
  • BATCH_COUNT: die Batchgröße zum Senden mehrerer Ereignisse an Splunk
  • PARALLELISM: die Anzahl der parallelen Anfragen, die zum Senden von Ereignissen an Splunk verwendet werden sollen
  • DISABLE_VALIDATION: true, wenn Sie die SSL-Zertifikatsvalidierung deaktivieren möchten
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION"
   }
}

Pub/Sub für Avro-Dateien in Cloud Storage

Die Vorlage „Pub/Sub zu Avro-Dateien in Cloud Storage“ ist eine Streamingpipeline, die Daten aus einem Pub/Sub-Thema liest und Avro-Dateien in einen angegebenen Cloud Storage-Bucket schreibt.

Voraussetzungen für diese Pipeline:

  • Das Pub/Sub-Eingabethema muss vor der Ausführung der Pipeline vorhanden sein.

Vorlagenparameter

Parameter Beschreibung
inputTopic Cloud Pub/Sub-Thema, das zur Nachrichtenaufnahme abonniert werden soll. Der Themenname muss das Format projects/<project-id>/topics/<topic-name> haben.
outputDirectory Ausgabeverzeichnis, in dem die Avro-Dateien archiviert werden. Fügen Sie am Ende einen Schrägstrich (/) hinzu. Beispiel: gs://example-bucket/example-directory/.
avroTempDirectory Verzeichnis für temporäre Avro-Dateien. Am Ende muss ein Schrägstrich (/) hinzugefügt werden. Beispiel: gs://example-bucket/example-directory/.
outputFilenamePrefix [Optional] Präfix für den Ausgabedateinamen der Avro-Dateien.
outputFilenameSuffix [Optional] Ausgabedateiname-Suffix für die Avro-Dateien.
outputShardTemplate [Optional] Shard-Vorlage der Ausgabedatei. Wird als sich wiederholende Folge der Buchstaben "S" und "N" angegeben (Beispiel: SSS-NNN). Diese werden durch die Shard-Nummer bzw. die Anzahl der Shards ersetzt. Das Standardvorlagenformat ist 'WP-SS-of-NN', wenn dieser Parameter nicht angegeben ist.
numShards [Optional] Die maximale Anzahl von Ausgabe-Shards, die beim Schreiben erzeugt werden. Die maximale Standardanzahl der Shards beträgt 1.

Vorlage "Pub/Sub für Cloud Storage Avro" ausführen

KONSOLE

Über die Google Cloud Console ausführen
  1. Rufen Sie in der Cloud Console die Seite "Dataflow" auf.
  2. Zur Seite "Dataflow"
  3. Klicken Sie auf Job aus Vorlage erstellen.
  4. Grafik: Schaltfläche &quot;Job aus Vorlage erstellen&quot; in der Cloud Platform Console
  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage the Pub/Sub to Cloud Storage Avro template aus.
  6. Geben Sie einen Jobnamen in das Feld Jobname ein.
  7. Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein.
  8. Klicken Sie auf Job ausführen.

GCLOUD

Über das gcloud-Befehlszeilentool ausführen

Hinweis: Wenn Sie Vorlagen mit dem gcloud-Befehlszeilentool ausführen möchten, benötigen Sie Cloud SDK 138.0.0 oder eine höhere Version.

Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

Dabei gilt:

  • PROJECT_ID: Ihre Projekt-ID
  • JOB_NAME: Jobname Ihrer Wahl
  • REGION: der regionale Endpunkt (z. B. us-west1)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • TOPIC_NAME: der Name des Pub/Sub-Themas
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
  • FILENAME_PREFIX: das gewünschte Präfix des Ausgabedateinamens
  • FILENAME_SUFFIX: das gewünschte Suffix des Ausgabedateinamens
  • SHARD_TEMPLATE: die gewünschte Shard-Ausgabevorlage
  • NUM_SHARDS: die Anzahl der Ausgabe-Shards
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
numShards=NUM_SHARDS,\
avroTempDirectory=gs://BUCKET_NAME/temp/

API

Über die REST API ausführen

Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

Zum Ausführen dieser Vorlage mithilfe einer REST API-Anfrage senden Sie eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.

Dabei gilt:

  • PROJECT_ID: Ihre Projekt-ID
  • JOB_NAME: Jobname Ihrer Wahl
  • REGION: der regionale Endpunkt (z. B. us-west1)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • TOPIC_NAME: der Name des Pub/Sub-Themas
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
  • FILENAME_PREFIX: das gewünschte Präfix des Ausgabedateinamens
  • FILENAME_SUFFIX: das gewünschte Suffix des Ausgabedateinamens
  • SHARD_TEMPLATE: die gewünschte Shard-Ausgabevorlage
  • NUM_SHARDS: die Anzahl der Ausgabe-Shards
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE",
       "numShards": "NUM_SHARDS",
   }
}

Pub/Sub für Textdateien in Cloud Storage

Die Vorlage "Pub/Sub für Cloud Storage Text" ist eine Streamingpipeline, die Datensätze aus Pub/Sub liest und als eine Reihe von Cloud Storage-Dateien im Textformat speichert. Die Vorlage kann als schnelle Möglichkeit zum Speichern von Daten in Pub/Sub zur späteren Verwendung genutzt werden. Standardmäßig erstellt die Vorlage alle fünf Minuten eine neue Datei.

Voraussetzungen für diese Pipeline:

  • Das Pub/Sub-Thema muss vor der Ausführung vorhanden sein.
  • Die im Thema veröffentlichten Nachrichten müssen im Textformat vorliegen.
  • Die im Thema veröffentlichten Nachrichten dürfen keine Zeilenumbrüche enthalten. Beachten Sie, dass jede Pub/Sub-Nachricht in der Ausgabedatei als einzelne Zeile gespeichert wird.

Vorlagenparameter

Parameter Beschreibung
inputTopic Das Pub/Sub-Thema, aus dem die Eingabe gelesen werden soll. Der Themenname muss das Format projects/<project-id>/topics/<topic-name> haben.
outputDirectory Das Pfad- und Dateinamenpräfix zum Schreiben von Ausgabedateien. Beispiel: gs://bucket-name/path/. Dieser Wert muss mit einem Schrägstrich enden.
outputFilenamePrefix Das Präfix für die Namen der einzelnen Dateien im Fenstermodus. Beispiel: output-
outputFilenameSuffix Das Suffix für die Namen der einzelnen Dateien im Fenstermodus, normalerweise eine Dateiendung wie .txt oder .csv.
outputShardTemplate Die Shard-Vorlage definiert den dynamischen Teil aller Namen der Dateien im Fenstermodus. Standardmäßig verwendet die Pipeline einen einzelnen Shard für die Ausgabe in das Dateisystem in jedem Fenster. Das bedeutet, dass alle Daten in einer einzelnen Datei pro Fenster enthalten sind. Für outputShardTemplate wird standardmäßig W-P-SS-of-NN verwendet. Dabei ist W der Datumsbereich des Fensters, P die Bereichsinformation, S die Shard-Nummer und N die Anzahl der Shards. Bei einer einzelnen Datei ist der Abschnitt SS-of-NN der outputShardTemplate immer 00-of-01.

Vorlage „Pub/Sub für Textdateien in Cloud Storage“ ausführen

KONSOLE

Über die Google Cloud Console ausführen
  1. Rufen Sie in der Cloud Console die Seite "Dataflow" auf.
  2. Zur Seite "Dataflow"
  3. Klicken Sie auf Job aus Vorlage erstellen.
  4. Grafik: Schaltfläche &quot;Job aus Vorlage erstellen&quot; in der Cloud Platform Console
  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage the Pub/Sub to Text Files on Cloud Storage template aus.
  6. Geben Sie einen Jobnamen in das Feld Jobname ein.
  7. Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein.
  8. Klicken Sie auf Job ausführen.

GCLOUD

Über das gcloud-Befehlszeilentool ausführen

Hinweis: Wenn Sie Vorlagen mit dem gcloud-Befehlszeilentool ausführen möchten, benötigen Sie Cloud SDK 138.0.0 oder eine höhere Version.

Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

Dabei gilt:

  • PROJECT_ID: Ihre Projekt-ID
  • JOB_NAME ist ein Jobname Ihrer Wahl
  • REGION: der regionale Endpunkt (z. B. us-west1)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • TOPIC_NAME: der Name Ihres Pub/Sub-Themas
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

API

Über die REST API ausführen

Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

Zum Ausführen dieser Vorlage mithilfe einer REST API-Anfrage senden Sie eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.

Dabei gilt:

  • PROJECT_ID: Ihre Projekt-ID
  • JOB_NAME ist ein Jobname Ihrer Wahl
  • REGION: der regionale Endpunkt (z. B. us-west1)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • TOPIC_NAME: der Name Ihres Pub/Sub-Themas
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_IDlocations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

Pub/Sub für MongoDB

Die Vorlage „Pub/Sub für MongoDB“ ist eine Streamingpipeline, die JSON-codierte Nachrichten aus einem Pub/Sub-Abo liest und in MongoDB als Dokumente schreibt. Bei Bedarf unterstützt diese Pipeline zusätzliche Transformationen, die über eine benutzerdefinierte JavaScript-Funktion (UDF) eingebunden werden können. Alle Fehler sind aufgrund von nicht übereinstimmenden Schemata, nicht korrekt formatiertem JSON oder während der Ausführung von Transformationen in einer BigQuery-Tabelle für nicht verarbeitete Nachrichten zusammen mit der Eingabenachricht aufgetreten. Falls noch keine Tabelle für nicht verarbeitete Datensätze vorhanden ist, wird diese Tabelle von der Pipeline automatisch erstellt.

Voraussetzungen für diese Pipeline:

  • Das Pub/Sub-Abo muss vorhanden sein und die Nachrichten müssen in einem gültigen JSON-Format codiert sein.
  • Der MongoDB-Cluster muss vorhanden und über die Dataflow-Worker-Maschinen zugänglich sein.

Vorlagenparameter

Parameter Beschreibung
inputSubscription Name des Pub/Sub-Abos. Beispiel: projects/<project-id>/subscriptions/<subscription-name>
mongoDBUri Durch Kommas getrennte Liste von MongoDB-Servern. Beispiel: 192.285.234.12:27017,192.287.123.11:27017
database Datenbank in MongoDB zum Speichern der Sammlung. Beispiel: my-db.
collection Name der Sammlung in der MongoDB-Datenbank. Beispiel: my-collection.
deadletterTable BigQuery-Tabelle, die aus Fehlern resultierende Nachrichten speichert (nicht übereinstimmendes Schema, fehlerhaft formatierte JSON-Dateien usw.). Beispiel: project-id:dataset-name.table-name.
javascriptTextTransformGcsPath [Optional] Cloud Storage-Speicherort der JavaScript-Datei mit der UDF-Transformation. Beispiel: gs://mybucket/filename.json.
javascriptTextTransformFunctionName [Optional] Name der JavaScript-UDF. Beispiel: transform
batchSize [Optional] Batchgröße für die Aufnahme von Dokumentenbatches in MongoDB. Standardeinstellung: 1000.
batchSizeBytes [Optional] Batchgröße in Byte. Standardeinstellung: 5242880.
maxConnectionIdleTime [Optional] Maximale zulässige Leerlaufzeit in Sekunden, bis eine Zeitüberschreitung der Verbindung auftritt. Standardeinstellung: 60000.
sslEnabled [Optional] Boolescher Wert, der angibt, ob für die Verbindung zu MongoDB SSL aktiviert ist. Standardeinstellung: true.
ignoreSSLCertificate [Optional] Boolescher Wert, der angibt, ob das SSL-Zertifikat ignoriert werden soll. Standardeinstellung: true.
withOrdered [Optional] Boolescher Wert, mit dem geordnete Bulk-Aufnahmen in MongoDB aktiviert werden. Standardeinstellung: true.
withSSLInvalidHostNameAllowed [Optional] Boolescher Wert, der angibt, ob ein ungültiger Hostname für die SSL-Verbindung zulässig ist. Standardeinstellung: true.

Vorlage „Pub/Sub für MongoDB“ ausführen

KONSOLE

Über die Google Cloud Console ausführen
  1. Rufen Sie in der Cloud Console die Seite "Dataflow" auf.
  2. Zur Seite "Dataflow"
  3. Klicken Sie auf Job aus Vorlage erstellen.
  4. Grafik: Schaltfläche &quot;Job aus Vorlage erstellen&quot; in der Cloud Platform Console
  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage Pub/Sub to MongoDB template aus.
  6. Geben Sie einen Jobnamen in das Feld Jobname ein.
  7. Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein.
  8. Klicken Sie auf Job ausführen.

GCLOUD

Über das gcloud-Befehlszeilentool ausführen

Hinweis: Wenn Sie Vorlagen mit dem gcloud-Befehlszeilentool ausführen möchten, benötigen Sie das Cloud SDK 284.0.0 oder eine höhere Version.

Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

Dabei gilt:

  • PROJECT_ID: Ihre Projekt-ID
  • REGION_NAME: Name der Dataflow-Region, (z. B. us-central1)
  • JOB_NAME ist ein Jobname Ihrer Wahl
  • INPUT_SUBSCRIPTION: das Pub/Sub-Abo (z. B. projects/<project-id>/subscriptions/<subscription-name>)
  • MONGODB_URI: die MongoDB-Serveradressen (z. B. 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: der Name der MongoDB-Datenbank (z. B. users)
  • COLLECTION: der Name der MongoDB-Sammlung (z. B. profiles)
  • UNPROCESSED_TABLE: der Name der BigQuery-Tabelle (z. B. your-project:your-dataset.your-table-name)
gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

API

Über die REST API ausführen

Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

Zum Ausführen dieser Vorlage mithilfe einer REST API-Anfrage senden Sie eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.

Dabei gilt:

  • PROJECT_ID: Ihre Projekt-ID
  • LOCATION: Name der Dataflow-Region, (z. B. us-central1)
  • JOB_NAME ist ein Jobname Ihrer Wahl
  • INPUT_SUBSCRIPTION: das Pub/Sub-Abo (z. B. projects/<project-id>/subscriptions/<subscription-name>)
  • MONGODB_URI: die MongoDB-Serveradressen (z. B. 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: der Name der MongoDB-Datenbank (z. B. users)
  • COLLECTION: der Name der MongoDB-Sammlung (z. B. profiles)
  • UNPROCESSED_TABLE: der Name der BigQuery-Tabelle (z. B. your-project:your-dataset.your-table-name)
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Textdateien in Cloud Storage für BigQuery (Stream)

Die Pipeline „Textdateien in Cloud Storage für BigQuery“ ist eine Streamingpipeline, mit der Sie in Cloud Storage gespeicherte Textdateien streamen, diese mit einer von Ihnen bereitgestellten benutzerdefinierten JavaScript-Funktion (User Defined Function, UDF) transformieren und das Ergebnis in BigQuery ausgeben können.

Voraussetzungen für diese Pipeline:

  • Erstellen Sie eine JSON-formatierte BigQuery-Schemadatei, die Ihre Ausgabetabelle beschreibt.
    {
        'fields': [{
            'name': 'location',
            'type': 'STRING'
        }, {
            'name': 'name',
            'type': 'STRING'
        }, {
            'name': 'age',
            'type': 'STRING',
        }, {
            'name': 'color',
            'type': 'STRING'
        }, {
            'name': 'coffee',
            'type': 'STRING',
            'mode': 'REQUIRED'
        }, {
            'name': 'cost',
            'type': 'NUMERIC',
            'mode': 'REQUIRED'
        }]
    }
    
  • Erstellen Sie eine JavaScript-Datei (.js) mit Ihrer benutzerdefinierten Funktion, die die Logik für die Transformation der Textzeilen bereitstellt. Beachten Sie, dass Ihre Funktion einen JSON-String zurückgeben muss.

    Diese Funktion teilt beispielsweise jede Zeile einer CSV-Datei auf und gibt nach der Transformation der Werte einen JSON-String zurück.

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

Vorlagenparameter

Parameter Beschreibung
javascriptTextTransformGcsPath Der Cloud Storage-Speicherort Ihrer benutzerdefinierten JavaScript-Funktion. Beispiel: gs://my_bucket/my_function.js.
JSONPath Der Cloud Storage-Speicherort Ihrer BigQuery-Schemadatei im JSON-Format. Beispiel: gs://path/to/my/schema.json
javascriptTextTransformFunctionName Der Name der JavaScript-Funktion, die Sie als benutzerdefinierte Funktion aufrufen möchten. Beispiel: transform
outputTable Die vollständig qualifizierte BigQuery-Tabelle. Beispiel: my-project:dataset.table.
inputFilePattern Der Cloud Storage-Speicherort des Textes, den Sie verarbeiten möchten. Beispiel: gs://my-bucket/my-files/text.txt
bigQueryLoadingTemporaryDirectory Das temporäre Verzeichnis für den BigQuery-Ladevorgang. Beispiel: gs://my-bucket/my-files/temp_dir
outputDeadletterTable Tabelle für Nachrichten, die die Ausgabetabelle nicht erreicht haben. Beispiel: my-project:dataset.my-unprocessed-table. Wenn sie nicht vorhanden ist, wird sie während der Pipelineausführung erstellt. Wenn nicht angegeben, wird stattdessen <outputTableSpec>_error_records verwendet.

Vorlage "Cloud Storage Text für BigQuery (Stream)" ausführen

KONSOLE

Über die Google Cloud Console ausführen
  1. Rufen Sie in der Cloud Console die Seite "Dataflow" auf.
  2. Zur Seite "Dataflow"
  3. Klicken Sie auf Job aus Vorlage erstellen.
  4. Grafik: Schaltfläche &quot;Job aus Vorlage erstellen&quot; in der Cloud Platform Console
  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage the Cloud Storage Text to BigQuery template aus.
  6. Geben Sie einen Jobnamen in das Feld Jobname ein.
  7. Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein.
  8. Klicken Sie auf Job ausführen.

GCLOUD

Über das gcloud-Befehlszeilentool ausführen

Hinweis: Wenn Sie Vorlagen mit dem gcloud-Befehlszeilentool ausführen möchten, benötigen Sie Cloud SDK 138.0.0 oder eine höhere Version.

Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

Dabei gilt:

  • PROJECT_ID: Ihre Projekt-ID
  • JOB_NAME ist ein Jobname Ihrer Wahl
  • REGION: der regionale Endpunkt (z. B. us-west1)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • JAVASCRIPT_FUNCTION: der Name Ihrer UDF
  • PATH_TO_BIGQUERY_SCHEMA_JSON: der Cloud Storage-Pfad zur JSON-Datei, die die Schemadefinition enthält
  • PATH_TO_JAVASCRIPT_UDF_FILE: der Cloud Storage-Pfad zur .js-Datei, die Ihren JavaScript-Code enthält
  • PATH_TO_TEXT_DATA: der Cloud Storage-Pfad zu Ihrem Text-Dataset
  • BIGQUERY_TABLE: Ihr BigQuery-Tabellenname
  • BIGQUERY_UNPROCESSED_TABLE: der Name Ihrer BigQuery-Tabelle für nicht verarbeitete Nachrichten
  • PATH_TO_TEMP_DIR_ON_GCS: der Cloud Storage-Pfad zum temporären Verzeichnis
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

API

Über die REST API ausführen

Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

Zum Ausführen dieser Vorlage mithilfe einer REST API-Anfrage senden Sie eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.

Dabei gilt:

  • PROJECT_ID: Ihre Projekt-ID
  • JOB_NAME ist ein Jobname Ihrer Wahl
  • REGION: der regionale Endpunkt (z. B. us-west1)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • JAVASCRIPT_FUNCTION: der Name Ihrer UDF
  • PATH_TO_BIGQUERY_SCHEMA_JSON: der Cloud Storage-Pfad zur JSON-Datei, die die Schemadefinition enthält
  • PATH_TO_JAVASCRIPT_UDF_FILE: der Cloud Storage-Pfad zur .js-Datei, die Ihren JavaScript-Code enthält
  • PATH_TO_TEXT_DATA: der Cloud Storage-Pfad zu Ihrem Text-Dataset
  • BIGQUERY_TABLE: Ihr BigQuery-Tabellenname
  • BIGQUERY_UNPROCESSED_TABLE: der Name Ihrer BigQuery-Tabelle für nicht verarbeitete Nachrichten
  • PATH_TO_TEMP_DIR_ON_GCS: der Cloud Storage-Pfad zum temporären Verzeichnis
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

Textdateien in Cloud Storage für Pub/Sub (Stream)

Diese Vorlage erstellt eine Streamingpipeline, die kontinuierlich nach neuen Textdateien sucht, die in Cloud Storage hochgeladen wurden, jede Datei Zeile für Zeile liest und Strings in einem Pub/Sub-Thema veröffentlicht. Die Vorlage veröffentlicht Datensätze aus einer JSON-Datei mit Zeilenumbruch oder einer CSV-Datei zur Echtzeitverarbeitung in einem Pub/Sub-Thema. Sie können diese Vorlage verwenden, um Daten in Pub/Sub wiederzugeben.

Derzeit ist das Abfrageintervall fest auf zehn Sekunden festgelegt. Diese Vorlage legt keinen Zeitstempel für die einzelnen Datensätze fest, sodass die Ereigniszeit der Veröffentlichungszeit während der Ausführung entspricht. Wenn Ihre Pipeline für die Verarbeitung eine korrekte Ereigniszeit benötigt, sollten Sie diese Pipeline nicht verwenden.

Voraussetzungen für diese Pipeline:

  • Die Eingabedateien müssen im JSON-Format mit Zeilenumbruch oder im CSV-Format vorliegen. Datensätze, die sich in den Quelldateien über mehrere Zeilen erstrecken, können später Probleme verursachen, da jede Zeile in den Dateien als eigene Nachricht in Pub/Sub veröffentlicht wird.
  • Das Pub/Sub-Thema muss vor der Ausführung vorhanden sein.
  • Die Pipeline wird unbefristet ausgeführt und muss manuell beendet werden.

Vorlagenparameter

Parameter Beschreibung
inputFilePattern Das Muster der Eingabedatei, aus der gelesen werden soll. Beispiel: gs://bucket-name/files/*.json oder gs://bucket-name/path/*.csv.
outputTopic Das Pub/Sub-Eingabethema, in das geschrieben werden soll. Der Name muss das Format projects/<project-id>/topics/<topic-name> haben.

Vorlage „Textdateien in Cloud Storage für Pub/Sub (Stream)” ausführen

KONSOLE

Über die Google Cloud Console ausführen
  1. Rufen Sie in der Cloud Console die Seite "Dataflow" auf.
  2. Zur Seite "Dataflow"
  3. Klicken Sie auf Job aus Vorlage erstellen.
  4. Grafik: Schaltfläche &quot;Job aus Vorlage erstellen&quot; in der Cloud Platform Console
  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage the Text Files on Cloud Storage to Pub/Sub (Stream) template aus.
  6. Geben Sie einen Jobnamen in das Feld Jobname ein.
  7. Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein.
  8. Klicken Sie auf Job ausführen.

GCLOUD

Über das gcloud-Befehlszeilentool ausführen

Hinweis: Wenn Sie Vorlagen mit dem gcloud-Befehlszeilentool ausführen möchten, benötigen Sie Cloud SDK 138.0.0 oder eine höhere Version.

Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

Dabei gilt:

  • PROJECT_ID: Ihre Projekt-ID
  • JOB_NAME ist ein Jobname Ihrer Wahl
  • REGION: der regionale Endpunkt (z. B. us-west1)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • TOPIC_NAME: der Name Ihres Pub/Sub-Themas
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
  • FILE_PATTERN: das Glob-Dateimuster, aus dem im Cloud Storage-Bucket gelesen werden soll (z. B. path/*.csv)
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION\
    --staging-location TEMP_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

API

Über die REST API ausführen

Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

Zum Ausführen dieser Vorlage mithilfe einer REST API-Anfrage senden Sie eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.

Dabei gilt:

  • PROJECT_ID: Ihre Projekt-ID
  • JOB_NAME ist ein Jobname Ihrer Wahl
  • REGION: der regionale Endpunkt (z. B. us-west1)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • TOPIC_NAME: der Name Ihres Pub/Sub-Themas
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
  • FILE_PATTERN: das Glob-Dateimuster, aus dem im Cloud Storage-Bucket gelesen werden soll (z. B. path/*.csv)
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Datenmaskierung/Tokenisierung aus Cloud Storage für BigQuery (mit Cloud DLP)

Die Vorlage "Datenmaskierung/Tokenisierung aus Cloud Storage für BigQuery (mit Cloud DLP)" ist eine Streamingpipeline, die CSV-Dateien aus einem Cloud Storage-Bucket liest und die Cloud Data Loss Prevention API (Cloud DLP) für die De- Identifikation aufruft und die de-identifizierten Daten in die angegebene BigQuery-Tabelle schreibt. Diese Vorlage unterstützt sowohl die Verwendung einer Inspektionsvorlage als auch einer De-Identifikationsvorlage für Cloud DLP. Auf diese Weise können Nutzer nach potenziell vertraulichen Informationen suchen und deren Identifizierung aufheben sowie die Identifizierung von strukturierten Daten aufheben, in denen Spalten für die De-Identifikation angegeben sind und kein Prüfung erforderlich ist.

Voraussetzungen für diese Pipeline:

  • Die Eingabedaten für die Tokenisierung müssen vorhanden sein.
  • Die Cloud DLP-Vorlagen müssen vorhanden sein, zum Beispiel "DeidentifyTemplate" und "InspectTemplate". Weitere Informationen finden Sie unter Cloud DLP-Vorlagen.
  • Das BigQuery-Dataset muss vorhanden sein.

Vorlagenparameter

Parameter Beschreibung
inputFilePattern Die CSV-Dateien, aus denen Eingabedatensätze gelesen werden sollen. Platzhalter werden ebenfalls akzeptiert. Beispiel: gs://mybucket/my_csv_filename.csv oder gs://mybucket/file-*.csv.
dlpProjectId ID des Cloud DLP-Projekts, das Inhaber der Cloud DLP API-Ressource ist. Dieses Cloud DLP-Projekt kann dasselbe Projekt sein, das auch Inhaber der Cloud DLP-Vorlagen ist, oder ein separates Projekt. Beispiel: my_dlp_api_project.
deidentifyTemplateName De-Identifikationsvorlage für Cloud DLP nach dem Muster projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId} zur Verwendung für API-Anfragen. Beispiel: projects/my_project/deidentifyTemplates/100.
datasetName BigQuery-Dataset zum Senden von tokenisierten Ergebnissen.
batchSize Block-/Batchgröße zum Senden von Daten zur Prüfung und/oder Aufhebung der Tokenisierung. Im Fall einer CSV-Datei gibt batchSize die Anzahl der Zeilen in einem Batch an. Nutzer müssen die Batchgröße anhand der Größe der Datensätze und der Größe der Datei bestimmen. Für die Cloud DLP API gilt eine maximale Nutzlastgröße von 524 KB pro API-Aufruf.
inspectTemplateName [Optional] Cloud DLP-Inspektionsvorlage nach dem Muster projects/{template_project_id}/identifyTemplates/{idTemplateId} zur Verwendung für API-Anfragen. Beispiel: projects/my_project/identifyTemplates/100

Vorlage "Datenmaskierung/Tokenisierung aus Cloud Storage für BigQuery (mit Cloud DLP)" ausführen

KONSOLE

Über die Google Cloud Console ausführen
  1. Rufen Sie in der Cloud Console die Seite "Dataflow" auf.
  2. Zur Seite "Dataflow"
  3. Klicken Sie auf Job aus Vorlage erstellen.
  4. Grafik: Schaltfläche &quot;Job aus Vorlage erstellen&quot; in der Cloud Platform Console
  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template aus.
  6. Geben Sie einen Jobnamen in das Feld Jobname ein.
  7. Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein.
  8. Klicken Sie auf Job ausführen.

GCLOUD

Über das gcloud-Befehlszeilentool ausführen

Hinweis: Wenn Sie Vorlagen mit dem gcloud-Befehlszeilentool ausführen möchten, benötigen Sie Cloud SDK 138.0.0 oder eine höhere Version.

Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

Dabei gilt:

  • TEMPLATE_PROJECT_ID ist Ihre Projekt-ID der Vorlage
  • DLP_API_PROJECT_ID ist Ihre Cloud DLP API-Projekt-ID
  • JOB_NAME ist ein Jobname Ihrer Wahl
  • REGION: der regionale Endpunkt (z. B. us-west1)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • INPUT_DATA ist der Pfad zur Eingabedatei
  • DEIDENTIFY_TEMPLATE ist die Nummer der Cloud DLP-De-Identifikationsvorlage
  • DATASET_NAME: der Name des BigQuery-Datasets
  • INSPECT_TEMPLATE_NUMBER ist die Nummer der Cloud DLP-Prüfungsvorlage
  • BATCH_SIZE_VALUE ist die Batchgröße (Anzahl der Zeilen pro API für CSV-Dateien)
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputFilePattern=INPUT_DATA,\
datasetName=DATASET_NAME,\
batchSize=BATCH_SIZE_VALUE,\
dlpProjectId=DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER

API

Über die REST API ausführen

Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

Zum Ausführen dieser Vorlage mithilfe einer REST API-Anfrage senden Sie eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.

Dabei gilt:

  • TEMPLATE_PROJECT_ID ist Ihre Projekt-ID der Vorlage
  • DLP_API_PROJECT_ID ist Ihre Cloud DLP API-Projekt-ID
  • JOB_NAME ist ein Jobname Ihrer Wahl
  • REGION: der regionale Endpunkt (z. B. us-west1)
  • TEMP_LOCATION: der Speicherort zum Schreiben temporärer Dateien (z. B. gs://your-bucket/temp)
  • INPUT_DATA ist der Pfad zur Eingabedatei
  • DEIDENTIFY_TEMPLATE ist die Nummer der Cloud DLP-De-Identifikationsvorlage
  • DATASET_NAME: der Name des BigQuery-Datasets
  • INSPECT_TEMPLATE_NUMBER ist die Nummer der Cloud DLP-Prüfungsvorlage
  • BATCH_SIZE_VALUE ist die Batchgröße (Anzahl der Zeilen pro API für CSV-Dateien)
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
      "inputFilePattern":INPUT_DATA,
      "datasetName": "DATASET_NAME",
      "batchSize": "BATCH_SIZE_VALUE",
      "dlpProjectId": "DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE",
      "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER"
   }
}

Change Data Capture von MySQL für BigQuery mit Debezium und Pub/Sub (Stream)

Die Vorlage "Change Data Capture von MySQL für BigQuery mit Debezium und Pub/Sub" ist eine Streamingpipeline, die Pub/Sub-Nachrichten mit Änderungsdaten aus einer MySQL-Datenbank liest und die Datensätze in BigQuery schreibt. Ein Debezium-Connector erfasst Änderungen an der MySQL-Datenbank und veröffentlicht die geänderten Daten in Pub/Sub. Die Vorlage liest dann die Pub/Sub-Nachrichten und schreibt sie in BigQuery.

Über diese Vorlage können Sie MySQL-Datenbanken und BigQuery-Tabellen miteinander synchronisieren. Die Pipeline schreibt die geänderten Daten in eine BigQuery-Staging-Tabelle und aktualisiert in regelmäßigen Abständen eine BigQuery-Tabelle zu Replikation der MySQL-Datenbank.

Voraussetzungen für diese Pipeline:

  • Der Debezium-Connector muss bereitgestellt sein.
  • Die Pub/Sub-Nachrichten müssen in einer Beam Row serialisiert sein

Vorlagenparameter

Parameter Beschreibung
inputSubscriptions Die durch Kommas getrennte Liste mit Pub/Sub-Eingabeabos, aus denen gelesen werden soll, im Format <subscription>,<subscription>, ...
changeLogDataset Das BigQuery-Dataset zum Speichern der Staging-Tabellen im Format <my-dataset>.
replicaDataset Der Speicherort des BigQuery-Datasets zum Speichern der Replikattabellen im Format <my-dataset>.
updateFrequencySecs (Optional) Das Intervall, in dem die Pipeline die BigQuery-Tabelle zur Replikation der MySQL-Datenbank aktualisiert.

Vorlage "Change Data Capture von MySQL für BigQuery mit Debezium und Pub/Sub (Stream)" ausführen

Führen Sie die folgenden Schritte aus, um diese Vorlage auszuführen:

  1. Klonen Sie das DataflowTemplates-Repository auf Ihren lokalen Computer.
  2. Wechseln Sie zum Verzeichnis v2/cdc-parent.
  3. Achten Sie darauf, dass der Debezium-Connector bereitgestellt ist.
  4. Führen Sie mit Maven die Dataflow-Vorlage aus.

    Ersetzen Sie die folgenden Werte:

    • PROJECT_ID: Ihre Projekt-ID.
    • YOUR_SUBSCRIPTIONS: Ihre durch Kommas getrennte Liste von Pub/Sub-Abonamen.
    • YOUR_CHANGELOG_DATASET: Ihr BigQuery-Dataset für Änderungslogdaten.
    • YOUR_REPLICA_DATASET: Ihr BigQuery-Dataset für Replikattabellen.
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
                --inputSubscriptions=YOUR_SUBSCRIPTIONS \
                --updateFrequencySecs=300 \
                --changeLogDataset=YOUR_CHANGELOG_DATASET \
                --replicaDataset=YOUR_REPLICA_DATASET \
                --project=PROJECT_ID"
      

Apache Kafka für BigQuery

Die Vorlage „Apache Kafka für BigQuery“ ist eine Streamingpipeline, die Textdaten aus Apache Kafka schreibt, eine benutzerdefinierte Funktion (User-defined Function, UDF) ausführt und die resultierenden Datensätze in BigQuery ausgibt. Alle Fehler, die bei der Transformation der Daten, der Ausführung der UDF oder beim Schreiben in die Ausgabetabelle auftreten, werden in eine separate Fehlertabelle in BigQuery geschrieben. Wenn die Fehlertabelle vor der Ausführung nicht vorhanden ist, wird sie erstellt.

Voraussetzungen für diese Pipeline

  • Die BigQuery-Ausgabetabelle muss vorhanden sein.
  • Der Apache Kafka-Broker-Server muss ausgeführt werden und über die Dataflow-Worker-Maschinen erreichbar sein.
  • Die Apache Kafka-Themen müssen vorhanden sein und die Nachrichten müssen in einem gültigen JSON-Format codiert sein.

Vorlagenparameter

Parameter Beschreibung
outputTableSpec Der Speicherort der BigQuery-Ausgabetabelle, in den die Apache Kafka-Nachrichten geschrieben werden sollen, im Format my-project:dataset.table.
inputTopics Die Apache Kafka-Eingabethemen, aus denen eine durch Kommas getrennte Liste gelesen werden soll. Beispiel: messages
bootstrapServers Die Hostadresse der ausgeführten Apache Kafka-Broker-Server in einer durch Kommas getrennten Liste, jede Hostadresse im Format 35.70.252.199:9092.
javascriptTextTransformGcsPath (Optional) Cloud Storage-Speicherortpfad zur JavaScript-UDF. Beispiel: gs://my_bucket/my_function.js
javascriptTextTransformFunctionName (Optional) Der Name des JavaScript, das als UDF aufgerufen werden soll. Beispiel: transform
outputDeadletterTable (Optional) Die BigQuery-Tabelle im Format my-project:dataset.my-deadletter-table für Nachrichten, die die Ausgabetabelle nicht erreicht haben. Wenn sie nicht vorhanden ist, wird die Tabelle während der Pipelineausführung erstellt. Wenn nicht angegeben, wird stattdessen <outputTableSpec>_error_records verwendet.

Vorlage „Apache Kafka für BigQuery“ ausführen

KONSOLE

Über die Google Cloud Console ausführen
  1. Rufen Sie in der Cloud Console die Seite "Dataflow" auf.
  2. Zur Seite "Dataflow"
  3. Klicken Sie auf Job aus Vorlage erstellen.
  4. Grafik: Schaltfläche &quot;Job aus Vorlage erstellen&quot; in der Cloud Platform Console
  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage the Apache Kafka to BigQuery template aus.
  6. Geben Sie einen Jobnamen in das Feld Jobname ein.
  7. Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein.
  8. Klicken Sie auf Job ausführen.

GCLOUD

Über das gcloud-Befehlszeilentool ausführen

Hinweis: Wenn Sie Vorlagen mit dem gcloud-Befehlszeilentool ausführen möchten, benötigen Sie das Cloud SDK 284.0.0 oder eine höhere Version.

Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:

gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery

Dabei gilt:

  • YOUR_PROJECT_ID ist Ihre Projekt-ID der Vorlage
  • JOB_NAME ist ein Jobname Ihrer Wahl
  • REGION_NAME ist der Name der Dataflow-Region (z. B. us-central1)
  • BIGQUERY_TABLE: Ihr BigQuery-Tabellenname
  • KAFKA_TOPICS ist die Apache Kafafa-Themenliste. Wenn mehrere Themen bereitgestellt werden, folgen Sie der Anleitung zum Maskieren von Kommas.
  • PATH_TO_JAVASCRIPT_UDF_FILE: der Cloud Storage-Pfad zur .js-Datei, die Ihren JavaScript-Code enthält
  • YOUR_JAVASCRIPT_FUNCTION: der Name Ihrer UDF
  • KAFKA_SERVER_ADDRESSES ist die IP-Adressliste des Apache Kafka-Brokers. Jede IP-Adresse sollte die Portnummer haben, von der aus der Server zugänglich ist. Beispiel: 35.70.252.199:9092 Wenn mehrere Adressen angegeben werden, folgen Sie der Anleitung zum Maskieren von Kommas.
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

API

Über die REST API ausführen

Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:

gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery

Zum Ausführen dieser Vorlage mithilfe einer REST API-Anfrage senden Sie eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.

Dabei gilt:

  • YOUR_PROJECT_ID ist Ihre Projekt-ID der Vorlage
  • JOB_NAME ist ein Jobname Ihrer Wahl
  • LOCATION ist der Name der Dataflow-Region (z. B. us-central1)
  • BIGQUERY_TABLE: Ihr BigQuery-Tabellenname
  • KAFKA_TOPICS ist die Apache Kafafa-Themenliste. Wenn mehrere Themen bereitgestellt werden, folgen Sie der Anleitung zum Maskieren von Kommas.
  • PATH_TO_JAVASCRIPT_UDF_FILE: der Cloud Storage-Pfad zur .js-Datei, die Ihren JavaScript-Code enthält
  • YOUR_JAVASCRIPT_FUNCTION: der Name Ihrer UDF
  • KAFKA_SERVER_ADDRESSES ist die IP-Adressliste des Apache Kafka-Brokers. Jede IP-Adresse sollte die Portnummer haben, von der aus der Server zugänglich ist. Beispiel: 35.70.252.199:9092 Wenn mehrere Adressen angegeben werden, folgen Sie der Anleitung zum Maskieren von Kommas.
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Kafka_to_BigQuery",
   }
}