Google bietet eine Reihe von Open-Source-Vorlagen für Dataflow. Allgemeine Informationen zu Vorlagen finden Sie auf der Seite Dataflow-Vorlagen. Eine Liste aller von Google bereitgestellten Vorlagen finden Sie auf der Seite Erste Schritte mit von Google bereitgestellten Vorlagen.
Auf dieser Seite werden Streamingvorlagen beschrieben:
- Pub/Sub-Abo für BigQuery
- Pub/Sub-Thema für BigQuery
- Pub/Sub Avro für BigQuery
- Pub/Sub zu Pub/Sub
- Pub/Sub zu Splunk
- Pub/Sub für Avro-Dateien in Cloud Storage
- Pub/Sub für Textdateien in Cloud Storage
- Pub/Sub für MongoDB
- Textdateien in Cloud Storage für BigQuery (Stream)
- Textdateien in Cloud Storage für Pub/Sub (Stream)
- Datenmaskierung/Tokenisierung aus Cloud Storage für BigQuery (mit Cloud DLP)
- Change Data Capture für BigQuery (Stream)
- Apache Kafka für BigQuery
Pub/Sub-Abo für BigQuery
Die Vorlage "Pub/Sub-Abo für BigQuery" ist eine Streamingpipeline, die Nachrichten im JSON-Format aus einem Pub/Sub-Abo liest und in eine BigQuery-Tabelle schreibt. Sie können die Vorlage als schnelle Lösung verwenden, um Pub/Sub-Daten nach BigQuery zu verschieben. Die Vorlage liest Nachrichten im JSON-Format aus Pub/Sub und konvertiert sie in BigQuery-Elemente.
Voraussetzungen für diese Pipeline:
- Die Pub/Sub-Nachrichten müssen im JSON-Format vorliegen, wie hier beschrieben.
Beispielsweise können Nachrichten, die als
{"k1":"v1", "k2":"v2"}
formatiert sind, in eine BigQuery-Tabelle mit zwei Spalten namensk1
undk2
mit dem Datentyp "String" eingefügt werden. - Die Ausgabetabelle muss vorhanden sein, bevor Sie die Pipeline ausführen.
Vorlagenparameter
Parameter | Beschreibung |
---|---|
inputSubscription |
Das Pub/Sub-Eingabeabo, aus dem gelesen werden soll, im Format projects/<project>/subscriptions/<subscription> . |
outputTableSpec |
Der Speicherort der BigQuery-Ausgabetabelle im Format <my-project>:<my-dataset>.<my-table> . |
outputDeadletterTable |
Die BigQuery-Tabelle im Format <my-project>:<my-dataset>.<my-table> für Nachrichten, die die Ausgabetabelle nicht erreicht haben.
Wenn sie nicht vorhanden ist, wird sie während der Pipelineausführung erstellt.
Wenn nicht angegeben, wird stattdessen <outputTableSpec>_error_records verwendet. |
Pub/Sub-Abo für BigQuery-Vorlage ausführen
KONSOLE
Über die Google Cloud Console ausführen- Rufen Sie in der Cloud Console die Seite "Dataflow" auf. Zur Seite "Dataflow"
- Klicken Sie auf Job aus Vorlage erstellen.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage the Pub/Sub Subscription to BigQuery template aus.
- Geben Sie einen Jobnamen in das Feld Jobname ein.
- Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein.
- Klicken Sie auf Job ausführen.

GCLOUD
Über dasgcloud
-Befehlszeilentool ausführenHinweis: Wenn Sie Vorlagen mit dem gcloud
-Befehlszeilentool ausführen möchten, benötigen Sie Cloud SDK 138.0.0 oder eine höhere Version.
Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:
gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery
Dabei gilt:
PROJECT_ID
: Ihre Projekt-IDJOB_NAME
: ein Jobname Ihrer WahlREGION
: der regionale Endpunkt (z. B.us-west1
)TEMP_LOCATION
: der Speicherort zum Schreiben temporärer Dateien (z. B.gs://your-bucket/temp
)SUBSCRIPTION_NAME
: der Name Ihres Pub/Sub-AbosDATASET
: Ihr BigQuery-DatasetTABLE_NAME
: Ihr BigQuery-Tabellenname
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery \ --region REGION \ --staging-location TEMP_LOCATION \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\ outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\ outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
API
Über die REST API ausführenZum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:
gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery
Zum Ausführen dieser Vorlage mithilfe einer REST API-Anfrage senden Sie eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.
Dabei gilt:
PROJECT_ID
: Ihre Projekt-IDJOB_NAME
: ein Jobname Ihrer WahlREGION
: der regionale Endpunkt (z. B.us-west1
)TEMP_LOCATION
: der Speicherort zum Schreiben temporärer Dateien (z. B.gs://your-bucket/temp
)SUBSCRIPTION_NAME
: der Name Ihres Pub/Sub-AbosDATASET
: Ihr BigQuery-DatasetTABLE_NAME
: Ihr BigQuery-Tabellenname
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME", "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME" }, "environment": { "bypassTempDirValidation": false, "tempLocation": "TEMP_LOCATION", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, }
Pub/Sub-Thema für BigQuery
Die Vorlage "Pub/Sub-Thema für BigQuery" ist eine Streamingpipeline, die Nachrichten im JSON-Format aus einem Pub/Sub-Thema liest und in eine BigQuery-Tabelle schreibt. Sie können die Vorlage als schnelle Lösung verwenden, um Pub/Sub-Daten nach BigQuery zu verschieben. Die Vorlage liest Nachrichten im JSON-Format aus Pub/Sub und konvertiert sie in BigQuery-Elemente.
Voraussetzungen für diese Pipeline:
- Die Pub/Sub-Nachrichten müssen im JSON-Format vorliegen, wie hier beschrieben.
Beispielsweise können Nachrichten, die als
{"k1":"v1", "k2":"v2"}
formatiert sind, in eine BigQuery-Tabelle mit zwei Spalten namensk1
undk2
mit dem Datentyp "String" eingefügt werden. - Die Ausgabetabelle muss vor der Pipelineausführung vorhanden sein.
Vorlagenparameter
Parameter | Beschreibung |
---|---|
inputTopic |
Das Pub/Sub-Eingabethema, aus dem gelesen werden soll, im Format projects/<project>/topics/<topic> . |
outputTableSpec |
Der Speicherort der BigQuery-Ausgabetabelle im Format <my-project>:<my-dataset>.<my-table> . |
outputDeadletterTable |
Die BigQuery-Tabelle für Nachrichten, die die Ausgabetabelle nicht erreicht haben. Sie sollte das Format <my-project>:<my-dataset>.<my-table> haben.
Wenn sie nicht vorhanden ist, wird sie während der Pipelineausführung erstellt.
Wenn nicht angegeben, wird stattdessen <outputTableSpec>_error_records verwendet. |
Vorlage "Pub/Sub-Thema für BigQuery" ausführen
KONSOLE
Über die Google Cloud Console ausführen- Rufen Sie in der Cloud Console die Seite "Dataflow" auf. Zur Seite "Dataflow"
- Klicken Sie auf Job aus Vorlage erstellen.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage the Pub/Sub Topic to BigQuery template aus.
- Geben Sie einen Jobnamen in das Feld Jobname ein.
- Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein.
- Klicken Sie auf Job ausführen.

GCLOUD
Über dasgcloud
-Befehlszeilentool ausführenHinweis: Wenn Sie Vorlagen mit dem gcloud
-Befehlszeilentool ausführen möchten, benötigen Sie Cloud SDK 138.0.0 oder eine höhere Version.
Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:
gs://dataflow-templates/VERSION/PubSub_to_BigQuery
Dabei gilt:
PROJECT_ID
: Ihre Projekt-IDJOB_NAME
ist ein Jobname Ihrer WahlREGION
: der regionale Endpunkt (z. B.us-west1
)TEMP_LOCATION
: der Speicherort zum Schreiben temporärer Dateien (z. B.gs://your-bucket/temp
)TOPIC_NAME
: der Name Ihres Pub/Sub-ThemasDATASET
: Ihr BigQuery-DatasetTABLE_NAME
: Ihr BigQuery-Tabellenname
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \ --region REGION \ --staging-location TEMP_LOCATION \ --parameters \ inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\ outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
API
Über die REST API ausführenZum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:
gs://dataflow-templates/VERSION/PubSub_to_BigQuery
Zum Ausführen dieser Vorlage mithilfe einer REST API-Anfrage senden Sie eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.
Dabei gilt:
PROJECT_ID
: Ihre Projekt-IDJOB_NAME
ist ein Jobname Ihrer WahlREGION
: der regionale Endpunkt (z. B.us-west1
)TEMP_LOCATION
: der Speicherort zum Schreiben temporärer Dateien (z. B.gs://your-bucket/temp
)TOPIC_NAME
: der Name Ihres Pub/Sub-ThemasDATASET
: Ihr BigQuery-DatasetTABLE_NAME
: Ihr BigQuery-Tabellenname
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_to_BigQuery { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": TEMP_LOCATION, "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME", "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME" } }
Pub/Sub Avro für BigQuery
Die Vorlage „Pub/Sub Avro für BigQuery“ ist eine Streamingpipeline, die Avro-Daten aus einem Pub/Sub-Abo in eine BigQuery-Tabelle schreibt. Alle Fehler beim Schreiben in die BigQuery-Tabelle werden in ein Pub/Sub-Thema für nicht verarbeitete Datensätze gestreamt.
Voraussetzungen für diese Pipeline
- Das Pub/Sub-Eingabeabo muss vorhanden sein.
- Die Schemadatei für die Avro-Einträge muss in Cloud Storage hinterlegt sein.
- Das Pub/Sub-Thema für nicht verarbeitete Datensätze muss vorhanden sein.
- Das BigQuery-Ausgabe-Dataset muss vorhanden sein.
Vorlagenparameter
Parameter | Beschreibung |
---|---|
schemaPath |
Der Cloud Storage-Speicherort der Avro-Schemadatei. Beispiel: gs://path/to/my/schema.avsc |
inputSubscription |
Das Pub/Sub-Eingabeabo, aus dem gelesen werden soll. z. B. projects/<project>/subscriptions/<subscription> . |
outputTopic |
Das Pub/Sub-Thema, das für nicht verarbeitete Datensätze verwendet werden soll. z. B. projects/<project-id>/topics/<topic-name> . |
outputTableSpec |
Ort der BigQuery-Ausgabetabelle. Beispiel: <my-project>:<my-dataset>.<my-table>
Abhängig von der angegebenen createDisposition kann die Ausgabetabelle automatisch mit dem vom Nutzer angegebenen Avro-Schema erstellt werden. |
writeDisposition |
(Optional) Die BigQuery-WriteDisposition.
Beispiel: WRITE_APPEND , WRITE_EMPTY oder WRITE_TRUNCATE . Standardeinstellung: WRITE_APPEND |
createDisposition |
(Optional) Die BigQuery-CreateDisposition.
Beispiele: CREATE_IF_NEEDED , CREATE_NEVER Standardeinstellung: CREATE_IF_NEEDED |
Vorlage „Pub/Sub Avro für BigQuery“ ausführen
KONSOLE
Über die Google Cloud Console ausführen- Rufen Sie in der Cloud Console die Seite "Dataflow" auf. Zur Seite "Dataflow"
- Klicken Sie auf Job aus Vorlage erstellen.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage the Pub/Sub Avro to BigQuery template aus.
- Geben Sie einen Jobnamen in das Feld Jobname ein.
- Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein.
- Klicken Sie auf Job ausführen.

GCLOUD
Über dasgcloud
-Befehlszeilentool ausführenHinweis: Wenn Sie Vorlagen mit dem gcloud
-Befehlszeilentool ausführen möchten, benötigen Sie das Cloud SDK 284.0.0 oder eine höhere Version.
Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:
gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery
Dabei gilt:
JOB_NAME
ist ein Jobname Ihrer WahlREGION_NAME
: der Name der Dataflow-Region (z. B.us-central1
)SCHEMA_PATH
: der Cloud Storage-Pfad zur Avro-Schemadatei (z. B.gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: der Name des Pub/Sub-EingabeabosBIGQUERY_TABLE
: der Name der BigQuery-AusgabetabelleDEADLETTER_TOPIC
: Das Pub/Sub-Thema, das für die Warteschlange für nicht verarbeitete Datensätze verwendet werden soll
gcloud beta dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
API
Über die REST API ausführenZum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:
gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery
Zum Ausführen dieser Vorlage mithilfe einer REST API-Anfrage senden Sie eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.
Dabei gilt:
JOB_NAME
ist ein Jobname Ihrer WahlLOCATION
: der Name der Dataflow-Region (z. B.us-central1
)SCHEMA_PATH
: der Cloud Storage-Pfad zur Avro-Schemadatei (z. B.gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: der Name des Pub/Sub-EingabeabosBIGQUERY_TABLE
: der Name der BigQuery-AusgabetabelleDEADLETTER_TOPIC
: Das Pub/Sub-Thema, das für die Warteschlange für nicht verarbeitete Datensätze verwendet werden soll
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
Pub/Sub zu Pub/Sub
Die Vorlage "Pub/Sub für Pub/Sub" ist eine Streamingpipeline, die Nachrichten aus einem Pub/Sub-Abo liest und in ein anderes Pub/Sub-Thema schreibt. Die Pipeline akzeptiert auch einen optionalen Nachrichtenattributschlüssel und einen Wert, die zum Filtern der Nachrichten verwendet werden können, die in das Pub/Sub-Thema geschrieben werden sollen. Sie können diese Vorlage verwenden, um Nachrichten mit einem optionalen Nachrichtenfilter von einem Pub/Sub-Abo in ein anderes Pub/Sub-Thema zu kopieren.
Voraussetzungen für diese Pipeline:
- Das als Quelle dienende Pub/Sub-Abo muss vor der Ausführung vorhanden sein.
- Das Pub/Sub-Thema, in das geschrieben werden soll, muss vor der Ausführung vorhanden sein.
Vorlagenparameter
Parameter | Beschreibung |
---|---|
inputSubscription |
Pub/Sub-Abo, aus dem die Eingabe gelesen wird. Beispiel: projects/<project-id>/subscriptions/<subscription-name> . |
outputTopic |
Cloud Pub/Sub-Thema, in das die Ausgabe geschrieben wird. Beispiel: projects/<project-id>/topics/<topic-name> . |
filterKey |
[Optional] Filterereignisse nach Attributschlüssel. Wenn filterKey nicht festgelegt ist, werden keine Filter angewendet. |
filterValue |
[Optional] Filterattributwert für den Fall, dass "filterKey" bereitgestellt wird. Standardmäßig ist für filterValue null festgelegt. |
Vorlage "Pub/Sub für Pub/Sub" ausführen
KONSOLE
Über die Google Cloud Console ausführen- Rufen Sie in der Cloud Console die Seite "Dataflow" auf. Zur Seite "Dataflow"
- Klicken Sie auf Job aus Vorlage erstellen.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage the Pub/Sub to Pub/Sub template aus.
- Geben Sie einen Jobnamen in das Feld Jobname ein.
- Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein.
- Klicken Sie auf Job ausführen.

GCLOUD
Über dasgcloud
-Befehlszeilentool ausführenHinweis: Wenn Sie Vorlagen mit dem gcloud
-Befehlszeilentool ausführen möchten, benötigen Sie Cloud SDK 138.0.0 oder eine höhere Version.
Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:
gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub
Dabei gilt:
PROJECT_ID
: Ihre Projekt-IDJOB_NAME
ist ein Jobname Ihrer WahlREGION
: der regionale Endpunkt (z. B.us-west1
)TEMP_LOCATION
: der Speicherort zum Schreiben temporärer Dateien (z. B.gs://your-bucket/temp
)SUBSCRIPTION_NAME
: der Name des Pub/Sub-AbosTOPIC_NAME
: der Name des Pub/Sub-ThemasFILTER_KEY
: der Attributschlüssel, nach dem Ereignisse gefiltert werden Wenn kein Schlüssel angegeben ist, werden keine Filter angewendet.FILTER_VALUE
: Filterattributwert, der verwendet wird, wenn ein Ereignisfilterschlüssel angegeben ist. Akzeptiert einen gültigen Java-Regex-String als Ereignisfilterwert. Wenn ein Regex angegeben wird, muss der komplette Ausdruck übereinstimmen, damit die Nachricht gefiltert wird. Teilübereinstimmungen (z. B. Teilstrings) werden nicht gefiltert. Standardmäßig wird ein Null-Ereignisfilterwert verwendet.
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \ --region REGION \ --staging-location TEMP_LOCATION \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ filterKey=FILTER_KEY,\ filterValue=FILTER_VALUE
API
Über die REST API ausführenZum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:
gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub
Zum Ausführen dieser Vorlage mithilfe einer REST API-Anfrage senden Sie eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.
Dabei gilt:
PROJECT_ID
: Ihre Projekt-IDJOB_NAME
ist ein Jobname Ihrer WahlREGION
: der regionale Endpunkt (z. B.us-west1
)TEMP_LOCATION
: der Speicherort zum Schreiben temporärer Dateien (z. B.gs://your-bucket/temp
)SUBSCRIPTION_NAME
: der Name des Pub/Sub-AbosTOPIC_NAME
: der Name des Pub/Sub-ThemasFILTER_KEY
: der Attributschlüssel, nach dem Ereignisse gefiltert werden Wenn kein Schlüssel angegeben ist, werden keine Filter angewendet.FILTER_VALUE
: Filterattributwert, der verwendet wird, wenn ein Ereignisfilterschlüssel angegeben ist. Akzeptiert einen gültigen Java-Regex-String als Ereignisfilterwert. Wenn ein Regex angegeben wird, muss der komplette Ausdruck übereinstimmen, damit die Nachricht gefiltert wird. Teilübereinstimmungen (z. B. Teilstrings) werden nicht gefiltert. Standardmäßig wird ein Null-Ereignisfilterwert verwendet.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": TEMP_LOCATION, "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME", "filterKey": "FILTER_KEY", "filterValue": "FILTER_VALUE" } }
Pub/Sub zu Splunk
Die Vorlage "Pub/Sub für Splunk" ist eine Streamingpipeline, die Nachrichten aus einem Pub/Sub-Abo liest und die Nutzlast der Nachricht über den HTTP Event Collector (HEC) von Splunk in Splunk schreibt. Vor dem Schreiben in Splunk können Sie auch eine benutzerdefinierte JavaScript-Funktion auf die Nachrichtennutzlast anwenden. Alle Nachrichten, bei denen Verarbeitungsfehler auftreten, werden zur weiteren Fehlerbehebung und erneuten Verarbeitung an ein Pub/Sub-Thema für nicht verarbeitete Datensätze weitergeleitet.
Als zusätzlichen Schutz für Ihr HEC-Token können Sie auch einen Cloud KMS-Schlüssel zusammen mit dem base64-codierten HEC-Tokenparameter übergeben, der mit dem Cloud KMS-Schlüssel verschlüsselt ist. Weitere Informationen zum Verschlüsseln des HEC-Tokenparameters finden Sie unter Cloud KMS API-Verschlüsselungsendpunkt.
Voraussetzungen für diese Pipeline:
- Das als Quelle dienende Pub/Sub-Abo muss vorhanden sein, bevor Sie die Pipeline ausführen.
- Das Pub/Sub-Thema für nicht verarbeitete Datensätze muss vorhanden sein, bevor die Pipeline ausgeführt wird.
- Auf den Splunk-HEC-Endpunkt muss über das Dataflow-Worker-Netzwerk zugegriffen werden können.
- Das Splunk-HEC-Token muss generiert und verfügbar sein.
Vorlagenparameter
Parameter | Beschreibung |
---|---|
inputSubscription |
Das Pub/Sub-Abo, aus dem die Eingabe gelesen wird. Beispiel: projects/<project-id>/subscriptions/<subscription-name> . |
token |
Das Splunk-HEC-Authentifizierungstoken. Dieser base64-codierte String kann zur zusätzlichen Sicherheit mit einem Cloud KMS-Schlüssel verschlüsselt werden. |
url |
Die Splunk-HEC-URL. Diese muss von der VPC, in der die Pipeline ausgeführt wird, weitergeleitet werden können. Beispiel: https://splunk-hec-host:8088. |
outputDeadletterTopic |
Das Pub/Sub-Thema als Weiterleitungsziel für nicht zustellbare Nachrichten. Beispiel: projects/<project-id>/topics/<topic-name> . |
javascriptTextTransformGcsPath |
[Optional] Der Cloud Storage-Pfad, der Ihren gesamten JavaScript-Code enthält. Beispiel: gs://mybucket/mytransforms/*.js . |
javascriptTextTransformFunctionName |
[Optional] Der Name der aufzurufenden JavaScript-Funktion. Wenn Ihre JavaScript-Funktion beispielsweise function myTransform(inJson) { ...dostuff...} ist, lautet der Funktionsname myTransform .
|
batchCount |
[Optional] Die Batchgröße zum Senden mehrerer Ereignisse an Splunk. Standardeinstellung: 1 (keine Batchverarbeitung). |
parallelism |
[Optional] Die maximale Anzahl an parallelen Anfragen. Standardeinstellung: 1 (keine Parallelität). |
disableCertificateValidation |
[Optional] SSL-Zertifikatsvalidierung deaktivieren. Standardeinstellung: "false" (Validierung aktiviert). |
includePubsubMessage |
[Optional] Schließen Sie die vollständige Pub/Sub-Nachricht in die Nutzlast ein. Standardeinstellung: "false" (nur das Datenelement ist in der Nutzlast enthalten). |
tokenKMSEncryptionKey |
[Optional] Der Cloud KMS-Schlüssel zum Entschlüsseln des HEC-Tokenstrings. Wenn der Cloud KMS-Schlüssel bereitgestellt wird, muss der HEC-Tokenstring verschlüsselt übergeben werden. |
Vorlage "Pub/Sub für Splunk" ausführen
KONSOLE
Über die Google Cloud Console ausführen- Rufen Sie in der Cloud Console die Seite "Dataflow" auf. Zur Seite "Dataflow"
- Klicken Sie auf Job aus Vorlage erstellen.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage the Pub/Sub to Splunk template aus.
- Geben Sie einen Jobnamen in das Feld Jobname ein.
- Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein.
- Klicken Sie auf Job ausführen.

GCLOUD
Über dasgcloud
-Befehlszeilentool ausführenHinweis: Wenn Sie Vorlagen mit dem gcloud
-Befehlszeilentool ausführen möchten, benötigen Sie Cloud SDK 138.0.0 oder eine höhere Version.
Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:
gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk
Dabei gilt:
PROJECT_ID
: Ihre Projekt-IDJOB_NAME
ist ein Jobname Ihrer WahlREGION
: der regionale Endpunkt (z. B.us-west1
)TEMP_LOCATION
: der Speicherort zum Schreiben temporärer Dateien (z. B.gs://your-bucket/temp
)INPUT_SUBSCRIPTION_NAME
: der Name des Pub/Sub-AbosTOKEN
: das HTTP Event Collector-Token von SplunkURL
: der URL-Pfad für den HTTP Event Collector von Splunk (z. B.https://splunk-hec-host:8088
)DEADLETTER_TOPIC_NAME
: der Name des Pub/Sub-ThemasJAVASCRIPT_FUNCTION
: der Name Ihrer JavaScript-FunktionPATH_TO_JAVASCRIPT_UDF_FILE
: der Cloud Storage-Pfad zur.js
-Datei, die Ihren JavaScript-Code enthält (z. B. gs://your-bucket/your-function.js)BATCH_COUNT
: die Batchgröße zum Senden mehrerer Ereignisse an SplunkPARALLELISM
: die Anzahl der parallelen Anfragen, die zum Senden von Ereignissen an Splunk verwendet werden sollenDISABLE_VALIDATION
:true
, wenn Sie die SSL-Zertifikatsvalidierung deaktivieren möchten
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk \ --region REGION \ --staging-location TEMP_LOCATION \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\ token=TOKEN,\ url=URL,\ outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ batchCount=BATCH_COUNT,\ parallelism=PARALLELISM,\ disableCertificateValidation=DISABLE_VALIDATION
API
Über die REST API ausführenZum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:
gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk
Zum Ausführen dieser Vorlage mithilfe einer REST API-Anfrage senden Sie eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.
Dabei gilt:
PROJECT_ID
: Ihre Projekt-IDJOB_NAME
ist ein Jobname Ihrer WahlREGION
: der regionale Endpunkt (z. B.us-west1
)TEMP_LOCATION
: der Speicherort zum Schreiben temporärer Dateien (z. B.gs://your-bucket/temp
)INPUT_SUBSCRIPTION_NAME
: der Name des Pub/Sub-AbosTOKEN
: das HTTP Event Collector-Token von SplunkURL
: der URL-Pfad für den HTTP Event Collector von Splunk (z. B.https://splunk-hec-host:8088
)DEADLETTER_TOPIC_NAME
: der Name des Pub/Sub-ThemasJAVASCRIPT_FUNCTION
: der Name Ihrer JavaScript-FunktionPATH_TO_JAVASCRIPT_UDF_FILE
: der Cloud Storage-Pfad zur.js
-Datei, die Ihren JavaScript-Code enthält (z. B. gs://your-bucket/your-function.js)BATCH_COUNT
: die Batchgröße zum Senden mehrerer Ereignisse an SplunkPARALLELISM
: die Anzahl der parallelen Anfragen, die zum Senden von Ereignissen an Splunk verwendet werden sollenDISABLE_VALIDATION
:true
, wenn Sie die SSL-Zertifikatsvalidierung deaktivieren möchten
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "gs://your-bucket/temp", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME", "token": "TOKEN", "url": "URL", "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "batchCount": "BATCH_COUNT", "parallelism": "PARALLELISM", "disableCertificateValidation": "DISABLE_VALIDATION" } }
Pub/Sub für Avro-Dateien in Cloud Storage
Die Vorlage „Pub/Sub zu Avro-Dateien in Cloud Storage“ ist eine Streamingpipeline, die Daten aus einem Pub/Sub-Thema liest und Avro-Dateien in einen angegebenen Cloud Storage-Bucket schreibt.
Voraussetzungen für diese Pipeline:
- Das Pub/Sub-Eingabethema muss vor der Ausführung der Pipeline vorhanden sein.
Vorlagenparameter
Parameter | Beschreibung |
---|---|
inputTopic |
Cloud Pub/Sub-Thema, das zur Nachrichtenaufnahme abonniert werden soll. Der Themenname muss das Format projects/<project-id>/topics/<topic-name> haben. |
outputDirectory |
Ausgabeverzeichnis, in dem die Avro-Dateien archiviert werden. Fügen Sie am Ende einen Schrägstrich (/) hinzu. Beispiel: gs://example-bucket/example-directory/ . |
avroTempDirectory |
Verzeichnis für temporäre Avro-Dateien. Am Ende muss ein Schrägstrich (/) hinzugefügt werden. Beispiel: gs://example-bucket/example-directory/ . |
outputFilenamePrefix |
[Optional] Präfix für den Ausgabedateinamen der Avro-Dateien. |
outputFilenameSuffix |
[Optional] Ausgabedateiname-Suffix für die Avro-Dateien. |
outputShardTemplate |
[Optional] Shard-Vorlage der Ausgabedatei. Wird als sich wiederholende Folge der Buchstaben "S" und "N" angegeben (Beispiel: SSS-NNN). Diese werden durch die Shard-Nummer bzw. die Anzahl der Shards ersetzt. Das Standardvorlagenformat ist 'WP-SS-of-NN', wenn dieser Parameter nicht angegeben ist. |
numShards |
[Optional] Die maximale Anzahl von Ausgabe-Shards, die beim Schreiben erzeugt werden. Die maximale Standardanzahl der Shards beträgt 1. |
Vorlage "Pub/Sub für Cloud Storage Avro" ausführen
KONSOLE
Über die Google Cloud Console ausführen- Rufen Sie in der Cloud Console die Seite "Dataflow" auf. Zur Seite "Dataflow"
- Klicken Sie auf Job aus Vorlage erstellen.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage the Pub/Sub to Cloud Storage Avro template aus.
- Geben Sie einen Jobnamen in das Feld Jobname ein.
- Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein.
- Klicken Sie auf Job ausführen.

GCLOUD
Über dasgcloud
-Befehlszeilentool ausführenHinweis: Wenn Sie Vorlagen mit dem gcloud
-Befehlszeilentool ausführen möchten, benötigen Sie Cloud SDK 138.0.0 oder eine höhere Version.
Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:
gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
Dabei gilt:
PROJECT_ID
: Ihre Projekt-IDJOB_NAME
: Jobname Ihrer WahlREGION
: der regionale Endpunkt (z. B.us-west1
)TEMP_LOCATION
: der Speicherort zum Schreiben temporärer Dateien (z. B.gs://your-bucket/temp
)TOPIC_NAME
: der Name des Pub/Sub-ThemasBUCKET_NAME
: der Name Ihres Cloud Storage-BucketsFILENAME_PREFIX
: das gewünschte Präfix des AusgabedateinamensFILENAME_SUFFIX
: das gewünschte Suffix des AusgabedateinamensSHARD_TEMPLATE
: die gewünschte Shard-AusgabevorlageNUM_SHARDS
: die Anzahl der Ausgabe-Shards
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro \ --region REGION \ --staging-location TEMP_LOCATION \ --parameters \ inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ outputDirectory=gs://BUCKET_NAME/output/,\ outputFilenamePrefix=FILENAME_PREFIX,\ outputFilenameSuffix=FILENAME_SUFFIX,\ outputShardTemplate=SHARD_TEMPLATE,\ numShards=NUM_SHARDS,\ avroTempDirectory=gs://BUCKET_NAME/temp/
API
Über die REST API ausführenZum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:
gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
Zum Ausführen dieser Vorlage mithilfe einer REST API-Anfrage senden Sie eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.
Dabei gilt:
PROJECT_ID
: Ihre Projekt-IDJOB_NAME
: Jobname Ihrer WahlREGION
: der regionale Endpunkt (z. B.us-west1
)TEMP_LOCATION
: der Speicherort zum Schreiben temporärer Dateien (z. B.gs://your-bucket/temp
)TOPIC_NAME
: der Name des Pub/Sub-ThemasBUCKET_NAME
: der Name Ihres Cloud Storage-BucketsFILENAME_PREFIX
: das gewünschte Präfix des AusgabedateinamensFILENAME_SUFFIX
: das gewünschte Suffix des AusgabedateinamensSHARD_TEMPLATE
: die gewünschte Shard-AusgabevorlageNUM_SHARDS
: die Anzahl der Ausgabe-Shards
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": TEMP_LOCATION, "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME", "outputDirectory": "gs://BUCKET_NAME/output/", "avroTempDirectory": "gs://BUCKET_NAME/temp/", "outputFilenamePrefix": "FILENAME_PREFIX", "outputFilenameSuffix": "FILENAME_SUFFIX", "outputShardTemplate": "SHARD_TEMPLATE", "numShards": "NUM_SHARDS", } }
Pub/Sub für Textdateien in Cloud Storage
Die Vorlage "Pub/Sub für Cloud Storage Text" ist eine Streamingpipeline, die Datensätze aus Pub/Sub liest und als eine Reihe von Cloud Storage-Dateien im Textformat speichert. Die Vorlage kann als schnelle Möglichkeit zum Speichern von Daten in Pub/Sub zur späteren Verwendung genutzt werden. Standardmäßig erstellt die Vorlage alle fünf Minuten eine neue Datei.
Voraussetzungen für diese Pipeline:
- Das Pub/Sub-Thema muss vor der Ausführung vorhanden sein.
- Die im Thema veröffentlichten Nachrichten müssen im Textformat vorliegen.
- Die im Thema veröffentlichten Nachrichten dürfen keine Zeilenumbrüche enthalten. Beachten Sie, dass jede Pub/Sub-Nachricht in der Ausgabedatei als einzelne Zeile gespeichert wird.
Vorlagenparameter
Parameter | Beschreibung |
---|---|
inputTopic |
Das Pub/Sub-Thema, aus dem die Eingabe gelesen werden soll. Der Themenname muss das Format projects/<project-id>/topics/<topic-name> haben. |
outputDirectory |
Das Pfad- und Dateinamenpräfix zum Schreiben von Ausgabedateien. Beispiel: gs://bucket-name/path/ . Dieser Wert muss mit einem Schrägstrich enden. |
outputFilenamePrefix |
Das Präfix für die Namen der einzelnen Dateien im Fenstermodus. Beispiel: output- |
outputFilenameSuffix |
Das Suffix für die Namen der einzelnen Dateien im Fenstermodus, normalerweise eine Dateiendung wie .txt oder .csv . |
outputShardTemplate |
Die Shard-Vorlage definiert den dynamischen Teil aller Namen der Dateien im Fenstermodus. Standardmäßig verwendet die Pipeline einen einzelnen Shard für die Ausgabe in das Dateisystem in jedem Fenster. Das bedeutet, dass alle Daten in einer einzelnen Datei pro Fenster enthalten sind. Für outputShardTemplate wird standardmäßig W-P-SS-of-NN verwendet. Dabei ist W der Datumsbereich des Fensters, P die Bereichsinformation, S die Shard-Nummer und N die Anzahl der Shards. Bei einer einzelnen Datei ist der Abschnitt SS-of-NN der outputShardTemplate immer 00-of-01 .
|
Vorlage „Pub/Sub für Textdateien in Cloud Storage“ ausführen
KONSOLE
Über die Google Cloud Console ausführen- Rufen Sie in der Cloud Console die Seite "Dataflow" auf. Zur Seite "Dataflow"
- Klicken Sie auf Job aus Vorlage erstellen.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage the Pub/Sub to Text Files on Cloud Storage template aus.
- Geben Sie einen Jobnamen in das Feld Jobname ein.
- Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein.
- Klicken Sie auf Job ausführen.

GCLOUD
Über dasgcloud
-Befehlszeilentool ausführenHinweis: Wenn Sie Vorlagen mit dem gcloud
-Befehlszeilentool ausführen möchten, benötigen Sie Cloud SDK 138.0.0 oder eine höhere Version.
Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:
gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text
Dabei gilt:
PROJECT_ID
: Ihre Projekt-IDJOB_NAME
ist ein Jobname Ihrer WahlREGION
: der regionale Endpunkt (z. B.us-west1
)TEMP_LOCATION
: der Speicherort zum Schreiben temporärer Dateien (z. B.gs://your-bucket/temp
)TOPIC_NAME
: der Name Ihres Pub/Sub-ThemasBUCKET_NAME
: der Name Ihres Cloud Storage-Buckets
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text \ --region REGION \ --staging-location TEMP_LOCATION \ --parameters \ inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ outputDirectory=gs://BUCKET_NAME/output/,\ outputFilenamePrefix=output-,\ outputFilenameSuffix=.txt
API
Über die REST API ausführenZum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:
gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text
Zum Ausführen dieser Vorlage mithilfe einer REST API-Anfrage senden Sie eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.
Dabei gilt:
PROJECT_ID
: Ihre Projekt-IDJOB_NAME
ist ein Jobname Ihrer WahlREGION
: der regionale Endpunkt (z. B.us-west1
)TEMP_LOCATION
: der Speicherort zum Schreiben temporärer Dateien (z. B.gs://your-bucket/temp
)TOPIC_NAME
: der Name Ihres Pub/Sub-ThemasBUCKET_NAME
: der Name Ihres Cloud Storage-Buckets
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_IDlocations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "TEMP_LOCATION", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME" "outputDirectory": "gs://BUCKET_NAME/output/", "outputFilenamePrefix": "output-", "outputFilenameSuffix": ".txt", } }
Pub/Sub für MongoDB
Die Vorlage „Pub/Sub für MongoDB“ ist eine Streamingpipeline, die JSON-codierte Nachrichten aus einem Pub/Sub-Abo liest und in MongoDB als Dokumente schreibt. Bei Bedarf unterstützt diese Pipeline zusätzliche Transformationen, die über eine benutzerdefinierte JavaScript-Funktion (UDF) eingebunden werden können. Alle Fehler sind aufgrund von nicht übereinstimmenden Schemata, nicht korrekt formatiertem JSON oder während der Ausführung von Transformationen in einer BigQuery-Tabelle für nicht verarbeitete Nachrichten zusammen mit der Eingabenachricht aufgetreten. Falls noch keine Tabelle für nicht verarbeitete Datensätze vorhanden ist, wird diese Tabelle von der Pipeline automatisch erstellt.
Voraussetzungen für diese Pipeline:
- Das Pub/Sub-Abo muss vorhanden sein und die Nachrichten müssen in einem gültigen JSON-Format codiert sein.
- Der MongoDB-Cluster muss vorhanden und über die Dataflow-Worker-Maschinen zugänglich sein.
Vorlagenparameter
Parameter | Beschreibung |
---|---|
inputSubscription |
Name des Pub/Sub-Abos. Beispiel:
|
mongoDBUri |
Durch Kommas getrennte Liste von MongoDB-Servern. Beispiel: 192.285.234.12:27017,192.287.123.11:27017 |
database |
Datenbank in MongoDB zum Speichern der Sammlung. Beispiel: my-db . |
collection |
Name der Sammlung in der MongoDB-Datenbank. Beispiel: my-collection . |
deadletterTable |
BigQuery-Tabelle, die aus Fehlern resultierende Nachrichten speichert (nicht übereinstimmendes Schema, fehlerhaft formatierte JSON-Dateien usw.). Beispiel: project-id:dataset-name.table-name . |
javascriptTextTransformGcsPath |
[Optional] Cloud Storage-Speicherort der JavaScript-Datei mit der UDF-Transformation. Beispiel: gs://mybucket/filename.json . |
javascriptTextTransformFunctionName |
[Optional] Name der JavaScript-UDF. Beispiel: transform |
batchSize |
[Optional] Batchgröße für die Aufnahme von Dokumentenbatches in MongoDB. Standardeinstellung: 1000 . |
batchSizeBytes |
[Optional] Batchgröße in Byte. Standardeinstellung: 5242880 . |
maxConnectionIdleTime |
[Optional] Maximale zulässige Leerlaufzeit in Sekunden, bis eine Zeitüberschreitung der Verbindung auftritt. Standardeinstellung: 60000 . |
sslEnabled |
[Optional] Boolescher Wert, der angibt, ob für die Verbindung zu MongoDB SSL aktiviert ist. Standardeinstellung: true . |
ignoreSSLCertificate |
[Optional] Boolescher Wert, der angibt, ob das SSL-Zertifikat ignoriert werden soll. Standardeinstellung: true . |
withOrdered |
[Optional] Boolescher Wert, mit dem geordnete Bulk-Aufnahmen in MongoDB aktiviert werden. Standardeinstellung: true . |
withSSLInvalidHostNameAllowed |
[Optional] Boolescher Wert, der angibt, ob ein ungültiger Hostname für die SSL-Verbindung zulässig ist. Standardeinstellung: true . |
Vorlage „Pub/Sub für MongoDB“ ausführen
KONSOLE
Über die Google Cloud Console ausführen- Rufen Sie in der Cloud Console die Seite "Dataflow" auf. Zur Seite "Dataflow"
- Klicken Sie auf Job aus Vorlage erstellen.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage Pub/Sub to MongoDB template aus.
- Geben Sie einen Jobnamen in das Feld Jobname ein.
- Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein.
- Klicken Sie auf Job ausführen.

GCLOUD
Über dasgcloud
-Befehlszeilentool ausführen
Hinweis: Wenn Sie Vorlagen mit dem gcloud
-Befehlszeilentool ausführen möchten, benötigen Sie das Cloud SDK 284.0.0 oder eine höhere Version.
Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:
gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB
Dabei gilt:
PROJECT_ID
: Ihre Projekt-IDREGION_NAME
: Name der Dataflow-Region, (z. B.us-central1
)JOB_NAME
ist ein Jobname Ihrer WahlINPUT_SUBSCRIPTION
: das Pub/Sub-Abo (z. B.
)projects/<project-id>/subscriptions/<subscription-name>
MONGODB_URI
: die MongoDB-Serveradressen (z. B.192.285.234.12:27017,192.287.123.11:27017
)DATABASE
: der Name der MongoDB-Datenbank (z. B.users
)COLLECTION
: der Name der MongoDB-Sammlung (z. B.profiles
)UNPROCESSED_TABLE
: der Name der BigQuery-Tabelle (z. B.your-project:your-dataset.your-table-name
)
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB \ --parameters \ inputSubscription=INPUT_SUBSCRIPTION,\ mongoDBUri=MONGODB_URI,\ database=DATABASE, collection=COLLECTION, deadletterTable=UNPROCESSED_TABLE
API
Über die REST API ausführenZum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:
gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB
Zum Ausführen dieser Vorlage mithilfe einer REST API-Anfrage senden Sie eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.
Dabei gilt:
PROJECT_ID
: Ihre Projekt-IDLOCATION
: Name der Dataflow-Region, (z. B.us-central1
)JOB_NAME
ist ein Jobname Ihrer WahlINPUT_SUBSCRIPTION
: das Pub/Sub-Abo (z. B.
)projects/<project-id>/subscriptions/<subscription-name>
MONGODB_URI
: die MongoDB-Serveradressen (z. B.192.285.234.12:27017,192.287.123.11:27017
)DATABASE
: der Name der MongoDB-Datenbank (z. B.users
)COLLECTION
: der Name der MongoDB-Sammlung (z. B.profiles
)UNPROCESSED_TABLE
: der Name der BigQuery-Tabelle (z. B.your-project:your-dataset.your-table-name
)
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "INPUT_SUBSCRIPTION", "mongoDBUri": "MONGODB_URI", "database": "DATABASE", "collection": "COLLECTION", "deadletterTable": "UNPROCESSED_TABLE" }, "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB", } }
Textdateien in Cloud Storage für BigQuery (Stream)
Die Pipeline „Textdateien in Cloud Storage für BigQuery“ ist eine Streamingpipeline, mit der Sie in Cloud Storage gespeicherte Textdateien streamen, diese mit einer von Ihnen bereitgestellten benutzerdefinierten JavaScript-Funktion (User Defined Function, UDF) transformieren und das Ergebnis in BigQuery ausgeben können.
Voraussetzungen für diese Pipeline:
- Erstellen Sie eine JSON-formatierte BigQuery-Schemadatei, die Ihre Ausgabetabelle beschreibt.
{ 'fields': [{ 'name': 'location', 'type': 'STRING' }, { 'name': 'name', 'type': 'STRING' }, { 'name': 'age', 'type': 'STRING', }, { 'name': 'color', 'type': 'STRING' }, { 'name': 'coffee', 'type': 'STRING', 'mode': 'REQUIRED' }, { 'name': 'cost', 'type': 'NUMERIC', 'mode': 'REQUIRED' }] }
- Erstellen Sie eine JavaScript-Datei (
.js
) mit Ihrer benutzerdefinierten Funktion, die die Logik für die Transformation der Textzeilen bereitstellt. Beachten Sie, dass Ihre Funktion einen JSON-String zurückgeben muss.Diese Funktion teilt beispielsweise jede Zeile einer CSV-Datei auf und gibt nach der Transformation der Werte einen JSON-String zurück.
function transform(line) { var values = line.split(','); var obj = new Object(); obj.location = values[0]; obj.name = values[1]; obj.age = values[2]; obj.color = values[3]; obj.coffee = values[4]; var jsonString = JSON.stringify(obj); return jsonString; }
Vorlagenparameter
Parameter | Beschreibung |
---|---|
javascriptTextTransformGcsPath |
Der Cloud Storage-Speicherort Ihrer benutzerdefinierten JavaScript-Funktion.
Beispiel: gs://my_bucket/my_function.js . |
JSONPath |
Der Cloud Storage-Speicherort Ihrer BigQuery-Schemadatei im JSON-Format.
Beispiel: gs://path/to/my/schema.json |
javascriptTextTransformFunctionName |
Der Name der JavaScript-Funktion, die Sie als benutzerdefinierte Funktion aufrufen möchten.
Beispiel: transform |
outputTable |
Die vollständig qualifizierte BigQuery-Tabelle.
Beispiel: my-project:dataset.table . |
inputFilePattern |
Der Cloud Storage-Speicherort des Textes, den Sie verarbeiten möchten.
Beispiel: gs://my-bucket/my-files/text.txt |
bigQueryLoadingTemporaryDirectory |
Das temporäre Verzeichnis für den BigQuery-Ladevorgang.
Beispiel: gs://my-bucket/my-files/temp_dir |
outputDeadletterTable |
Tabelle für Nachrichten, die die Ausgabetabelle nicht erreicht haben.
Beispiel: my-project:dataset.my-unprocessed-table . Wenn sie nicht vorhanden ist, wird sie während der Pipelineausführung erstellt.
Wenn nicht angegeben, wird stattdessen <outputTableSpec>_error_records verwendet. |
Vorlage "Cloud Storage Text für BigQuery (Stream)" ausführen
KONSOLE
Über die Google Cloud Console ausführen- Rufen Sie in der Cloud Console die Seite "Dataflow" auf. Zur Seite "Dataflow"
- Klicken Sie auf Job aus Vorlage erstellen.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage the Cloud Storage Text to BigQuery template aus.
- Geben Sie einen Jobnamen in das Feld Jobname ein.
- Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein.
- Klicken Sie auf Job ausführen.

GCLOUD
Über dasgcloud
-Befehlszeilentool ausführenHinweis: Wenn Sie Vorlagen mit dem gcloud
-Befehlszeilentool ausführen möchten, benötigen Sie Cloud SDK 138.0.0 oder eine höhere Version.
Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:
gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery
Dabei gilt:
PROJECT_ID
: Ihre Projekt-IDJOB_NAME
ist ein Jobname Ihrer WahlREGION
: der regionale Endpunkt (z. B.us-west1
)TEMP_LOCATION
: der Speicherort zum Schreiben temporärer Dateien (z. B.gs://your-bucket/temp
)JAVASCRIPT_FUNCTION
: der Name Ihrer UDFPATH_TO_BIGQUERY_SCHEMA_JSON
: der Cloud Storage-Pfad zur JSON-Datei, die die Schemadefinition enthältPATH_TO_JAVASCRIPT_UDF_FILE
: der Cloud Storage-Pfad zur.js
-Datei, die Ihren JavaScript-Code enthältPATH_TO_TEXT_DATA
: der Cloud Storage-Pfad zu Ihrem Text-DatasetBIGQUERY_TABLE
: Ihr BigQuery-TabellennameBIGQUERY_UNPROCESSED_TABLE
: der Name Ihrer BigQuery-Tabelle für nicht verarbeitete NachrichtenPATH_TO_TEMP_DIR_ON_GCS
: der Cloud Storage-Pfad zum temporären Verzeichnis
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery \ --region REGION \ --staging-location TEMP_LOCATION \ --parameters \ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\ inputFilePattern=PATH_TO_TEXT_DATA,\ outputTable=BIGQUERY_TABLE,\ outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\ bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS
API
Über die REST API ausführenZum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:
gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery
Zum Ausführen dieser Vorlage mithilfe einer REST API-Anfrage senden Sie eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.
Dabei gilt:
PROJECT_ID
: Ihre Projekt-IDJOB_NAME
ist ein Jobname Ihrer WahlREGION
: der regionale Endpunkt (z. B.us-west1
)TEMP_LOCATION
: der Speicherort zum Schreiben temporärer Dateien (z. B.gs://your-bucket/temp
)JAVASCRIPT_FUNCTION
: der Name Ihrer UDFPATH_TO_BIGQUERY_SCHEMA_JSON
: der Cloud Storage-Pfad zur JSON-Datei, die die Schemadefinition enthältPATH_TO_JAVASCRIPT_UDF_FILE
: der Cloud Storage-Pfad zur.js
-Datei, die Ihren JavaScript-Code enthältPATH_TO_TEXT_DATA
: der Cloud Storage-Pfad zu Ihrem Text-DatasetBIGQUERY_TABLE
: Ihr BigQuery-TabellennameBIGQUERY_UNPROCESSED_TABLE
: der Name Ihrer BigQuery-Tabelle für nicht verarbeitete NachrichtenPATH_TO_TEMP_DIR_ON_GCS
: der Cloud Storage-Pfad zum temporären Verzeichnis
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "TEMP_LOCATION", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "inputFilePattern":"PATH_TO_TEXT_DATA", "outputTable":"BIGQUERY_TABLE", "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE", "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS" } }
Textdateien in Cloud Storage für Pub/Sub (Stream)
Diese Vorlage erstellt eine Streamingpipeline, die kontinuierlich nach neuen Textdateien sucht, die in Cloud Storage hochgeladen wurden, jede Datei Zeile für Zeile liest und Strings in einem Pub/Sub-Thema veröffentlicht. Die Vorlage veröffentlicht Datensätze aus einer JSON-Datei mit Zeilenumbruch oder einer CSV-Datei zur Echtzeitverarbeitung in einem Pub/Sub-Thema. Sie können diese Vorlage verwenden, um Daten in Pub/Sub wiederzugeben.
Derzeit ist das Abfrageintervall fest auf zehn Sekunden festgelegt. Diese Vorlage legt keinen Zeitstempel für die einzelnen Datensätze fest, sodass die Ereigniszeit der Veröffentlichungszeit während der Ausführung entspricht. Wenn Ihre Pipeline für die Verarbeitung eine korrekte Ereigniszeit benötigt, sollten Sie diese Pipeline nicht verwenden.
Voraussetzungen für diese Pipeline:
- Die Eingabedateien müssen im JSON-Format mit Zeilenumbruch oder im CSV-Format vorliegen. Datensätze, die sich in den Quelldateien über mehrere Zeilen erstrecken, können später Probleme verursachen, da jede Zeile in den Dateien als eigene Nachricht in Pub/Sub veröffentlicht wird.
- Das Pub/Sub-Thema muss vor der Ausführung vorhanden sein.
- Die Pipeline wird unbefristet ausgeführt und muss manuell beendet werden.
Vorlagenparameter
Parameter | Beschreibung |
---|---|
inputFilePattern |
Das Muster der Eingabedatei, aus der gelesen werden soll. Beispiel: gs://bucket-name/files/*.json oder gs://bucket-name/path/*.csv . |
outputTopic |
Das Pub/Sub-Eingabethema, in das geschrieben werden soll. Der Name muss das Format projects/<project-id>/topics/<topic-name> haben. |
Vorlage „Textdateien in Cloud Storage für Pub/Sub (Stream)” ausführen
KONSOLE
Über die Google Cloud Console ausführen- Rufen Sie in der Cloud Console die Seite "Dataflow" auf. Zur Seite "Dataflow"
- Klicken Sie auf Job aus Vorlage erstellen.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage the Text Files on Cloud Storage to Pub/Sub (Stream) template aus.
- Geben Sie einen Jobnamen in das Feld Jobname ein.
- Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein.
- Klicken Sie auf Job ausführen.

GCLOUD
Über dasgcloud
-Befehlszeilentool ausführenHinweis: Wenn Sie Vorlagen mit dem gcloud
-Befehlszeilentool ausführen möchten, benötigen Sie Cloud SDK 138.0.0 oder eine höhere Version.
Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:
gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub
Dabei gilt:
PROJECT_ID
: Ihre Projekt-IDJOB_NAME
ist ein Jobname Ihrer WahlREGION
: der regionale Endpunkt (z. B.us-west1
)TEMP_LOCATION
: der Speicherort zum Schreiben temporärer Dateien (z. B.gs://your-bucket/temp
)TOPIC_NAME
: der Name Ihres Pub/Sub-ThemasBUCKET_NAME
: der Name Ihres Cloud Storage-BucketsFILE_PATTERN
: das Glob-Dateimuster, aus dem im Cloud Storage-Bucket gelesen werden soll (z. B.path/*.csv
)
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub \ --region REGION\ --staging-location TEMP_LOCATION\ --parameters \ inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME
API
Über die REST API ausführenZum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:
gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub
Zum Ausführen dieser Vorlage mithilfe einer REST API-Anfrage senden Sie eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.
Dabei gilt:
PROJECT_ID
: Ihre Projekt-IDJOB_NAME
ist ein Jobname Ihrer WahlREGION
: der regionale Endpunkt (z. B.us-west1
)TEMP_LOCATION
: der Speicherort zum Schreiben temporärer Dateien (z. B.gs://your-bucket/temp
)TOPIC_NAME
: der Name Ihres Pub/Sub-ThemasBUCKET_NAME
: der Name Ihres Cloud Storage-BucketsFILE_PATTERN
: das Glob-Dateimuster, aus dem im Cloud Storage-Bucket gelesen werden soll (z. B.path/*.csv
)
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "gs://your-bucket/temp", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME" } }
Datenmaskierung/Tokenisierung aus Cloud Storage für BigQuery (mit Cloud DLP)
Die Vorlage "Datenmaskierung/Tokenisierung aus Cloud Storage für BigQuery (mit Cloud DLP)" ist eine Streamingpipeline, die CSV-Dateien aus einem Cloud Storage-Bucket liest und die Cloud Data Loss Prevention API (Cloud DLP) für die De- Identifikation aufruft und die de-identifizierten Daten in die angegebene BigQuery-Tabelle schreibt. Diese Vorlage unterstützt sowohl die Verwendung einer Inspektionsvorlage als auch einer De-Identifikationsvorlage für Cloud DLP. Auf diese Weise können Nutzer nach potenziell vertraulichen Informationen suchen und deren Identifizierung aufheben sowie die Identifizierung von strukturierten Daten aufheben, in denen Spalten für die De-Identifikation angegeben sind und kein Prüfung erforderlich ist.
Voraussetzungen für diese Pipeline:
- Die Eingabedaten für die Tokenisierung müssen vorhanden sein.
- Die Cloud DLP-Vorlagen müssen vorhanden sein, zum Beispiel "DeidentifyTemplate" und "InspectTemplate". Weitere Informationen finden Sie unter Cloud DLP-Vorlagen.
- Das BigQuery-Dataset muss vorhanden sein.
Vorlagenparameter
Parameter | Beschreibung |
---|---|
inputFilePattern |
Die CSV-Dateien, aus denen Eingabedatensätze gelesen werden sollen. Platzhalter werden ebenfalls akzeptiert. Beispiel: gs://mybucket/my_csv_filename.csv oder gs://mybucket/file-*.csv .
|
dlpProjectId |
ID des Cloud DLP-Projekts, das Inhaber der Cloud DLP API-Ressource ist.
Dieses Cloud DLP-Projekt kann dasselbe Projekt sein, das auch Inhaber der Cloud DLP-Vorlagen ist, oder ein separates Projekt.
Beispiel: my_dlp_api_project . |
deidentifyTemplateName |
De-Identifikationsvorlage für Cloud DLP nach dem Muster projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId} zur Verwendung für API-Anfragen.
Beispiel: projects/my_project/deidentifyTemplates/100 . |
datasetName |
BigQuery-Dataset zum Senden von tokenisierten Ergebnissen. |
batchSize |
Block-/Batchgröße zum Senden von Daten zur Prüfung und/oder Aufhebung der Tokenisierung. Im Fall einer CSV-Datei gibt batchSize die Anzahl der Zeilen in einem Batch an. Nutzer müssen die Batchgröße anhand der Größe der Datensätze und der Größe der Datei bestimmen. Für die Cloud DLP API gilt eine maximale Nutzlastgröße von 524 KB pro API-Aufruf. |
inspectTemplateName |
[Optional] Cloud DLP-Inspektionsvorlage nach dem Muster projects/{template_project_id}/identifyTemplates/{idTemplateId} zur Verwendung für API-Anfragen.
Beispiel: projects/my_project/identifyTemplates/100 |
Vorlage "Datenmaskierung/Tokenisierung aus Cloud Storage für BigQuery (mit Cloud DLP)" ausführen
KONSOLE
Über die Google Cloud Console ausführen- Rufen Sie in der Cloud Console die Seite "Dataflow" auf. Zur Seite "Dataflow"
- Klicken Sie auf Job aus Vorlage erstellen.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template aus.
- Geben Sie einen Jobnamen in das Feld Jobname ein.
- Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein.
- Klicken Sie auf Job ausführen.

GCLOUD
Über dasgcloud
-Befehlszeilentool ausführenHinweis: Wenn Sie Vorlagen mit dem gcloud
-Befehlszeilentool ausführen möchten, benötigen Sie Cloud SDK 138.0.0 oder eine höhere Version.
Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:
gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery
Dabei gilt:
TEMPLATE_PROJECT_ID
ist Ihre Projekt-ID der VorlageDLP_API_PROJECT_ID
ist Ihre Cloud DLP API-Projekt-IDJOB_NAME
ist ein Jobname Ihrer WahlREGION
: der regionale Endpunkt (z. B.us-west1
)TEMP_LOCATION
: der Speicherort zum Schreiben temporärer Dateien (z. B.gs://your-bucket/temp
)INPUT_DATA
ist der Pfad zur EingabedateiDEIDENTIFY_TEMPLATE
ist die Nummer der Cloud DLP-De-IdentifikationsvorlageDATASET_NAME
: der Name des BigQuery-DatasetsINSPECT_TEMPLATE_NUMBER
ist die Nummer der Cloud DLP-PrüfungsvorlageBATCH_SIZE_VALUE
ist die Batchgröße (Anzahl der Zeilen pro API für CSV-Dateien)
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery \ --region REGION \ --staging-location TEMP_LOCATION \ --parameters \ inputFilePattern=INPUT_DATA,\ datasetName=DATASET_NAME,\ batchSize=BATCH_SIZE_VALUE,\ dlpProjectId=DLP_API_PROJECT_ID,\ deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\ inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER
API
Über die REST API ausführenZum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:
gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery
Zum Ausführen dieser Vorlage mithilfe einer REST API-Anfrage senden Sie eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.
Dabei gilt:
TEMPLATE_PROJECT_ID
ist Ihre Projekt-ID der VorlageDLP_API_PROJECT_ID
ist Ihre Cloud DLP API-Projekt-IDJOB_NAME
ist ein Jobname Ihrer WahlREGION
: der regionale Endpunkt (z. B.us-west1
)TEMP_LOCATION
: der Speicherort zum Schreiben temporärer Dateien (z. B.gs://your-bucket/temp
)INPUT_DATA
ist der Pfad zur EingabedateiDEIDENTIFY_TEMPLATE
ist die Nummer der Cloud DLP-De-IdentifikationsvorlageDATASET_NAME
: der Name des BigQuery-DatasetsINSPECT_TEMPLATE_NUMBER
ist die Nummer der Cloud DLP-PrüfungsvorlageBATCH_SIZE_VALUE
ist die Batchgröße (Anzahl der Zeilen pro API für CSV-Dateien)
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "TEMP_LOCATION", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputFilePattern":INPUT_DATA, "datasetName": "DATASET_NAME", "batchSize": "BATCH_SIZE_VALUE", "dlpProjectId": "DLP_API_PROJECT_ID", "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE", "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER" } }
Change Data Capture von MySQL für BigQuery mit Debezium und Pub/Sub (Stream)
Die Vorlage "Change Data Capture von MySQL für BigQuery mit Debezium und Pub/Sub" ist eine Streamingpipeline, die Pub/Sub-Nachrichten mit Änderungsdaten aus einer MySQL-Datenbank liest und die Datensätze in BigQuery schreibt. Ein Debezium-Connector erfasst Änderungen an der MySQL-Datenbank und veröffentlicht die geänderten Daten in Pub/Sub. Die Vorlage liest dann die Pub/Sub-Nachrichten und schreibt sie in BigQuery.
Über diese Vorlage können Sie MySQL-Datenbanken und BigQuery-Tabellen miteinander synchronisieren. Die Pipeline schreibt die geänderten Daten in eine BigQuery-Staging-Tabelle und aktualisiert in regelmäßigen Abständen eine BigQuery-Tabelle zu Replikation der MySQL-Datenbank.
Voraussetzungen für diese Pipeline:
- Der Debezium-Connector muss bereitgestellt sein.
- Die Pub/Sub-Nachrichten müssen in einer Beam Row serialisiert sein
Vorlagenparameter
Parameter | Beschreibung |
---|---|
inputSubscriptions |
Die durch Kommas getrennte Liste mit Pub/Sub-Eingabeabos, aus denen gelesen werden soll, im Format <subscription>,<subscription>, ... |
changeLogDataset |
Das BigQuery-Dataset zum Speichern der Staging-Tabellen im Format <my-dataset> . |
replicaDataset |
Der Speicherort des BigQuery-Datasets zum Speichern der Replikattabellen im Format <my-dataset> . |
updateFrequencySecs |
(Optional) Das Intervall, in dem die Pipeline die BigQuery-Tabelle zur Replikation der MySQL-Datenbank aktualisiert. |
Vorlage "Change Data Capture von MySQL für BigQuery mit Debezium und Pub/Sub (Stream)" ausführen
Führen Sie die folgenden Schritte aus, um diese Vorlage auszuführen:
- Klonen Sie das DataflowTemplates-Repository auf Ihren lokalen Computer.
- Wechseln Sie zum Verzeichnis
v2/cdc-parent
. - Achten Sie darauf, dass der Debezium-Connector bereitgestellt ist.
- Führen Sie mit Maven die Dataflow-Vorlage aus.
Ersetzen Sie die folgenden Werte:
PROJECT_ID
: Ihre Projekt-ID.YOUR_SUBSCRIPTIONS
: Ihre durch Kommas getrennte Liste von Pub/Sub-Abonamen.YOUR_CHANGELOG_DATASET
: Ihr BigQuery-Dataset für Änderungslogdaten.YOUR_REPLICA_DATASET
: Ihr BigQuery-Dataset für Replikattabellen.
mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \ --inputSubscriptions=YOUR_SUBSCRIPTIONS \ --updateFrequencySecs=300 \ --changeLogDataset=YOUR_CHANGELOG_DATASET \ --replicaDataset=YOUR_REPLICA_DATASET \ --project=PROJECT_ID"
Apache Kafka für BigQuery
Die Vorlage „Apache Kafka für BigQuery“ ist eine Streamingpipeline, die Textdaten aus Apache Kafka schreibt, eine benutzerdefinierte Funktion (User-defined Function, UDF) ausführt und die resultierenden Datensätze in BigQuery ausgibt. Alle Fehler, die bei der Transformation der Daten, der Ausführung der UDF oder beim Schreiben in die Ausgabetabelle auftreten, werden in eine separate Fehlertabelle in BigQuery geschrieben. Wenn die Fehlertabelle vor der Ausführung nicht vorhanden ist, wird sie erstellt.
Voraussetzungen für diese Pipeline
- Die BigQuery-Ausgabetabelle muss vorhanden sein.
- Der Apache Kafka-Broker-Server muss ausgeführt werden und über die Dataflow-Worker-Maschinen erreichbar sein.
- Die Apache Kafka-Themen müssen vorhanden sein und die Nachrichten müssen in einem gültigen JSON-Format codiert sein.
Vorlagenparameter
Parameter | Beschreibung |
---|---|
outputTableSpec |
Der Speicherort der BigQuery-Ausgabetabelle, in den die Apache Kafka-Nachrichten geschrieben werden sollen, im Format my-project:dataset.table . |
inputTopics |
Die Apache Kafka-Eingabethemen, aus denen eine durch Kommas getrennte Liste gelesen werden soll. Beispiel: messages |
bootstrapServers |
Die Hostadresse der ausgeführten Apache Kafka-Broker-Server in einer durch Kommas getrennten Liste, jede Hostadresse im Format 35.70.252.199:9092 . |
javascriptTextTransformGcsPath |
(Optional) Cloud Storage-Speicherortpfad zur JavaScript-UDF. Beispiel: gs://my_bucket/my_function.js |
javascriptTextTransformFunctionName |
(Optional) Der Name des JavaScript, das als UDF aufgerufen werden soll. Beispiel: transform |
outputDeadletterTable |
(Optional) Die BigQuery-Tabelle im Format my-project:dataset.my-deadletter-table für Nachrichten, die die Ausgabetabelle nicht erreicht haben. Wenn sie nicht vorhanden ist, wird die Tabelle während der Pipelineausführung erstellt.
Wenn nicht angegeben, wird stattdessen <outputTableSpec>_error_records verwendet. |
Vorlage „Apache Kafka für BigQuery“ ausführen
KONSOLE
Über die Google Cloud Console ausführen- Rufen Sie in der Cloud Console die Seite "Dataflow" auf. Zur Seite "Dataflow"
- Klicken Sie auf Job aus Vorlage erstellen.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage the Apache Kafka to BigQuery template aus.
- Geben Sie einen Jobnamen in das Feld Jobname ein.
- Geben Sie Ihre Parameterwerte in die verfügbaren Parameterfelder ein.
- Klicken Sie auf Job ausführen.

GCLOUD
Über dasgcloud
-Befehlszeilentool ausführenHinweis: Wenn Sie Vorlagen mit dem gcloud
-Befehlszeilentool ausführen möchten, benötigen Sie das Cloud SDK 284.0.0 oder eine höhere Version.
Zum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:
gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery
Dabei gilt:
YOUR_PROJECT_ID
ist Ihre Projekt-ID der VorlageJOB_NAME
ist ein Jobname Ihrer WahlREGION_NAME
ist der Name der Dataflow-Region (z. B.us-central1
)BIGQUERY_TABLE
: Ihr BigQuery-TabellennameKAFKA_TOPICS
ist die Apache Kafafa-Themenliste. Wenn mehrere Themen bereitgestellt werden, folgen Sie der Anleitung zum Maskieren von Kommas.PATH_TO_JAVASCRIPT_UDF_FILE
: der Cloud Storage-Pfad zur.js
-Datei, die Ihren JavaScript-Code enthältYOUR_JAVASCRIPT_FUNCTION
: der Name Ihrer UDFKAFKA_SERVER_ADDRESSES
ist die IP-Adressliste des Apache Kafka-Brokers. Jede IP-Adresse sollte die Portnummer haben, von der aus der Server zugänglich ist. Beispiel:35.70.252.199:9092
Wenn mehrere Adressen angegeben werden, folgen Sie der Anleitung zum Maskieren von Kommas.
gcloud beta dataflow flex-template run JOB_NAME \ --project=YOUR_PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \ --parameters \ outputTableSpec=BIGQUERY_TABLE,\ inputTopics=KAFKA_TOPICS,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\ bootstrapServers=KAFKA_SERVER_ADDRESSES
API
Über die REST API ausführenZum Ausführen dieser Vorlage benötigen Sie den Cloud Storage-Pfad zur Vorlage:
gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery
Zum Ausführen dieser Vorlage mithilfe einer REST API-Anfrage senden Sie eine HTTP-POST-Anfrage mit Ihrer Projekt-ID. Diese Anfrage erfordert eine Autorisierung.
Dabei gilt:
YOUR_PROJECT_ID
ist Ihre Projekt-ID der VorlageJOB_NAME
ist ein Jobname Ihrer WahlLOCATION
ist der Name der Dataflow-Region (z. B.us-central1
)BIGQUERY_TABLE
: Ihr BigQuery-TabellennameKAFKA_TOPICS
ist die Apache Kafafa-Themenliste. Wenn mehrere Themen bereitgestellt werden, folgen Sie der Anleitung zum Maskieren von Kommas.PATH_TO_JAVASCRIPT_UDF_FILE
: der Cloud Storage-Pfad zur.js
-Datei, die Ihren JavaScript-Code enthältYOUR_JAVASCRIPT_FUNCTION
: der Name Ihrer UDFKAFKA_SERVER_ADDRESSES
ist die IP-Adressliste des Apache Kafka-Brokers. Jede IP-Adresse sollte die Portnummer haben, von der aus der Server zugänglich ist. Beispiel:35.70.252.199:9092
Wenn mehrere Adressen angegeben werden, folgen Sie der Anleitung zum Maskieren von Kommas.
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "outputTableSpec": "BIGQUERY_TABLE", "inputTopics": "KAFKA_TOPICS", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION", "bootstrapServers": "KAFKA_SERVER_ADDRESSES" }, "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Kafka_to_BigQuery", } }