Streaming Data Generator to Pub/Sub, BigQuery, and Cloud Storage 模板

Streaming Data Generator 模板用于根据用户提供架构以指定的速率生成不限数量或固定数量的综合记录或消息。 兼容的目标包括 Pub/Sub 主题、BigQuery 表和 Cloud Storage 存储桶。

以下是一些可能的使用场景:

  • 模拟针对 Pub/Sub 主题的大规模实时事件发布,以衡量并确定处理发布事件所需的消费者数量和规模。
  • 生成发送到 BigQuery 表或 Cloud Storage 存储桶的综合数据,以评估性能基准或用作概念验证。

支持的接收器和编码格式

下表说明了此模板支持的接收器和编码格式:
JSON Avro Parquet
Pub/Sub
BigQuery
Cloud Storage

流水线要求

  • 工作器服务账号需要分配 Dataflow Worker (roles/dataflow.worker) 角色。如需了解详情,请参阅预留简介
  • 创建一个包含所生成数据的 JSON 模板的架构文件。此模板使用 JSON 数据生成器库,因此您可以为架构中的每个字段提供各种 faker 函数。如需了解详情,请参阅 json-data-generator 文档

    例如:

    {
      "id": {{integer(0,1000)}},
      "name": "{{uuid()}}",
      "isInStock": {{bool()}}
    }
    
  • 将架构文件上传到 Cloud Storage 存储桶。
  • 输出目标必须已存在才能执行此流水线。目标必须是 Pub/Sub 主题、BigQuery 表或 Cloud Storage 存储桶,具体取决于接收器类型。
  • 如果输出编码是 Avro 或 Parquet,请创建一个 Avro 架构文件并将其存储在 Cloud Storage 位置。
  • 根据所需的目标位置,为工作器服务账号分配额外的 IAM 角色。
    目标 额外需要的 IAM 角色 应用于哪个资源
    Pub/Sub Pub/Sub Publisher (roles/pubsub.publisher)
    (如需了解详情,请参阅使用 IAM 进行 Pub/Sub 访问权限控制
    Pub/Sub 主题
    BigQuery BigQuery Data Editor (roles/bigquery.dataEditor)
    (如需了解详情,请参阅使用 IAM 进行 BigQuery 访问权限控制
    BigQuery 数据集
    Cloud Storage Cloud Storage Object Admin (roles/storage.objectAdmin)
    (如需了解详情,请参阅使用 IAM 进行 Cloud Storage 访问权限控制
    Cloud Storage 存储桶

模板参数

参数 说明
schemaLocation 架构文件的位置。例如:gs://mybucket/filename.json
qps 每秒要发布的消息数。例如:100
sinkType (可选)输出接收器类型。可能的值有 PUBSUBBIGQUERYGCS。默认值为 PUBSUB。
outputType (可选)输出编码类型。可能的值有 JSONAVROPARQUET。默认值为 JSON。
avroSchemaLocation (可选)AVRO 架构文件的位置。outputType 为 AVRO 或 PARQUET 时必须提供此参数。例如:gs://mybucket/filename.avsc
topic (可选)流水线应向其发布数据的 Pub/Sub 主题的名称。sinkType 为 Pub/Sub 时必须提供此参数。例如:projects/my-project-id/topics/my-topic-id
outputTableSpec (可选)输出 BigQuery 表的名称。sinkType 为 BigQuery 时必须提供此参数。例如:my-project-ID:my_dataset_name.my-table-name
writeDisposition (可选)BigQuery 写入处置方式。可能的值有 WRITE_APPENDWRITE_EMPTYWRITE_TRUNCATE。默认值为 WRITE_APPEND。
outputDeadletterTable (可选)保存失败记录的输出 BigQuery 表的名称。如果未提供,流水线在执行期间会创建名为 {output_table_name}_error_records 的表。例如:my-project-ID:my_dataset_name.my-table-name
outputDirectory (可选)输出 Cloud Storage 位置的路径。sinkType 为 Cloud Storage 时必须提供此参数。例如:gs://mybucket/pathprefix/
outputFilenamePrefix (可选)写入 Cloud Storage 的输出文件的文件名前缀。默认值为 output-。
windowDuration (可选)输出写入 Cloud Storage 的时段间隔。默认值为 1m(即 1 分钟)。
numShards [可选] 输出分片的数量上限。sinkType 为 Cloud Storage 时必须提供此参数,并且此参数应设置为 1 或更大的数。
messagesLimit (可选)输出消息的数量上限。默认值为 0,表示无限制。
autoscalingAlgorithm (可选)用于自动扩缩工作器的算法。可能的值为 THROUGHPUT_BASED(启用自动扩缩)或 NONE(停用)。
maxNumWorkers (可选)工作器机器数上限。例如:10

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Streaming Data Generator template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • SCHEMA_LOCATION:Cloud Storage 中架构文件的路径。例如:gs://mybucket/filename.json
  • QPS:每秒要发布的消息数
  • PUBSUB_TOPIC:输出 Pub/Sub 主题。例如:projects/my-project-id/topics/my-topic-id

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Streaming_Data_Generator",
   }
}
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • SCHEMA_LOCATION:Cloud Storage 中架构文件的路径。例如:gs://mybucket/filename.json
  • QPS:每秒要发布的消息数
  • PUBSUB_TOPIC:输出 Pub/Sub 主题。例如:projects/my-project-id/topics/my-topic-id

后续步骤