Vorlage „Streaming Data Generator für Pub/Sub, BigQuery und Cloud Storage“

Die Vorlage für Streaming Data Generator wird verwendet, um entweder eine unbegrenzte oder eine feste Anzahl synthetischer Datensätze oder Nachrichten anhand eines vom Nutzer bereitgestellten Schemas mit der angegebenen Rate zu generieren. Zu den kompatiblen Zielen zählen Pub/Sub-Themen, BigQuery-Tabellen und Cloud Storage-Buckets.

Im Folgenden sind einige mögliche Anwendungsfälle aufgeführt:

  • Simulieren der Veröffentlichung von Echtzeitereignissen in großem Umfang in einem Pub/Sub-Thema, um die Anzahl und Größe der Nutzer zu ermitteln und zu bestimmen, die zur Verarbeitung veröffentlichter Ereignisse erforderlich sind.
  • Generieren synthetischer Daten in einer BigQuery-Tabelle oder in einem Cloud Storage-Bucket, um Leistungs-Benchmarks zu bewerten oder um diese als Proof of Concept zu nutzen.

Unterstützte Senken und Codierungsformate

In der folgenden Tabelle wird gezeigt, welche Senken und Codierungsformate von dieser Vorlage unterstützt werden:
JSON Avro Parquet
Pub/Sub Ja Ja Nein
BigQuery Ja Nein Nein
Cloud Storage Ja Ja Ja

Pipelineanforderungen

  • Das Worker-Dienstkonto benötigt die Rolle "Dataflow-Worker" (roles/dataflow.worker). Weitere Informationen finden Sie unter Einführung in IAM.
  • Erstellen Sie eine Schemadatei mit einer JSON-Vorlage für die generierten Daten. Bei dieser Vorlage wird die JSON Data Generator-Bibliothek verwendet. Sie können also für jedes Feld im Schema verschiedene Faker-Funktionen angeben. Weitere Informationen finden Sie in der Dokumentation zu json-data-generator.

    Beispiel:

    {
      "id": {{integer(0,1000)}},
      "name": "{{uuid()}}",
      "isInStock": {{bool()}}
    }
    
  • Laden Sie die Schemadatei in einen Cloud Storage-Bucket hoch.
  • Das Ausgabeziel muss vor der Ausführung vorhanden sein. Das Ziel muss ein Pub/Sub-Thema, eine BigQuery-Tabelle oder ein Cloud Storage-Bucket sein, je nach Senkentyp.
  • Wenn die Ausgabecodierung Avro oder Parquet ist, erstellen Sie eine Avro-Schemadatei und speichern diese an einem Cloud Storage-Speicherort.
  • Weisen Sie dem Arbeiterdienstkonto je nach gewünschtem Ziel eine zusätzliche IAM-Rolle zu.
    Ziel Zusätzlich erforderliche IAM-Rolle Auf welche Ressource anwenden
    Pub/Sub Pub/Sub-Publisher (roles/pubsub.publisher)
    (weitere Informationen finden Sie unter Pub/Sub-Zugriffssteuerung mit IAM)
    Pub/Sub-Thema
    BigQuery BigQuery-Dateneditor (roles/bigquery.dataEditor)
    (weitere Informationen finden Sie unter BigQuery-Zugriffssteuerung mit IAM)
    BigQuery-Dataset
    Cloud Storage Cloud Storage-Objekt-Administrator (roles/storage.objectAdmin)
    (weitere Informationen finden Sie unter Zugriffssteuerung für Cloud Storage mit IAM)
    Cloud Storage-Bucket

Vorlagenparameter

Parameter Beschreibung
schemaLocation Speicherort der Schemadatei. Beispiel: gs://mybucket/filename.json.
qps Anzahl der pro Sekunde zu veröffentlichenden Nachrichten. Beispiel: 100.
sinkType (Optional) Ausgabesenkentyp. Folgende Werte sind möglich: PUBSUB, BIGQUERY und GCS. Der Standardwert ist PUBSUB.
outputType (Optional) Ausgabe-Codierungstyp. Folgende Werte sind möglich: JSON, AVRO und PARQUET. Der Standardwert ist JSON.
avroSchemaLocation (Optional) Speicherort der AVRO-Schemadatei. Erforderlich, wenn outputType AVRO oder PARQUET ist. Beispiel: gs://mybucket/filename.avsc
topic (Optional) Name des Pub/Sub-Themas, in dem die Pipeline Daten veröffentlichen soll. Erforderlich, wenn sinkType Pub/Sub ist. Beispiel: projects/my-project-id/topics/my-topic-id.
outputTableSpec (Optional) Name der BigQuery-Ausgabetabelle. Obligatorisch, wenn sinkType BigQuery ist. Beispiel: my-project-ID:my_dataset_name.my-table-name
writeDisposition (Optional) BigQuery-Schreibanordnung. Mögliche Werte sind WRITE_APPEND, WRITE_EMPTY und WRITE_TRUNCATE. Der Standardwert ist WRITE_APPEND.
outputDeadletterTable (Optional) Name der BigQuery-Ausgabetabelle zum Speichern fehlerhafter Datensätze. Wenn nicht angegeben, erstellt die Pipeline während der Ausführung eine Tabelle mit dem Namen {output_table_name}_error_records. Beispiel: my-project-ID:my_dataset_name.my-table-name.
outputDirectory (Optional) Pfad des Cloud Storage-Ausgabespeicherorts. Erforderlich, wenn sinkType Cloud Storage ist. Beispiel: gs://mybucket/pathprefix/
outputFilenamePrefix (Optional) Das Dateinamenpräfix der in Cloud Storage geschriebenen Ausgabedateien. Der Standardwert ist output-.
windowDuration (Optional) Fensterintervall, in dem die Ausgabe nach Cloud Storage geschrieben wird. Der Standardwert ist 1m (für 1 Minute).
numShards (Optional) Maximale Anzahl an Ausgabe-Shards. Erforderlich, wenn sinkType Cloud Storage ist; es muss mindestens 1 festgelegt werden.
messagesLimit (Optional) Maximale Anzahl an Ausgabenachrichten. Der Standardwert ist 0, d. h., die Anzahl ist unbegrenzt.
autoscalingAlgorithm (Optional) Algorithmus für das Autoscaling der Worker. Mögliche Werte sind THROUGHPUT_BASED, um das Autoscaling zu aktivieren, oder NONE, um es zu deaktivieren.
maxNumWorkers (Optional) Maximale Anzahl an Worker-Maschinen. Beispiel: 10.

Führen Sie die Vorlage aus.

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Streaming Data Generator templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • SCHEMA_LOCATION: der Pfad zur Schemadatei in Cloud Storage, z. B. gs://mybucket/filename.json
  • QPS: die Anzahl der Nachrichten, die pro Sekunde zu veröffentlicht werden sollen
  • PUBSUB_TOPIC: das Pub/Sub-Ausgabethema Beispiel: projects/my-project-id/topics/my-topic-id.

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Streaming_Data_Generator",
   }
}
  

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • SCHEMA_LOCATION: der Pfad zur Schemadatei in Cloud Storage, z. B. gs://mybucket/filename.json
  • QPS: die Anzahl der Nachrichten, die pro Sekunde zu veröffentlicht werden sollen
  • PUBSUB_TOPIC: das Pub/Sub-Ausgabethema Beispiel: projects/my-project-id/topics/my-topic-id.
Java
/*
 * Copyright (C) 2020 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import com.github.vincentrussell.json.datagenerator.JsonDataGenerator;
import com.github.vincentrussell.json.datagenerator.JsonDataGeneratorException;
import com.github.vincentrussell.json.datagenerator.impl.JsonDataGeneratorImpl;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.templates.StreamingDataGenerator.StreamingDataGeneratorOptions;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToBigQuery;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToGcs;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToJdbc;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToKafka;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToPubSub;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToSpanner;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import com.google.cloud.teleport.v2.utils.GCSUtils;
import com.google.cloud.teleport.v2.utils.MetadataValidator;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import javax.annotation.Nonnull;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
 * The {@link StreamingDataGenerator} is a streaming pipeline which generates messages at a
 * specified rate to either Pub/Sub, BigQuery, GCS, JDBC, or Spanner. The messages are generated
 * according to a schema template which instructs the pipeline how to populate the messages with
 * fake data compliant to constraints.
 *
 * <p>The number of workers executing the pipeline must be large enough to support the supplied QPS.
 * Use a general rule of 2,500 QPS per core in the worker pool.
 *
 * <p>See <a href="https://github.com/vincentrussell/json-data-generator">json-data-generator</a>
 * for instructions on how to construct the schema file.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/streaming-data-generator/README_Streaming_Data_Generator.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Streaming_Data_Generator",
    category = TemplateCategory.UTILITIES,
    displayName = "Streaming Data Generator",
    description =
        "A pipeline to publish messages at specified QPS.This template can be used to benchmark"
            + " performance of streaming pipelines.",
    optionsClass = StreamingDataGeneratorOptions.class,
    flexContainerName = "streaming-data-generator",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/streaming-data-generator",
    contactInformation = "https://cloud.google.com/support",
    streaming = true,
    supportsAtLeastOnce = true)
public class StreamingDataGenerator {

  /**
   * The {@link StreamingDataGeneratorOptions} class provides the custom execution options passed by
   * the executor at the command-line.
   */
  public interface StreamingDataGeneratorOptions extends PipelineOptions {
    @TemplateParameter.Long(
        order = 1,
        description = "Required output rate",
        helpText = "Indicates rate of messages per second to be published to Pub/Sub")
    @Required
    Long getQps();

    void setQps(Long value);

    @TemplateParameter.Enum(
        order = 2,
        enumOptions = {@TemplateEnumOption("GAME_EVENT")},
        optional = true,
        description = "Schema template to generate fake data",
        helpText = "Pre-existing schema template to use. The value must be one of: [GAME_EVENT]")
    SchemaTemplate getSchemaTemplate();

    void setSchemaTemplate(SchemaTemplate value);

    @TemplateParameter.GcsReadFile(
        order = 3,
        optional = true,
        description = "Location of Schema file to generate fake data",
        helpText = "Cloud Storage path of schema location.",
        example = "gs://<bucket-name>/prefix")
    String getSchemaLocation();

    void setSchemaLocation(String value);

    @TemplateParameter.PubsubTopic(
        order = 4,
        optional = true,
        description = "Output Pub/Sub topic",
        helpText = "The name of the topic to which the pipeline should publish data.",
        example = "projects/<project-id>/topics/<topic-name>")
    String getTopic();

    void setTopic(String value);

    @TemplateParameter.Long(
        order = 5,
        optional = true,
        description = "Maximum number of output Messages",
        helpText =
            "Indicates maximum number of output messages to be generated. 0 means unlimited.")
    @Default.Long(0L)
    Long getMessagesLimit();

    void setMessagesLimit(Long value);

    @TemplateParameter.Enum(
        order = 6,
        enumOptions = {
          @TemplateEnumOption("AVRO"),
          @TemplateEnumOption("JSON"),
          @TemplateEnumOption("PARQUET")
        },
        optional = true,
        description = "Output Encoding Type",
        helpText = "The message Output type. Default is JSON.")
    @Default.Enum("JSON")
    OutputType getOutputType();

    void setOutputType(OutputType value);

    @TemplateParameter.GcsReadFile(
        order = 7,
        optional = true,
        parentName = "outputType",
        parentTriggerValues = {"AVRO", "PARQUET"},
        description = "Location of Avro Schema file",
        helpText =
            "Cloud Storage path of Avro schema location. Mandatory when output type is AVRO or"
                + " PARQUET.",
        example = "gs://your-bucket/your-path/schema.avsc")
    String getAvroSchemaLocation();

    void setAvroSchemaLocation(String value);

    @TemplateParameter.Enum(
        order = 8,
        enumOptions = {
          @TemplateEnumOption("BIGQUERY"),
          @TemplateEnumOption("GCS"),
          @TemplateEnumOption("PUBSUB"),
          @TemplateEnumOption("JDBC"),
          @TemplateEnumOption("SPANNER"),
          @TemplateEnumOption("KAFKA")
        },
        optional = true,
        description = "Output Sink Type",
        helpText = "The message Sink type. Default is PUBSUB")
    @Default.Enum("PUBSUB")
    SinkType getSinkType();

    void setSinkType(SinkType value);

    @TemplateParameter.BigQueryTable(
        order = 9,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"BIGQUERY"},
        description = "Output BigQuery table",
        helpText = "Output BigQuery table. Mandatory when sinkType is BIGQUERY",
        example = "<project>:<dataset>.<table_name>")
    String getOutputTableSpec();

    void setOutputTableSpec(String value);

    @TemplateParameter.Enum(
        order = 10,
        enumOptions = {
          @TemplateEnumOption("WRITE_APPEND"),
          @TemplateEnumOption("WRITE_EMPTY"),
          @TemplateEnumOption("WRITE_TRUNCATE")
        },
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"BIGQUERY"},
        description = "Write Disposition to use for BigQuery",
        helpText =
            "BigQuery WriteDisposition. For example, WRITE_APPEND, WRITE_EMPTY or WRITE_TRUNCATE.")
    @Default.String("WRITE_APPEND")
    String getWriteDisposition();

    void setWriteDisposition(String writeDisposition);

    @TemplateParameter.BigQueryTable(
        order = 11,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"BIGQUERY"},
        description = "The dead-letter table name to output failed messages to BigQuery",
        helpText =
            "Messages failed to reach the output table for all kind of reasons (e.g., mismatched"
                + " schema, malformed json) are written to this table. If it doesn't exist, it will"
                + " be created during pipeline execution.",
        example = "your-project-id:your-dataset.your-table-name")
    String getOutputDeadletterTable();

    void setOutputDeadletterTable(String outputDeadletterTable);

    @TemplateParameter.Duration(
        order = 12,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"GCS"},
        description = "Window duration",
        helpText =
            "The window duration/size in which data will be written to Cloud Storage. Allowed"
                + " formats are: Ns (for seconds, example: 5s), Nm (for minutes, example: 12m), Nh"
                + " (for hours, example: 2h).",
        example = "1m")
    @Default.String("1m")
    String getWindowDuration();

    void setWindowDuration(String windowDuration);

    @TemplateParameter.GcsWriteFolder(
        order = 13,
        optional = true,
        description = "Output file directory in Cloud Storage",
        helpText =
            "The path and filename prefix for writing output files. Must end with a slash. DateTime"
                + " formatting is used to parse directory path for date & time formatters.",
        example = "gs://your-bucket/your-path/")
    String getOutputDirectory();

    void setOutputDirectory(String outputDirectory);

    @TemplateParameter.Text(
        order = 14,
        optional = true,
        description = "Output filename prefix of the files to write",
        helpText = "The prefix to place on each windowed file.",
        example = "output-")
    @Default.String("output-")
    String getOutputFilenamePrefix();

    void setOutputFilenamePrefix(String outputFilenamePrefix);

    @TemplateParameter.Integer(
        order = 15,
        optional = true,
        description = "Maximum output shards",
        helpText =
            "The maximum number of output shards produced when writing. A higher number of shards"
                + " means higher throughput for writing to Cloud Storage, but potentially higher"
                + " data aggregation cost across shards when processing output Cloud Storage files."
                + " Default value is decided by Dataflow.")
    @Default.Integer(0)
    Integer getNumShards();

    void setNumShards(Integer numShards);

    @TemplateParameter.Text(
        order = 16,
        optional = true,
        regexes = {"^.+$"},
        description = "JDBC driver class name.",
        helpText = "JDBC driver class name to use.",
        example = "com.mysql.jdbc.Driver")
    String getDriverClassName();

    void setDriverClassName(String driverClassName);

    @TemplateParameter.Text(
        order = 17,
        optional = true,
        regexes = {
          "(^jdbc:[a-zA-Z0-9/:@.?_+!*=&-;]+$)|(^([A-Za-z0-9+/]{4}){1,}([A-Za-z0-9+/]{0,3})={0,3})"
        },
        description = "JDBC connection URL string.",
        helpText = "Url connection string to connect to the JDBC source.",
        example = "jdbc:mysql://some-host:3306/sampledb")
    String getConnectionUrl();

    void setConnectionUrl(String connectionUrl);

    @TemplateParameter.Text(
        order = 18,
        optional = true,
        regexes = {"^.+$"},
        description = "JDBC connection username.",
        helpText = "User name to be used for the JDBC connection.")
    String getUsername();

    void setUsername(String username);

    @TemplateParameter.Password(
        order = 19,
        optional = true,
        description = "JDBC connection password.",
        helpText = "Password to be used for the JDBC connection.")
    String getPassword();

    void setPassword(String password);

    @TemplateParameter.Text(
        order = 20,
        optional = true,
        regexes = {"^[a-zA-Z0-9_;!*&=@#-:\\/]+$"},
        description = "JDBC connection property string.",
        helpText =
            "Properties string to use for the JDBC connection. Format of the string must be"
                + " [propertyName=property;]*.",
        example = "unicode=true;characterEncoding=UTF-8")
    String getConnectionProperties();

    void setConnectionProperties(String connectionProperties);

    @TemplateParameter.Text(
        order = 21,
        optional = true,
        regexes = {"^.+$"},
        description = "Statement which will be executed against the database.",
        helpText =
            "SQL statement which will be executed to write to the database. The statement must"
                + " specify the column names of the table in any order. Only the values of the"
                + " specified column names will be read from the json and added to the statement.",
        example = "INSERT INTO tableName (column1, column2) VALUES (?,?)")
    String getStatement();

    void setStatement(String statement);

    @TemplateParameter.ProjectId(
        order = 22,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        description = "GCP Project Id of where the Spanner table lives.",
        helpText = "GCP Project Id of where the Spanner table lives.")
    String getProjectId();

    void setProjectId(String projectId);

    @TemplateParameter.Text(
        order = 23,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        regexes = {"^.+$"},
        description = "Cloud Spanner instance name.",
        helpText = "Cloud Spanner instance name.")
    String getSpannerInstanceName();

    void setSpannerInstanceName(String spannerInstanceName);

    @TemplateParameter.Text(
        order = 24,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        regexes = {"^.+$"},
        description = "Cloud Spanner database name.",
        helpText = "Cloud Spanner database name.")
    String getSpannerDatabaseName();

    void setSpannerDatabaseName(String spannerDBName);

    @TemplateParameter.Text(
        order = 25,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        regexes = {"^.+$"},
        description = "Cloud Spanner table name.",
        helpText = "Cloud Spanner table name.")
    String getSpannerTableName();

    void setSpannerTableName(String spannerTableName);

    @TemplateParameter.Long(
        order = 26,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        description = "Max mutatated cells per batch.",
        helpText =
            "Specifies the cell mutation limit (maximum number of mutated cells per batch). Default value is 5000")
    Long getMaxNumMutations();

    void setMaxNumMutations(Long value);

    @TemplateParameter.Long(
        order = 27,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        description = "Max rows per batch.",
        helpText =
            "Specifies the row mutation limit (maximum number of mutated rows per batch). Default value is 1000")
    Long getMaxNumRows();

    void setMaxNumRows(Long value);

    @TemplateParameter.Long(
        order = 28,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        description = "Max batch size in bytes.",
        helpText =
            "Specifies the batch size limit (max number of bytes mutated per batch). Default value is 1MB")
    Long getBatchSizeBytes();

    void setBatchSizeBytes(Long value);

    @TemplateParameter.Long(
        order = 29,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        description = "Commit deadline in seconds for write requests.",
        helpText = "Specifies the deadline in seconds for the Commit API call.")
    Long getCommitDeadlineSeconds();

    void setCommitDeadlineSeconds(Long value);

    @TemplateParameter.Text(
        order = 30,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"KAFKA"},
        regexes = {"[,:a-zA-Z0-9._-]+"},
        description = "Output Kafka Bootstrap Server",
        helpText = "Kafka Bootstrap Server ",
        example = "localhost:9092")
    String getBootstrapServer();

    void setBootstrapServer(String bootstrapServer);

    @TemplateParameter.Text(
        order = 31,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"KAFKA"},
        regexes = {"[a-zA-Z0-9._-]+"},
        description = "Kafka topic to write to",
        helpText = "Kafka topic to write to.",
        example = "topic")
    String getKafkaTopic();

    void setKafkaTopic(String outputTopic);
  }

  /** Allowed list of existing schema templates. */
  public enum SchemaTemplate {
    GAME_EVENT(
        "{\n"
            + "  \"eventId\": \"{{uuid()}}\",\n"
            + "  \"eventTimestamp\": {{timestamp()}},\n"
            + "  \"ipv4\": \"{{ipv4()}}\",\n"
            + "  \"ipv6\": \"{{ipv6()}}\",\n"
            + "  \"country\": \"{{country()}}\",\n"
            + "  \"username\": \"{{username()}}\",\n"
            + "  \"quest\": \"{{random(\"A Break In the Ice\", \"Ghosts of Perdition\", \"Survive"
            + " the Low Road\")}}\",\n"
            + "  \"score\": {{integer(100, 10000)}},\n"
            + "  \"completed\": {{bool()}}\n"
            + "}"),
    LOG_ENTRY(
        "{\n"
            + "  \"logName\": \"{{alpha(10,20)}}\",\n"
            + "  \"resource\": {\n"
            + "    \"type\": \"{{alpha(5,10)}}\"\n"
            + "  },\n"
            + "  \"timestamp\": {{timestamp()}},\n"
            + "  \"receiveTimestamp\": {{timestamp()}},\n"
            + "  \"severity\": \"{{random(\"DEFAULT\", \"DEBUG\", \"INFO\", \"NOTICE\","
            + " \"WARNING\", \"ERROR\", \"CRITICAL\", \"ERROR\")}}\",\n"
            + "  \"insertId\": \"{{uuid()}}\",\n"
            + "  \"trace\": \"{{uuid()}}\",\n"
            + "  \"spanId\": \"{{uuid()}}\",\n"
            + "  \"jsonPayload\": {\n"
            + "    \"bytes_sent\": {{integer(1000,20000)}},\n"
            + "    \"connection\": {\n"
            + "      \"dest_ip\": \"{{ipv4()}}\",\n"
            + "      \"dest_port\": {{integer(0,65000)}},\n"
            + "      \"protocol\": {{integer(0,6)}},\n"
            + "      \"src_ip\": \"{{ipv4()}}\",\n"
            + "      \"src_port\": {{integer(0,65000)}}\n"
            + "    },\n"
            + "    \"dest_instance\": {\n"
            + "      \"project_id\": \"{{concat(\"PROJECT\", integer(0,3))}}\",\n"
            + "      \"region\": \"{{country()}}\",\n"
            + "      \"vm_name\": \"{{username()}}\",\n"
            + "      \"zone\": \"{{state()}}\"\n"
            + "    },\n"
            + "    \"end_time\": {{timestamp()}},\n"
            + "    \"packets_sent\": {{integer(100,400)}},\n"
            + "    \"reporter\": \"{{random(\"SRC\", \"DEST\")}}\",\n"
            + "    \"rtt_msec\": {{integer(0,20)}},\n"
            + "    \"start_time\": {{timestamp()}}\n"
            + "  }\n"
            + "}");

    private final String schema;

    SchemaTemplate(String schema) {
      this.schema = schema;
    }

    public String getSchema() {
      return schema;
    }
  }

  /** Allowed list of message encoding types. */
  public enum OutputType {
    JSON(".json"),
    AVRO(".avro"),
    PARQUET(".parquet");

    private final String fileExtension;

    /** Sets file extension associated with output type. */
    OutputType(String fileExtension) {
      this.fileExtension = fileExtension;
    }

    /** Returns file extension associated with output type. */
    public String getFileExtension() {
      return fileExtension;
    }
  }

  /** Allowed list of sink types. */
  public enum SinkType {
    PUBSUB,
    BIGQUERY,
    GCS,
    JDBC,
    SPANNER,
    KAFKA
  }

  /**
   * The main entry-point for pipeline execution. This method will start the pipeline but will not
   * wait for it's execution to finish. If blocking execution is required, use the {@link
   * StreamingDataGenerator#run(StreamingDataGeneratorOptions)} method to start the pipeline and
   * invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args command-line args passed by the executor.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    StreamingDataGeneratorOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(StreamingDataGeneratorOptions.class);

    run(options);
  }

  /**
   * Runs the pipeline to completion with the specified options. This method does not wait until the
   * pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
   * object to block until the pipeline is finished running if blocking programmatic execution is
   * required.
   *
   * @param options the execution options.
   * @return the pipeline result.
   */
  public static PipelineResult run(@Nonnull StreamingDataGeneratorOptions options) {
    checkNotNull(options, "options argument to run method cannot be null.");
    MetadataValidator.validate(options);

    // FileSystems does not set the default configuration in workers till Pipeline.run
    // Explicitly registering standard file systems.
    FileSystems.setDefaultPipelineOptions(options);
    String schema = getSchema(options.getSchemaTemplate(), options.getSchemaLocation());

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    /*
     * Steps:
     *  1) Trigger at the supplied QPS
     *  2) Generate messages containing fake data
     *  3) Write messages to appropriate Sink
     */
    PCollection<byte[]> generatedMessages =
        pipeline
            .apply("Trigger", createTrigger(options))
            .apply("Generate Fake Messages", ParDo.of(new MessageGeneratorFn(schema)));

    if (options.getSinkType().equals(SinkType.GCS)) {
      generatedMessages =
          generatedMessages.apply(
              options.getWindowDuration() + " Window",
              Window.into(
                  FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))));
    }

    generatedMessages.apply(
        "Write To " + options.getSinkType().name(), createSink(options, schema));

    return pipeline.run();
  }

  /**
   * Creates either Bounded or UnBounded Source based on messageLimit pipeline option.
   *
   * @param options the pipeline options.
   */
  private static GenerateSequence createTrigger(@Nonnull StreamingDataGeneratorOptions options) {
    checkNotNull(options, "options argument to createTrigger method cannot be null.");
    GenerateSequence generateSequence =
        GenerateSequence.from(0L)
            .withRate(options.getQps(), /* periodLength= */ Duration.standardSeconds(1L));

    return options.getMessagesLimit() > 0
        ? generateSequence.to(options.getMessagesLimit())
        : generateSequence;
  }

  /**
   * The {@link MessageGeneratorFn} class generates fake messages based on supplied schema
   *
   * <p>See <a href="https://github.com/vincentrussell/json-data-generator">json-data-generator</a>
   * for instructions on how to construct the schema file.
   */
  @VisibleForTesting
  static class MessageGeneratorFn extends DoFn<Long, byte[]> {

    // Not initialized inline or constructor because {@link JsonDataGenerator} is not serializable.
    private transient JsonDataGenerator dataGenerator;
    private final String schema;

    MessageGeneratorFn(String schema) {
      this.schema = schema;
    }

    @Setup
    public void setup() {
      dataGenerator = new JsonDataGeneratorImpl();
    }

    @ProcessElement
    public void processElement(
        @Element Long element,
        @Timestamp Instant timestamp,
        OutputReceiver<byte[]> receiver,
        ProcessContext context)
        throws IOException, JsonDataGeneratorException {

      byte[] payload;

      // Generate the fake JSON according to the schema.
      try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
        dataGenerator.generateTestDataJson(schema, byteArrayOutputStream);
        payload = byteArrayOutputStream.toByteArray();
      }

      receiver.output(payload);
    }
  }

  /**
   * Creates appropriate sink based on sinkType pipeline option.
   *
   * @param options the pipeline options.
   */
  @VisibleForTesting
  static PTransform<PCollection<byte[]>, PDone> createSink(
      @Nonnull StreamingDataGeneratorOptions options, @Nonnull String schema) {
    checkNotNull(options, "options argument to createSink method cannot be null.");
    checkNotNull(schema, "schema argument to createSink method cannot be null.");

    switch (options.getSinkType()) {
      case PUBSUB:
        checkArgument(
            options.getTopic() != null,
            String.format(
                "Missing required value --topic for %s sink type", options.getSinkType().name()));
        return StreamingDataGeneratorWriteToPubSub.Writer.builder(options, schema).build();
      case BIGQUERY:
        checkArgument(
            options.getOutputTableSpec() != null,
            String.format(
                "Missing required value --outputTableSpec in format"
                    + " <project>:<dataset>.<table_name> for %s sink type",
                options.getSinkType().name()));
        return StreamingDataGeneratorWriteToBigQuery.builder(options).build();
      case GCS:
        checkArgument(
            options.getOutputDirectory() != null,
            String.format(
                "Missing required value --outputDirectory in format gs:// for %s sink type",
                options.getSinkType().name()));
        return StreamingDataGeneratorWriteToGcs.builder(options).build();
      case JDBC:
        checkArgument(
            options.getDriverClassName() != null,
            String.format(
                "Missing required value --driverClassName for %s sink type",
                options.getSinkType().name()));
        checkArgument(
            options.getConnectionUrl() != null,
            String.format(
                "Missing required value --connectionUrl for %s sink type",
                options.getSinkType().name()));
        checkArgument(
            options.getStatement() != null,
            String.format(
                "Missing required value --statement for %s sink type",
                options.getSinkType().name()));
        return StreamingDataGeneratorWriteToJdbc.builder(options).build();
      case SPANNER:
        checkArgument(
            options.getProjectId() != null,
            String.format(
                "Missing required value --projectId for %s sink type",
                options.getSinkType().name()));
        checkArgument(
            options.getSpannerInstanceName() != null,
            String.format(
                "Missing required value --spannerInstanceName for %s sink type",
                options.getSinkType().name()));
        checkArgument(
            options.getSpannerDatabaseName() != null,
            String.format(
                "Missing required value --spannerDatabaseName for %s sink type",
                options.getSinkType().name()));
        checkArgument(
            options.getSpannerTableName() != null,
            String.format(
                "Missing required value --spannerTableName for %s sink type",
                options.getSinkType().name()));
        return StreamingDataGeneratorWriteToSpanner.builder(options).build();
      case KAFKA:
        checkArgument(
            options.getBootstrapServer() != null,
            String.format(
                "Missing required value --bootstrapServer for %s sink type",
                options.getSinkType().name()));
        checkArgument(
            options.getKafkaTopic() != null,
            String.format(
                "Missing required value --kafkaTopic for %s sink type",
                options.getSinkType().name()));
        return StreamingDataGeneratorWriteToKafka.Writer.builder(options).build();
      default:
        throw new IllegalArgumentException("Unsupported Sink.");
    }
  }

  private static String getSchema(SchemaTemplate schemaTemplate, String schemaLocation) {
    checkArgument(
        schemaTemplate != null || schemaLocation != null,
        "Either schemaTemplate or schemaLocation argument of MessageGeneratorFn class must be"
            + " provided.");
    if (schemaLocation != null) {
      return GCSUtils.getGcsFileAsString(schemaLocation);
    } else {
      return schemaTemplate.getSchema();
    }
  }
}

Nächste Schritte