Vorlage „Streaming Data Generator für Pub/Sub, BigQuery und Cloud Storage“

Die Vorlage für Streaming Data Generator wird verwendet, um entweder eine unbegrenzte oder eine feste Anzahl synthetischer Datensätze oder Nachrichten anhand eines vom Nutzer bereitgestellten Schemas mit der angegebenen Rate zu generieren. Zu den kompatiblen Zielen zählen Pub/Sub-Themen, BigQuery-Tabellen und Cloud Storage-Buckets.

Im Folgenden sind einige mögliche Anwendungsfälle aufgeführt:

  • Simulieren der Veröffentlichung von Echtzeitereignissen in großem Umfang in einem Pub/Sub-Thema, um die Anzahl und Größe der Nutzer zu ermitteln und zu bestimmen, die zur Verarbeitung veröffentlichter Ereignisse erforderlich sind.
  • Generieren synthetischer Daten in einer BigQuery-Tabelle oder in einem Cloud Storage-Bucket, um Leistungs-Benchmarks zu bewerten oder um diese als Proof of Concept zu nutzen.

Unterstützte Senken und Codierungsformate

In der folgenden Tabelle wird gezeigt, welche Senken und Codierungsformate von dieser Vorlage unterstützt werden:
JSON Avro Parquet
Pub/Sub Ja Ja Nein
BigQuery Ja Nein Nein
Cloud Storage Ja Ja Ja

Pipelineanforderungen

  • Das Worker-Dienstkonto benötigt die Rolle "Dataflow-Worker" (roles/dataflow.worker). Weitere Informationen finden Sie unter Einführung in IAM.
  • Erstellen Sie eine Schemadatei, die eine JSON-Vorlage für die generierten Daten enthält. Diese Vorlage verwendet die JSON Data Generator-Bibliothek, sodass Sie für jedes Feld im Schema verschiedene Faker-Funktionen bereitstellen können. Weitere Informationen finden Sie in der Dokumentation zu json-data-generator.

    Beispiel:

    {
      "id": {{integer(0,1000)}},
      "name": "{{uuid()}}",
      "isInStock": {{bool()}}
    }
    
  • Laden Sie die Schemadatei in einen Cloud Storage-Bucket hoch.
  • Das Ausgabeziel muss vor der Ausführung vorhanden sein. Das Ziel muss ein Pub/Sub-Thema, eine BigQuery-Tabelle oder ein Cloud Storage-Bucket sein, je nach Senkentyp.
  • Wenn die Ausgabecodierung Avro oder Parquet ist, erstellen Sie eine Avro-Schemadatei und speichern diese an einem Cloud Storage-Speicherort.
  • Weisen Sie dem Worker-Dienstkonto je nach gewünschtem Ziel eine zusätzliche IAM-Rolle zu.
    Ziel Darüber hinaus erforderliche IAM-Rolle Auf welche Ressource anwenden
    Pub/Sub Pub/Sub-Publisher (roles/pubsub.publisher)
    (Weitere Informationen finden Sie unter Pub/Sub-Zugriffssteuerung mit IAM)
    Pub/Sub-Thema
    BigQuery BigQuery Dateneditor (roles/bigquery.dataEditor)
    (Weitere Informationen finden Sie unter BigQuery-Zugriffssteuerung mit IAM)
    BigQuery-Dataset
    Cloud Storage Cloud Storage-Objektadministrator (roles/storage.objectAdmin)
    (Weitere Informationen finden Sie unter Cloud Storage-Zugriffssteuerung mit IAM)
    Cloud Storage-Bucket

Vorlagenparameter

Parameter Beschreibung
schemaLocation Speicherort der Schemadatei. Beispiel: gs://mybucket/filename.json.
qps Anzahl der pro Sekunde zu veröffentlichenden Nachrichten. Beispiel: 100.
sinkType (Optional) Ausgabesenkentyp. Folgende Werte sind möglich: PUBSUB, BIGQUERY und GCS. Der Standardwert ist PUBSUB.
outputType (Optional) Ausgabe-Codierungstyp. Folgende Werte sind möglich: JSON, AVRO und PARQUET. Der Standardwert ist JSON.
avroSchemaLocation (Optional) Speicherort der AVRO-Schemadatei. Erforderlich, wenn outputType AVRO oder PARQUET ist. Beispiel: gs://mybucket/filename.avsc
topic (Optional) Name des Pub/Sub-Themas, in dem die Pipeline Daten veröffentlichen soll. Erforderlich, wenn sinkType Pub/Sub ist. Beispiel: projects/my-project-id/topics/my-topic-id.
outputTableSpec (Optional) Name der BigQuery-Ausgabetabelle. Obligatorisch, wenn sinkType BigQuery ist. Beispiel: my-project-ID:my_dataset_name.my-table-name
writeDisposition (Optional) BigQuery-Schreibanordnung. Mögliche Werte sind WRITE_APPEND, WRITE_EMPTY und WRITE_TRUNCATE. Der Standardwert ist WRITE_APPEND.
outputDeadletterTable (Optional) Name der BigQuery-Ausgabetabelle zum Speichern fehlerhafter Datensätze. Wenn nicht angegeben, erstellt die Pipeline während der Ausführung eine Tabelle mit dem Namen {output_table_name}_error_records. Beispiel: my-project-ID:my_dataset_name.my-table-name.
outputDirectory (Optional) Pfad des Cloud Storage-Ausgabespeicherorts. Erforderlich, wenn sinkType Cloud Storage ist. Beispiel: gs://mybucket/pathprefix/
outputFilenamePrefix (Optional) Das Dateinamenpräfix der in Cloud Storage geschriebenen Ausgabedateien. Der Standardwert ist output-.
windowDuration (Optional) Fensterintervall, in dem die Ausgabe nach Cloud Storage geschrieben wird. Der Standardwert ist 1m (für 1 Minute).
numShards (Optional) Maximale Anzahl an Ausgabe-Shards. Erforderlich, wenn sinkType Cloud Storage ist; es muss mindestens 1 festgelegt werden.
messagesLimit (Optional) Maximale Anzahl an Ausgabenachrichten. Der Standardwert ist 0, d. h., die Anzahl ist unbegrenzt.
autoscalingAlgorithm (Optional) Algorithmus für das Autoscaling der Worker. Mögliche Werte sind THROUGHPUT_BASED, um das Autoscaling zu aktivieren, oder NONE, um es zu deaktivieren.
maxNumWorkers (Optional) Maximale Anzahl an Worker-Maschinen. Beispiel: 10.

Führen Sie die Vorlage aus.

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Streaming Data Generator templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Ersetzen Sie Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • SCHEMA_LOCATION: der Pfad zur Schemadatei in Cloud Storage, z. B. gs://mybucket/filename.json
  • QPS: die Anzahl der Nachrichten, die pro Sekunde zu veröffentlicht werden sollen
  • PUBSUB_TOPIC: das Pub/Sub-Ausgabethema Beispiel: projects/my-project-id/topics/my-topic-id.

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Streaming_Data_Generator",
   }
}
  

Ersetzen Sie Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • SCHEMA_LOCATION: der Pfad zur Schemadatei in Cloud Storage, z. B. gs://mybucket/filename.json
  • QPS: die Anzahl der Nachrichten, die pro Sekunde zu veröffentlicht werden sollen
  • PUBSUB_TOPIC: das Pub/Sub-Ausgabethema Beispiel: projects/my-project-id/topics/my-topic-id.

Nächste Schritte