Von Google bereitgestellte Hilfsvorlagen

Google bietet eine Reihe von Open-Source-Vorlagen für Cloud Dataflow. Allgemeine Informationen zu Vorlagen finden Sie unter Dataflow-Vorlagen. Eine Liste aller von Google bereitgestellten Vorlagen finden Sie unter Erste Schritte mit von Google bereitgestellten Vorlagen.

In dieser Anleitung werden Hilfsvorlagen dokumentiert.

Dateiformatkonvertierung (Avro, Parquet, CSV)

Die Vorlage zur Dateiformatkonvertierung ist eine Batchpipeline die im Cloud-Speicher gespeicherte Dateien von einem unterstützten Format in ein anderes konvertiert.

Die folgenden Formatkonvertierungen werden unterstützt:

  • CSV zu Avro
  • CSV zu Parquet
  • Avro zu Parquet
  • Parquet zu Avro

Voraussetzungen für diese Pipeline:

  • Der Cloud Storage-Ausgabe-Bucket muss vorhanden sein, bevor Sie die Pipeline ausführen.

Vorlagenparameter

Parameter Beschreibung
inputFileFormat Das Eingabedateiformat. Dies muss einer der folgenden Werte sein: [csv, avro, parquet].
outputFileFormat Das Ausgabedateiformat. Dies muss einer der folgenden Werte sein: [avro, parquet].
inputFileSpec Das Cloud Storage-Pfadmuster für Eingabedateien. Beispiel: gs://bucket-name/path/*.csv
outputBucket Der Cloud Storage-Ordner zum Schreiben von Ausgabedateien. Dieser Pfad muss mit einem Schrägstrich enden. Beispiel: gs://bucket-name/output/
schema Der Cloud Storage-Pfad zur Avro-Schemadatei (z. B. gs://bucket-name/schema/my-schema.avsc)
containsHeaders (Optional) Die CSV-Eingabedateien enthalten eine Kopfzeile (true/false). Der Standardwert ist false. Nur erforderlich, wenn CSV-Dateien gelesen werden.
csvFormat (Optional) Die CSV-Formatspezifikation zum Parsen von Einträgen. Der Standardwert ist Default. Weitere Informationen finden Sie unter Apache Commons CSV-Format.
delimiter (Optional) Das Feldtrennzeichen, das von den eingegebenen CSV-Dateien verwendet wird.
outputFilePrefix (Optional) Das Präfix der Ausgabedatei. Der Standardwert ist output.
numShards [Optional] Die Anzahl der Shards der Ausgabedatei.

Vorlage für die Dateiformatkonvertierung ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Convert file formats template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/File_Format_Conversion \
    --parameters \
inputFileFormat=INPUT_FORMAT,\
outputFileFormat=OUTPUT_FORMAT,\
inputFileSpec=INPUT_FILES,\
schema=SCHEMA,\
outputBucket=OUTPUT_FOLDER

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • INPUT_FORMAT: Das Dateiformat der Eingabedatei muss eines von [csv, avro, parquet] sein
  • OUTPUT_FORMAT: Das Dateiformat der Ausgabedateien muss eines von [avro, parquet] sein
  • INPUT_FILES: Das Pfadmuster für Eingabedateien
  • OUTPUT_FOLDER: Ihr Cloud Storage-Ordner für Ausgabedateien
  • SCHEMA: Der Pfad zur Avro-Schemadatei

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileFormat": "INPUT_FORMAT",
          "outputFileFormat": "OUTPUT_FORMAT",
          "inputFileSpec": "INPUT_FILES",
          "schema": "SCHEMA",
          "outputBucket": "OUTPUT_FOLDER"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/File_Format_Conversion",
   }
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • INPUT_FORMAT: Das Dateiformat der Eingabedatei muss eines von [csv, avro, parquet] sein
  • OUTPUT_FORMAT: Das Dateiformat der Ausgabedateien muss eines von [avro, parquet] sein
  • INPUT_FILES: Das Pfadmuster für Eingabedateien
  • OUTPUT_FOLDER: Ihr Cloud Storage-Ordner für Ausgabedateien
  • SCHEMA: Der Pfad zur Avro-Schemadatei

Bulk-Komprimierung von Cloud Storage-Dateien

Die Vorlage "Bulk-Komprimierung von Cloud Storage-Dateien" ist eine Batchpipeline, die Dateien in Cloud Storage an einem festgelegten Speicherort komprimiert. Diese Vorlage kann nützlich sein, wenn Sie große Dateistapel im Rahmen eines periodischen Archivierungsvorgangs komprimieren müssen. Die folgenden Komprimierungsmodi werden unterstützt: BZIP2, DEFLATE, GZIP. Dateien, die an den Zielort ausgegeben werden, folgen dem Namensschema des ursprünglichen Dateinamens, an den die Erweiterung des Komprimierungsmodus angehängt wird. Mögliche Erweiterungen sind: .bzip2, .deflate, .gz.

Alle Fehler, die während des Komprimierungsvorgangs auftreten, werden in der Fehlerdatei im CSV-Format (Dateiname, Fehlermeldung) ausgegeben. Die Fehlerdatei wird auch dann erstellt, wenn während der Ausführung der Pipeline keine Fehler auftreten. Sie enthält dann jedoch keine Fehlerdatensätze.

Voraussetzungen für diese Pipeline:

  • Die Komprimierung muss in einem der folgenden Formate erfolgen: BZIP2, DEFLATE oder GZIP.
  • Das Ausgabeverzeichnis muss vorhanden sein, damit Sie die Pipeline verwenden können.

Vorlagenparameter

Parameter Beschreibung
inputFilePattern Das Muster der Eingabedatei, aus der gelesen werden soll. z. B. gs://bucket-name/uncompressed/*.txt.
outputDirectory Der Ausgabeort, in den geschrieben werden soll, z. B. gs://bucket-name/compressed/.
outputFailureFile Die Ausgabedatei des Fehlerlogs für Schreibfehler, die während der Komprimierung auftreten. Beispiel: gs://bucket-name/compressed/failed.csv Wenn keine Fehler auftreten, wird die Datei zwar erstellt, bleibt aber leer. Der Dateiinhalt liegt im CSV-Format (Dateiname, Fehler) vor und besteht aus einer Zeile pro Datei, bei der die Komprimierung fehlschlägt.
compression Der Komprimierungsalgorithmus, der zur Komprimierung der übereinstimmenden Dateien verwendet wird. Es muss sich um einen dieser Algorithmen handeln: BZIP2, DEFLATE oder GZIP.

Vorlage "Bulk-Komprimierung von Cloud Storage-Dateien" ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Bulk Compress Files on Cloud Storage template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files \
    --region REGION_NAME \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/uncompressed/*.txt,\
outputDirectory=gs://BUCKET_NAME/compressed,\
outputFailureFile=gs://BUCKET_NAME/failed/failure.csv,\
compression=COMPRESSION

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
  • COMPRESSION: der ausgewählte Komprimierungsalgorithmus

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/uncompressed/*.txt",
       "outputDirectory": "gs://BUCKET_NAME/compressed",
       "outputFailureFile": "gs://BUCKET_NAME/failed/failure.csv",
       "compression": "COMPRESSION"
   },
   "environment": { "zone": "us-central1-f" }
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
  • COMPRESSION: der ausgewählte Komprimierungsalgorithmus

Bulk-Dekomprimierung von Cloud Storage-Dateien

Die Vorlage "Bulk-Dekomprimierung von Cloud Storage-Dateien" ist eine Batchpipeline, die Dateien in Cloud Storage an einem festgelegten Speicherort dekomprimiert. Diese Funktionalität ist nützlich, wenn Sie komprimierte Daten verwenden möchten, um die Kosten der Netzwerkbandbreiten während einer Migration zu minimieren, aber nach der Migration die analytische Verarbeitungsgeschwindigkeit wieder maximieren möchten, um mit unkomprimierten Daten zu arbeiten. Die Pipeline verarbeitet automatisch mehrere Komprimierungsmodi während einer einzelnen Ausführung und bestimmt den zu verwendenden Dekomprimierungsmodus anhand der Dateierweiterungen .bzip2, .deflate, .gz und .zip.

Voraussetzungen für diese Pipeline:

  • Die zu dekomprimierenden Dateien müssen in einem der folgenden Formate vorliegen: Bzip2, Deflate, Gzip, Zip.
  • Das Ausgabeverzeichnis muss vorhanden sein, damit Sie die Pipeline verwenden können.

Vorlagenparameter

Parameter Beschreibung
inputFilePattern Das Muster der Eingabedatei, aus der gelesen werden soll. z. B. gs://bucket-name/compressed/*.gz.
outputDirectory Der Ausgabeort, in den geschrieben werden soll, z. B. gs://bucket-name/decompressed.
outputFailureFile Die Ausgabedatei des Fehlerlogs für Schreibfehler, die während der Komprimierung auftreten. Beispiel: gs://bucket-name/decompressed/failed.csv Wenn keine Fehler auftreten, wird die Datei zwar erstellt, bleibt aber leer. Der Dateiinhalt liegt im CSV-Format (Dateiname, Fehler) vor und besteht aus einer Zeile pro Datei, bei der die Dekomprimierung fehlschlägt.

Vorlage "Bulk-Dekomprimierung von Cloud Storage-Dateien" ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Bulk Decompress Files on Cloud Storage template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files \
    --region REGION_NAME \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/compressed/*.gz,\
outputDirectory=gs://BUCKET_NAME/decompressed,\
outputFailureFile=OUTPUT_FAILURE_FILE_PATH

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
  • OUTPUT_FAILURE_FILE_PATH: der Pfad Ihrer Datei mit Fehlerinformationen

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/compressed/*.gz",
       "outputDirectory": "gs://BUCKET_NAME/decompressed",
       "outputFailureFile": "OUTPUT_FAILURE_FILE_PATH"
   },
   "environment": { "zone": "us-central1-f" }
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
  • OUTPUT_FAILURE_FILE_PATH: der Pfad Ihrer Datei mit Fehlerinformationen

Bulk-Löschen in Datastore [verworfen]

Diese Vorlage wurde verworfen und wird in Q1 2022 entfernt. Migrieren Sie zur Vorlage Bulk-Löschen in Firestore.

Die Vorlage "Bulk-Löschen in Datastore" ist eine Pipeline, die Entitäten aus Datastore mit einer gegebenen GQL-Abfrage einliest und anschließend alle übereinstimmenden Entitäten im ausgewählten Zielprojekt löscht. Die Pipeline kann optional die JSON-codierten Datastore-Entitäten an Ihre JavaScript-UDF übergeben, mit der Sie Entitäten durch Zurückgeben von Nullwerten herausfiltern können.

Voraussetzungen für diese Pipeline:

  • Datastore muss vor dem Ausführen der Vorlage im Projekt eingerichtet werden.
  • Wenn das Dataflow-Worker-Dienstkonto aus separaten Datastore-Instanzen gelesen und gelöscht wird, muss es berechtigt sein, aus einer Instanz zu lesen und aus der anderen Instanz zu löschen.

Vorlagenparameter

Parameter Beschreibung
datastoreReadGqlQuery GQL-Abfrage, die festlegt, welche Entitäten gelöscht werden sollen. Die Verwendung einer ausschließlich schlüsselbasierten Abfrage kann die Leistung verbessern. Beispiel: "SELECT __key__ FROM MyKind".
datastoreReadProjectId Projekt-ID der Datastore-Instanz, aus der (mithilfe Ihrer GQL-Abfrage) Entitäten gelesen werden sollen, die für den Abgleich verwendet werden.
datastoreDeleteProjectId Projekt-ID der Datastore-Instanz, aus der übereinstimmende Entitäten gelöscht werden sollen. Dies kann mit datastoreReadProjectId identisch sein, wenn Sie innerhalb derselben Datastore-Instanz lesen und löschen möchten.
datastoreReadNamespace (Optional) Namespace der angeforderten Entitäten. Als "" für den Standard-Namespace festlegen.
datastoreHintNumWorkers (Optional) Hinweis für die erwartete Anzahl von Workern im Schritt zur Drosselung der Erhöhung in Datastore. Standardwert ist 500.
javascriptTextTransformGcsPath (Optional) Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Optional) Der Name der benutzerdefinierten JavaScript-Funktion, die Sie verwenden möchten. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele. Wenn diese Funktion für eine bestimmte Datastore-Entität den Wert "nicht definiert" oder "Null" zurückgibt, wird diese Entität nicht gelöscht.

Vorlage "Bulk-Löschen in Datastore" ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Bulk Delete Entities in Datastore template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete \
    --region REGION_NAME \
    --parameters \
datastoreReadGqlQuery="GQL_QUERY",\
datastoreReadProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID,\
datastoreDeleteProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • GQL_QUERY: die Abfrage, mit der Sie Entitäten zum Löschen auswählen
  • DATASTORE_READ_AND_DELETE_PROJECT_ID: die Projekt-ID Ihrer Datastore-Instanz; in diesem Beispiel werden Lese- und Löschvorgänge in derselben Datastore-Instanz ausgeführt

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete
{
   "jobName": "JOB_NAME",
   "parameters": {
       "datastoreReadGqlQuery": "GQL_QUERY",
       "datastoreReadProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID",
       "datastoreDeleteProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID"
   },
   "environment": { "zone": "us-central1-f" }
   }
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • GQL_QUERY: die Abfrage, mit der Sie Entitäten zum Löschen auswählen
  • DATASTORE_READ_AND_DELETE_PROJECT_ID: die Projekt-ID Ihrer Datastore-Instanz; in diesem Beispiel werden Lese- und Löschvorgänge in derselben Datastore-Instanz ausgeführt

Bulk-Löschen in Firestore

Die Vorlage "Bulk-Löschen in Firestore" ist eine Pipeline, die Entitäten aus Firestore mit einer gegebenen GQL-Abfrage einliest und anschließend alle übereinstimmenden Entitäten im ausgewählten Zielprojekt löscht. Die Pipeline kann optional die JSON-codierten Firestore-Entitäten an Ihre JavaScript-UDF übergeben, mit der Sie Entitäten durch Zurückgeben von Nullwerten herausfiltern können.

Voraussetzungen für diese Pipeline:

  • Firestore muss vor dem Ausführen der Vorlage im Projekt eingerichtet werden.
  • Wenn das Dataflow-Worker-Dienstkonto aus separaten Firestore-Instanzen gelesen und gelöscht wird, muss es berechtigt sein, aus einer Instanz zu lesen und aus der anderen Instanz zu löschen.

Vorlagenparameter

Parameter Beschreibung
firestoreReadGqlQuery GQL-Abfrage, die festlegt, welche Entitäten gelöscht werden sollen. Die Verwendung einer ausschließlich schlüsselbasierten Abfrage kann die Leistung verbessern. Beispiel: "SELECT __key__ FROM MyKind".
firestoreReadProjectId Projekt-ID der Firestore-Instanz, aus der Entitäten gelesen werden sollen (mithilfe Ihrer GQL-Abfrage), die für den Abgleich verwendet werden.
firestoreDeleteProjectId Projekt-ID der Firestore-Instanz, aus der übereinstimmende Entitäten gelöscht werden sollen. Dies kann mit firestoreReadProjectId identisch sein, wenn Sie innerhalb derselben Firestore-Instanz lesen und löschen möchten.
firestoreReadNamespace (Optional) Namespace der angeforderten Entitäten. Als "" für den Standard-Namespace festlegen.
firestoreHintNumWorkers (Optional) Hinweis für die erwartete Anzahl von Workern im Schritt zur Drosselung der Erhöhung in Firestore. Standardwert ist 500.
javascriptTextTransformGcsPath (Optional) Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Optional) Der Name der benutzerdefinierten JavaScript-Funktion, die Sie verwenden möchten. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele. Wenn diese Funktion für eine bestimmte Firestore-Entität den Wert "nicht definiert" oder "Null" zurückgibt, wird diese Entität nicht gelöscht.

Vorlage "Bulk-Löschen in Firestore" ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Bulk Delete Entities in Firestore template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Firestore_to_Firestore_Delete \
    --region REGION_NAME \
    --parameters \
firestoreReadGqlQuery="GQL_QUERY",\
firestoreReadProjectId=FIRESTORE_READ_AND_DELETE_PROJECT_ID,\
firestoreDeleteProjectId=FIRESTORE_READ_AND_DELETE_PROJECT_ID

Dabei gilt:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • GQL_QUERY: die Abfrage, mit der Sie Entitäten zum Löschen auswählen
  • FIRESTORE_READ_AND_DELETE_PROJECT_ID: die Projekt-ID Ihrer Firestore-Instanz. In diesem Beispiel werden Lese- und Löschvorgänge in derselben Firestore-Instanz ausgeführt.

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Firestore_to_Firestore_Delete
{
   "jobName": "JOB_NAME",
   "parameters": {
       "firestoreReadGqlQuery": "GQL_QUERY",
       "firestoreReadProjectId": "FIRESTORE_READ_AND_DELETE_PROJECT_ID",
       "firestoreDeleteProjectId": "FIRESTORE_READ_AND_DELETE_PROJECT_ID"
   },
   "environment": { "zone": "us-central1-f" }
   }
}

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • GQL_QUERY: die Abfrage, mit der Sie Entitäten zum Löschen auswählen
  • FIRESTORE_READ_AND_DELETE_PROJECT_ID: die Projekt-ID Ihrer Firestore-Instanz. In diesem Beispiel werden Lese- und Löschvorgänge in derselben Firestore-Instanz ausgeführt.

Streaming Data Generator für Pub/Sub/BigQuery/Cloud Storage

Die Vorlage für Streaming Data Generator wird verwendet, um entweder eine unbegrenzte oder eine feste Anzahl synthetischer Datensätze oder Nachrichten anhand eines vom Nutzer bereitgestellten Schemas mit der angegebenen Rate zu generieren. Zu den kompatiblen Zielen zählen Pub/Sub-Themen, BigQuery-Tabellen und Cloud Storage-Buckets.

Im Folgenden sind einige mögliche Anwendungsfälle aufgeführt:

  • Simulieren der Veröffentlichung von Echtzeitereignissen in großem Umfang in einem Pub/Sub-Thema, um die Anzahl und Größe der Nutzer zu ermitteln und zu bestimmen, die zur Verarbeitung veröffentlichter Ereignisse erforderlich sind.
  • Generieren synthetischer Daten in einer BigQuery-Tabelle oder in einem Cloud Storage-Bucket, um Leistungs-Benchmarks zu bewerten oder um diese als Proof of Concept zu nutzen.

Unterstützte Senken und Codierungsformate

In der folgenden Tabelle wird gezeigt, welche Senken und Codierungsformate von dieser Vorlage unterstützt werden:
JSON Avro Parquet
Pub/Sub Ja Ja Nein
BigQuery Ja Nein Nein
Cloud Storage Ja Ja Ja

Voraussetzungen für diese Pipeline:

  • Erstellen Sie eine Schemadatei, die eine JSON-Vorlage für die generierten Daten enthält. Diese Vorlage verwendet die JSON Data Generator-Bibliothek, sodass Sie für jedes Feld im Schema verschiedene Faker-Funktionen bereitstellen können. Weitere Informationen finden Sie in der Dokumentation zu json-data-generator.

    Beispiel:

    {
      "id": {{integer(0,1000)}},
      "name": "{{uuid()}}",
      "isInStock": {{bool()}}
    }
    
  • Laden Sie die Schemadatei in einen Cloud Storage-Bucket hoch.
  • Das Ausgabeziel muss vor der Ausführung vorhanden sein. Das Ziel muss ein Pub/Sub-Thema, eine BigQuery-Tabelle oder ein Cloud Storage-Bucket sein, je nach Senkentyp.
  • Wenn die Ausgabecodierung Avro oder Parquet ist, erstellen Sie eine Avro-Schemadatei und speichern diese an einem Cloud Storage-Speicherort.

Vorlagenparameter

Parameter Beschreibung
schemaLocation Speicherort der Schemadatei. Beispiel: gs://mybucket/filename.json.
qps Anzahl der pro Sekunde zu veröffentlichenden Nachrichten. Beispiel: 100.
sinkType (Optional) Ausgabesenkentyp. Folgende Werte sind möglich: PUBSUB, BIGQUERY und GCS. Der Standardwert ist PUBSUB.
outputType (Optional) Ausgabe-Codierungstyp. Folgende Werte sind möglich: JSON, AVRO und PARQUET. Der Standardwert ist JSON.
avroSchemaLocation (Optional) Speicherort der AVRO-Schemadatei. Erforderlich, wenn outputType AVRO oder PARQUET ist. Beispiel: gs://mybucket/filename.avsc
topic (Optional) Name des Pub/Sub-Themas, in dem die Pipeline Daten veröffentlichen soll. Erforderlich, wenn sinkType Pub/Sub ist. Beispiel: projects/my-project-ID/topics/my-topic-ID.
outputTableSpec (Optional) Name der BigQuery-Ausgabetabelle. Obligatorisch, wenn sinkType BigQuery ist. Beispiel: my-project-ID:my_dataset_name.my-table-name
writeDisposition (Optional) BigQuery-Schreibanordnung. Mögliche Werte sind WRITE_APPEND, WRITE_EMPTY und WRITE_TRUNCATE. Der Standardwert ist WRITE_APPEND.
outputDeadletterTable (Optional) Name der BigQuery-Ausgabetabelle zum Speichern fehlerhafter Datensätze. Wenn nicht angegeben, erstellt die Pipeline während der Ausführung eine Tabelle mit dem Namen {output_table_name}_error_records. Beispiel: my-project-ID:my_dataset_name.my-table-name.
outputDirectory (Optional) Pfad des Cloud Storage-Ausgabespeicherorts. Erforderlich, wenn sinkType Cloud Storage ist. Beispiel: gs://mybucket/pathprefix/
outputFilenamePrefix (Optional) Das Dateinamenpräfix der in Cloud Storage geschriebenen Ausgabedateien. Der Standardwert ist output-.
windowDuration (Optional) Fensterintervall, in dem die Ausgabe nach Cloud Storage geschrieben wird. Der Standardwert ist 1m (für 1 Minute).
numShards (Optional) Maximale Anzahl an Ausgabe-Shards. Erforderlich, wenn sinkType Cloud Storage ist; es muss mindestens 1 festgelegt werden.
messagesLimit (Optional) Maximale Anzahl an Ausgabenachrichten. Der Standardwert ist 0, d. h., die Anzahl ist unbegrenzt.
autoscalingAlgorithm (Optional) Algorithmus für das Autoscaling der Worker. Mögliche Werte sind THROUGHPUT_BASED, um das Autoscaling zu aktivieren, oder NONE, um es zu deaktivieren.
maxNumWorkers (Optional) Maximale Anzahl an Worker-Maschinen. Beispiel: 10.

Vorlage für Streaming Data Generator ausführen

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Der regionale Standardendpunkt ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Streaming Data Generator template aus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • REGION_NAME: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • SCHEMA_LOCATION: der Pfad zur Schemadatei in Cloud Storage, z. B. gs://mybucket/filename.json
  • QPS: die Anzahl der Nachrichten, die pro Sekunde zu veröffentlicht werden sollen
  • PUBSUB_TOPIC: das Pub/Sub-Ausgabethema Beispiel: projects/my-project-ID/topics/my-topic-ID.

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator",
   }
}
  

Dabei gilt:

  • PROJECT_ID: die Cloud-Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • LOCATION: der regionale Endpunkt, an dem Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates/latest/
    • Den Versionsnamen wie 2021-09-20-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates/.
  • SCHEMA_LOCATION: der Pfad zur Schemadatei in Cloud Storage, z. B. gs://mybucket/filename.json
  • QPS: die Anzahl der Nachrichten, die pro Sekunde zu veröffentlicht werden sollen
  • PUBSUB_TOPIC: das Pub/Sub-Ausgabethema Beispiel: projects/my-project-ID/topics/my-topic-ID.