Google bietet eine Reihe von Open-Source-Vorlagen für Cloud Dataflow. Allgemeine Informationen zu Vorlagen finden Sie unter Dataflow-Vorlagen. Eine Liste aller von Google bereitgestellten Vorlagen finden Sie unter Erste Schritte mit von Google bereitgestellten Vorlagen.
In dieser Anleitung werden Hilfsvorlagen dokumentiert.
Dateiformatkonvertierung (Avro, Parquet, CSV)
Die Vorlage zur Dateiformatkonvertierung ist eine Batchpipeline die im Cloud-Speicher gespeicherte Dateien von einem unterstützten Format in ein anderes konvertiert.
Die folgenden Formatkonvertierungen werden unterstützt:
- CSV zu Avro
- CSV zu Parquet
- Avro zu Parquet
- Parquet zu Avro
Voraussetzungen für diese Pipeline:
- Der Cloud Storage-Ausgabe-Bucket muss vorhanden sein, bevor Sie die Pipeline ausführen.
Vorlagenparameter
Parameter | Beschreibung |
---|---|
inputFileFormat |
Das Eingabedateiformat. Dies muss einer der folgenden Werte sein: [csv, avro, parquet] . |
outputFileFormat |
Das Ausgabedateiformat. Dies muss einer der folgenden Werte sein: [avro, parquet] . |
inputFileSpec |
Das Cloud Storage-Pfadmuster für Eingabedateien. Beispiel: gs://bucket-name/path/*.csv |
outputBucket |
Der Cloud Storage-Ordner zum Schreiben von Ausgabedateien. Dieser Pfad muss mit einem Schrägstrich enden.
Beispiel: gs://bucket-name/output/ |
schema |
Der Cloud Storage-Pfad zur Avro-Schemadatei (z. B. gs://bucket-name/schema/my-schema.avsc ) |
containsHeaders |
(Optional) Die CSV-Eingabedateien enthalten eine Kopfzeile (true/false). Der Standardwert ist false . Nur erforderlich, wenn CSV-Dateien gelesen werden. |
csvFormat |
(Optional) Die CSV-Formatspezifikation zum Parsen von Einträgen. Der Standardwert ist Default .
Weitere Informationen finden Sie unter Apache Commons CSV-Format. |
delimiter |
(Optional) Das Feldtrennzeichen, das von den eingegebenen CSV-Dateien verwendet wird. |
outputFilePrefix |
(Optional) Das Präfix der Ausgabedatei. Der Standardwert ist output . |
numShards |
[Optional] Die Anzahl der Shards der Ausgabedatei. |
Vorlage für die Dateiformatkonvertierung ausführen
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Convert file formats template aus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/File_Format_Conversion \ --parameters \ inputFileFormat=INPUT_FORMAT,\ outputFileFormat=OUTPUT_FORMAT,\ inputFileSpec=INPUT_FILES,\ schema=SCHEMA,\ outputBucket=OUTPUT_FOLDER
Dabei gilt:
PROJECT_ID
: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlREGION_NAME
: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/- Den Versionsnamen wie
2021-09-20-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
INPUT_FORMAT
: Das Dateiformat der Eingabedatei muss eines von[csv, avro, parquet]
seinOUTPUT_FORMAT
: Das Dateiformat der Ausgabedateien muss eines von[avro, parquet]
seinINPUT_FILES
: Das Pfadmuster für EingabedateienOUTPUT_FOLDER
: Ihr Cloud Storage-Ordner für AusgabedateienSCHEMA
: Der Pfad zur Avro-Schemadatei
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFileFormat": "INPUT_FORMAT", "outputFileFormat": "OUTPUT_FORMAT", "inputFileSpec": "INPUT_FILES", "schema": "SCHEMA", "outputBucket": "OUTPUT_FOLDER" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/File_Format_Conversion", } }
Dabei gilt:
PROJECT_ID
: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlLOCATION
: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/- Den Versionsnamen wie
2021-09-20-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
INPUT_FORMAT
: Das Dateiformat der Eingabedatei muss eines von[csv, avro, parquet]
seinOUTPUT_FORMAT
: Das Dateiformat der Ausgabedateien muss eines von[avro, parquet]
seinINPUT_FILES
: Das Pfadmuster für EingabedateienOUTPUT_FOLDER
: Ihr Cloud Storage-Ordner für AusgabedateienSCHEMA
: Der Pfad zur Avro-Schemadatei
Bulk-Komprimierung von Cloud Storage-Dateien
Die Vorlage "Bulk-Komprimierung von Cloud Storage-Dateien" ist eine Batchpipeline, die Dateien in Cloud Storage an einem festgelegten Speicherort komprimiert. Diese Vorlage kann nützlich sein, wenn Sie große Dateistapel im Rahmen eines periodischen Archivierungsvorgangs komprimieren müssen. Die folgenden Komprimierungsmodi werden unterstützt:
BZIP2
, DEFLATE
, GZIP
.
Dateien, die an den Zielort ausgegeben werden, folgen dem Namensschema des ursprünglichen Dateinamens, an den die Erweiterung des Komprimierungsmodus angehängt wird. Mögliche Erweiterungen sind:
.bzip2
, .deflate
, .gz
.
Voraussetzungen für diese Pipeline:
- Die Komprimierung muss in einem der folgenden Formate erfolgen:
BZIP2
,DEFLATE
oderGZIP
. - Das Ausgabeverzeichnis muss vorhanden sein, damit Sie die Pipeline verwenden können.
Vorlagenparameter
Parameter | Beschreibung |
---|---|
inputFilePattern |
Das Muster der Eingabedatei, aus der gelesen werden soll. z. B. gs://bucket-name/uncompressed/*.txt . |
outputDirectory |
Der Ausgabeort, in den geschrieben werden soll, z. B. gs://bucket-name/compressed/ . |
outputFailureFile |
Die Ausgabedatei des Fehlerlogs für Schreibfehler, die während der Komprimierung auftreten. Beispiel: gs://bucket-name/compressed/failed.csv Wenn keine Fehler auftreten, wird die Datei zwar erstellt, bleibt aber leer. Der Dateiinhalt liegt im CSV-Format (Dateiname, Fehler) vor und besteht aus einer Zeile pro Datei, bei der die Komprimierung fehlschlägt. |
compression |
Der Komprimierungsalgorithmus, der zur Komprimierung der übereinstimmenden Dateien verwendet wird. Es muss sich um einen dieser Algorithmen handeln: BZIP2 , DEFLATE oder GZIP .
|
Vorlage "Bulk-Komprimierung von Cloud Storage-Dateien" ausführen
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Bulk Compress Files on Cloud Storage template aus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files \ --region REGION_NAME \ --parameters \ inputFilePattern=gs://BUCKET_NAME/uncompressed/*.txt,\ outputDirectory=gs://BUCKET_NAME/compressed,\ outputFailureFile=gs://BUCKET_NAME/failed/failure.csv,\ compression=COMPRESSION
Dabei gilt:
JOB_NAME
: ein eindeutiger Jobname Ihrer WahlREGION_NAME
: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/- Den Versionsnamen wie
2021-09-20-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
BUCKET_NAME
: der Name Ihres Cloud Storage-BucketsCOMPRESSION
: der ausgewählte Komprimierungsalgorithmus
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "gs://BUCKET_NAME/uncompressed/*.txt", "outputDirectory": "gs://BUCKET_NAME/compressed", "outputFailureFile": "gs://BUCKET_NAME/failed/failure.csv", "compression": "COMPRESSION" }, "environment": { "zone": "us-central1-f" } }
Dabei gilt:
PROJECT_ID
: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlLOCATION
: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/- Den Versionsnamen wie
2021-09-20-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
BUCKET_NAME
: der Name Ihres Cloud Storage-BucketsCOMPRESSION
: der ausgewählte Komprimierungsalgorithmus
Bulk-Dekomprimierung von Cloud Storage-Dateien
Die Vorlage "Bulk-Dekomprimierung von Cloud Storage-Dateien" ist eine Batchpipeline, die Dateien in Cloud Storage an einem festgelegten Speicherort dekomprimiert. Diese Funktionalität ist nützlich, wenn Sie komprimierte Daten verwenden möchten, um die Kosten der Netzwerkbandbreiten während einer Migration zu minimieren, aber nach der Migration die analytische Verarbeitungsgeschwindigkeit wieder maximieren möchten, um mit unkomprimierten Daten zu arbeiten. Die Pipeline verarbeitet automatisch mehrere Komprimierungsmodi während einer einzelnen Ausführung und bestimmt den zu verwendenden Dekomprimierungsmodus anhand der Dateierweiterungen .bzip2
, .deflate
, .gz
und .zip
.
Voraussetzungen für diese Pipeline:
- Die zu dekomprimierenden Dateien müssen in einem der folgenden Formate vorliegen:
Bzip2
,Deflate
,Gzip
,Zip
. - Das Ausgabeverzeichnis muss vorhanden sein, damit Sie die Pipeline verwenden können.
Vorlagenparameter
Parameter | Beschreibung |
---|---|
inputFilePattern |
Das Muster der Eingabedatei, aus der gelesen werden soll. z. B. gs://bucket-name/compressed/*.gz . |
outputDirectory |
Der Ausgabeort, in den geschrieben werden soll, z. B. gs://bucket-name/decompressed . |
outputFailureFile |
Die Ausgabedatei des Fehlerlogs für Schreibfehler, die während der Komprimierung auftreten. Beispiel: gs://bucket-name/decompressed/failed.csv Wenn keine Fehler auftreten, wird die Datei zwar erstellt, bleibt aber leer. Der Dateiinhalt liegt im CSV-Format (Dateiname, Fehler) vor und besteht aus einer Zeile pro Datei, bei der die Dekomprimierung fehlschlägt. |
Vorlage "Bulk-Dekomprimierung von Cloud Storage-Dateien" ausführen
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Bulk Decompress Files on Cloud Storage template aus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files \ --region REGION_NAME \ --parameters \ inputFilePattern=gs://BUCKET_NAME/compressed/*.gz,\ outputDirectory=gs://BUCKET_NAME/decompressed,\ outputFailureFile=OUTPUT_FAILURE_FILE_PATH
Dabei gilt:
JOB_NAME
: ein eindeutiger Jobname Ihrer WahlREGION_NAME
: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/- Den Versionsnamen wie
2021-09-20-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
BUCKET_NAME
: der Name Ihres Cloud Storage-BucketsOUTPUT_FAILURE_FILE_PATH
: der Pfad Ihrer Datei mit Fehlerinformationen
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "gs://BUCKET_NAME/compressed/*.gz", "outputDirectory": "gs://BUCKET_NAME/decompressed", "outputFailureFile": "OUTPUT_FAILURE_FILE_PATH" }, "environment": { "zone": "us-central1-f" } }
Dabei gilt:
PROJECT_ID
: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlLOCATION
: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/- Den Versionsnamen wie
2021-09-20-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
BUCKET_NAME
: der Name Ihres Cloud Storage-BucketsOUTPUT_FAILURE_FILE_PATH
: der Pfad Ihrer Datei mit Fehlerinformationen
Bulk-Löschen in Datastore [verworfen]
Diese Vorlage wurde verworfen und wird in Q1 2022 entfernt. Migrieren Sie zur Vorlage Bulk-Löschen in Firestore.
Die Vorlage "Bulk-Löschen in Datastore" ist eine Pipeline, die Entitäten aus Datastore mit einer gegebenen GQL-Abfrage einliest und anschließend alle übereinstimmenden Entitäten im ausgewählten Zielprojekt löscht. Die Pipeline kann optional die JSON-codierten Datastore-Entitäten an Ihre JavaScript-UDF übergeben, mit der Sie Entitäten durch Zurückgeben von Nullwerten herausfiltern können.
Voraussetzungen für diese Pipeline:
- Datastore muss vor dem Ausführen der Vorlage im Projekt eingerichtet werden.
- Wenn das Dataflow-Worker-Dienstkonto aus separaten Datastore-Instanzen gelesen und gelöscht wird, muss es berechtigt sein, aus einer Instanz zu lesen und aus der anderen Instanz zu löschen.
Vorlagenparameter
Parameter | Beschreibung |
---|---|
datastoreReadGqlQuery |
GQL-Abfrage, die festlegt, welche Entitäten gelöscht werden sollen. Die Verwendung einer ausschließlich schlüsselbasierten Abfrage kann die Leistung verbessern. Beispiel: "SELECT __key__ FROM MyKind". |
datastoreReadProjectId |
Projekt-ID der Datastore-Instanz, aus der (mithilfe Ihrer GQL-Abfrage) Entitäten gelesen werden sollen, die für den Abgleich verwendet werden. |
datastoreDeleteProjectId |
Projekt-ID der Datastore-Instanz, aus der übereinstimmende Entitäten gelöscht werden sollen. Dies kann mit datastoreReadProjectId identisch sein, wenn Sie innerhalb derselben Datastore-Instanz lesen und löschen möchten. |
datastoreReadNamespace |
(Optional) Namespace der angeforderten Entitäten. Als "" für den Standard-Namespace festlegen. |
datastoreHintNumWorkers |
(Optional) Hinweis für die erwartete Anzahl von Workern im Schritt zur Drosselung der Erhöhung in Datastore. Standardwert ist 500 . |
javascriptTextTransformGcsPath |
(Optional)
Der Cloud Storage-URI der Datei .js , in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
(Optional)
Der Name der benutzerdefinierten JavaScript-Funktion, die Sie verwenden möchten.
Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform . Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.
Wenn diese Funktion für eine bestimmte Datastore-Entität den Wert "nicht definiert" oder "Null" zurückgibt, wird diese Entität nicht gelöscht. |
Vorlage "Bulk-Löschen in Datastore" ausführen
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Bulk Delete Entities in Datastore template aus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete \ --region REGION_NAME \ --parameters \ datastoreReadGqlQuery="GQL_QUERY",\ datastoreReadProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID,\ datastoreDeleteProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID
Dabei gilt:
JOB_NAME
: ein eindeutiger Jobname Ihrer WahlREGION_NAME
: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/- Den Versionsnamen wie
2021-09-20-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
GQL_QUERY
: die Abfrage, mit der Sie Entitäten zum Löschen auswählenDATASTORE_READ_AND_DELETE_PROJECT_ID
: die Projekt-ID Ihrer Datastore-Instanz; in diesem Beispiel werden Lese- und Löschvorgänge in derselben Datastore-Instanz ausgeführt
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete { "jobName": "JOB_NAME", "parameters": { "datastoreReadGqlQuery": "GQL_QUERY", "datastoreReadProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID", "datastoreDeleteProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID" }, "environment": { "zone": "us-central1-f" } } }
Dabei gilt:
PROJECT_ID
: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlLOCATION
: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/- Den Versionsnamen wie
2021-09-20-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
GQL_QUERY
: die Abfrage, mit der Sie Entitäten zum Löschen auswählenDATASTORE_READ_AND_DELETE_PROJECT_ID
: die Projekt-ID Ihrer Datastore-Instanz; in diesem Beispiel werden Lese- und Löschvorgänge in derselben Datastore-Instanz ausgeführt
Bulk-Löschen in Firestore
Die Vorlage "Bulk-Löschen in Firestore" ist eine Pipeline, die Entitäten aus Firestore mit einer gegebenen GQL-Abfrage einliest und anschließend alle übereinstimmenden Entitäten im ausgewählten Zielprojekt löscht. Die Pipeline kann optional die JSON-codierten Firestore-Entitäten an Ihre JavaScript-UDF übergeben, mit der Sie Entitäten durch Zurückgeben von Nullwerten herausfiltern können.
Voraussetzungen für diese Pipeline:
- Firestore muss vor dem Ausführen der Vorlage im Projekt eingerichtet werden.
- Wenn das Dataflow-Worker-Dienstkonto aus separaten Firestore-Instanzen gelesen und gelöscht wird, muss es berechtigt sein, aus einer Instanz zu lesen und aus der anderen Instanz zu löschen.
Vorlagenparameter
Parameter | Beschreibung |
---|---|
firestoreReadGqlQuery |
GQL-Abfrage, die festlegt, welche Entitäten gelöscht werden sollen. Die Verwendung einer ausschließlich schlüsselbasierten Abfrage kann die Leistung verbessern. Beispiel: "SELECT __key__ FROM MyKind". |
firestoreReadProjectId |
Projekt-ID der Firestore-Instanz, aus der Entitäten gelesen werden sollen (mithilfe Ihrer GQL-Abfrage), die für den Abgleich verwendet werden. |
firestoreDeleteProjectId |
Projekt-ID der Firestore-Instanz, aus der übereinstimmende Entitäten gelöscht werden sollen. Dies kann mit firestoreReadProjectId identisch sein, wenn Sie innerhalb derselben Firestore-Instanz lesen und löschen möchten. |
firestoreReadNamespace |
(Optional) Namespace der angeforderten Entitäten. Als "" für den Standard-Namespace festlegen. |
firestoreHintNumWorkers |
(Optional) Hinweis für die erwartete Anzahl von Workern im Schritt zur Drosselung der Erhöhung in Firestore. Standardwert ist 500 . |
javascriptTextTransformGcsPath |
(Optional)
Der Cloud Storage-URI der Datei .js , in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
(Optional)
Der Name der benutzerdefinierten JavaScript-Funktion, die Sie verwenden möchten.
Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform . Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.
Wenn diese Funktion für eine bestimmte Firestore-Entität den Wert "nicht definiert" oder "Null" zurückgibt, wird diese Entität nicht gelöscht. |
Vorlage "Bulk-Löschen in Firestore" ausführen
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Bulk Delete Entities in Firestore template aus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Firestore_to_Firestore_Delete \ --region REGION_NAME \ --parameters \ firestoreReadGqlQuery="GQL_QUERY",\ firestoreReadProjectId=FIRESTORE_READ_AND_DELETE_PROJECT_ID,\ firestoreDeleteProjectId=FIRESTORE_READ_AND_DELETE_PROJECT_ID
Dabei gilt:
JOB_NAME
: ein eindeutiger Jobname Ihrer WahlREGION_NAME
: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/- Den Versionsnamen wie
2021-09-20-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
GQL_QUERY
: die Abfrage, mit der Sie Entitäten zum Löschen auswählenFIRESTORE_READ_AND_DELETE_PROJECT_ID
: die Projekt-ID Ihrer Firestore-Instanz. In diesem Beispiel werden Lese- und Löschvorgänge in derselben Firestore-Instanz ausgeführt.
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Firestore_to_Firestore_Delete { "jobName": "JOB_NAME", "parameters": { "firestoreReadGqlQuery": "GQL_QUERY", "firestoreReadProjectId": "FIRESTORE_READ_AND_DELETE_PROJECT_ID", "firestoreDeleteProjectId": "FIRESTORE_READ_AND_DELETE_PROJECT_ID" }, "environment": { "zone": "us-central1-f" } } }
Dabei gilt:
PROJECT_ID
: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlLOCATION
: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/- Den Versionsnamen wie
2021-09-20-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
GQL_QUERY
: die Abfrage, mit der Sie Entitäten zum Löschen auswählenFIRESTORE_READ_AND_DELETE_PROJECT_ID
: die Projekt-ID Ihrer Firestore-Instanz. In diesem Beispiel werden Lese- und Löschvorgänge in derselben Firestore-Instanz ausgeführt.
Streaming Data Generator für Pub/Sub/BigQuery/Cloud Storage
Die Vorlage für Streaming Data Generator wird verwendet, um entweder eine unbegrenzte oder eine feste Anzahl synthetischer Datensätze oder Nachrichten anhand eines vom Nutzer bereitgestellten Schemas mit der angegebenen Rate zu generieren. Zu den kompatiblen Zielen zählen Pub/Sub-Themen, BigQuery-Tabellen und Cloud Storage-Buckets.
Im Folgenden sind einige mögliche Anwendungsfälle aufgeführt:
- Simulieren der Veröffentlichung von Echtzeitereignissen in großem Umfang in einem Pub/Sub-Thema, um die Anzahl und Größe der Nutzer zu ermitteln und zu bestimmen, die zur Verarbeitung veröffentlichter Ereignisse erforderlich sind.
- Generieren synthetischer Daten in einer BigQuery-Tabelle oder in einem Cloud Storage-Bucket, um Leistungs-Benchmarks zu bewerten oder um diese als Proof of Concept zu nutzen.
Unterstützte Senken und Codierungsformate
In der folgenden Tabelle wird gezeigt, welche Senken und Codierungsformate von dieser Vorlage unterstützt werden:JSON | Avro | Parquet | |
---|---|---|---|
Pub/Sub | Ja | Ja | Nein |
BigQuery | Ja | Nein | Nein |
Cloud Storage | Ja | Ja | Ja |
Voraussetzungen für diese Pipeline:
Erstellen Sie eine Schemadatei, die eine JSON-Vorlage für die generierten Daten enthält. Diese Vorlage verwendet die JSON Data Generator-Bibliothek, sodass Sie für jedes Feld im Schema verschiedene Faker-Funktionen bereitstellen können. Weitere Informationen finden Sie in der Dokumentation zu json-data-generator.
Beispiel:
{ "id": {{integer(0,1000)}}, "name": "{{uuid()}}", "isInStock": {{bool()}} }
- Laden Sie die Schemadatei in einen Cloud Storage-Bucket hoch.
- Das Ausgabeziel muss vor der Ausführung vorhanden sein. Das Ziel muss ein Pub/Sub-Thema, eine BigQuery-Tabelle oder ein Cloud Storage-Bucket sein, je nach Senkentyp.
- Wenn die Ausgabecodierung Avro oder Parquet ist, erstellen Sie eine Avro-Schemadatei und speichern diese an einem Cloud Storage-Speicherort.
Vorlagenparameter
Parameter | Beschreibung |
---|---|
schemaLocation |
Speicherort der Schemadatei. Beispiel: gs://mybucket/filename.json . |
qps |
Anzahl der pro Sekunde zu veröffentlichenden Nachrichten. Beispiel: 100 . |
sinkType |
(Optional) Ausgabesenkentyp. Folgende Werte sind möglich: PUBSUB , BIGQUERY und GCS . Der Standardwert ist PUBSUB. |
outputType |
(Optional) Ausgabe-Codierungstyp. Folgende Werte sind möglich: JSON , AVRO und PARQUET . Der Standardwert ist JSON. |
avroSchemaLocation |
(Optional) Speicherort der AVRO-Schemadatei. Erforderlich, wenn outputType AVRO oder PARQUET ist. Beispiel: gs://mybucket/filename.avsc |
topic |
(Optional) Name des Pub/Sub-Themas, in dem die Pipeline Daten veröffentlichen soll. Erforderlich, wenn sinkType Pub/Sub ist. Beispiel: . |
outputTableSpec |
(Optional) Name der BigQuery-Ausgabetabelle. Obligatorisch, wenn sinkType BigQuery ist. Beispiel:
|
writeDisposition |
(Optional) BigQuery-Schreibanordnung. Mögliche Werte sind WRITE_APPEND , WRITE_EMPTY und WRITE_TRUNCATE . Der Standardwert ist WRITE_APPEND. |
outputDeadletterTable |
(Optional) Name der BigQuery-Ausgabetabelle zum Speichern fehlerhafter Datensätze. Wenn nicht angegeben, erstellt die Pipeline während der Ausführung eine Tabelle mit dem Namen {output_table_name}_error_records. Beispiel: . |
outputDirectory |
(Optional) Pfad des Cloud Storage-Ausgabespeicherorts. Erforderlich, wenn sinkType Cloud Storage ist. Beispiel: gs://mybucket/pathprefix/ |
outputFilenamePrefix |
(Optional) Das Dateinamenpräfix der in Cloud Storage geschriebenen Ausgabedateien. Der Standardwert ist output-. |
windowDuration |
(Optional) Fensterintervall, in dem die Ausgabe nach Cloud Storage geschrieben wird. Der Standardwert ist 1m (für 1 Minute). |
numShards |
(Optional) Maximale Anzahl an Ausgabe-Shards. Erforderlich, wenn sinkType Cloud Storage ist; es muss mindestens 1 festgelegt werden. |
messagesLimit |
(Optional) Maximale Anzahl an Ausgabenachrichten. Der Standardwert ist 0, d. h., die Anzahl ist unbegrenzt. |
autoscalingAlgorithm |
(Optional) Algorithmus für das Autoscaling der Worker. Mögliche Werte sind THROUGHPUT_BASED , um das Autoscaling zu aktivieren, oder NONE , um es zu deaktivieren. |
maxNumWorkers |
(Optional) Maximale Anzahl an Worker-Maschinen. Beispiel: 10 . |
Vorlage für Streaming Data Generator ausführen
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Streaming Data Generator template aus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator \ --parameters \ schemaLocation=SCHEMA_LOCATION,\ qps=QPS,\ topic=PUBSUB_TOPIC
Dabei gilt:
PROJECT_ID
: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenREGION_NAME
: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
JOB_NAME
: ein eindeutiger Jobname Ihrer WahlVERSION
: die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/- Den Versionsnamen wie
2021-09-20-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
SCHEMA_LOCATION
: der Pfad zur Schemadatei in Cloud Storage, z. B.gs://mybucket/filename.json
QPS
: die Anzahl der Nachrichten, die pro Sekunde zu veröffentlicht werden sollenPUBSUB_TOPIC
: das Pub/Sub-Ausgabethema Beispiel:projects/my-project-ID/topics/my-topic-ID
.
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "schemaLocation": "SCHEMA_LOCATION", "qps": "QPS", "topic": "PUBSUB_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator", } }
Dabei gilt:
PROJECT_ID
: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenLOCATION
: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
JOB_NAME
: ein eindeutiger Jobname Ihrer WahlVERSION
: die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/- Den Versionsnamen wie
2021-09-20-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
SCHEMA_LOCATION
: der Pfad zur Schemadatei in Cloud Storage, z. B.gs://mybucket/filename.json
QPS
: die Anzahl der Nachrichten, die pro Sekunde zu veröffentlicht werden sollenPUBSUB_TOPIC
: das Pub/Sub-Ausgabethema Beispiel:projects/my-project-ID/topics/my-topic-ID
.