Pub/Sub to Avro Files on Cloud Storage 模板

Pub/Sub to Avro files on Cloud Storage 模板是一个流处理流水线,可从 Pub/Sub 主题中读取数据,并将 Avro 文件写入指定的 Cloud Storage 存储桶。

流水线要求

  • Pub/Sub 输入主题必须已存在才能执行此流水线。

模板参数

参数 说明
inputTopic 要订阅用来处理消息的 Pub/Sub 主题。主题名称必须采用 projects/<project-id>/topics/<topic-name> 格式。
outputDirectory 要用于归档输出 Avro 文件的输出目录。末尾必须包含 /。例如:gs://example-bucket/example-directory/
avroTempDirectory 临时 Avro 文件的目录。末尾必须包含 /。例如:gs://example-bucket/example-directory/
outputFilenamePrefix (可选)Avro 文件的输出文件名前缀。
outputFilenameSuffix (可选)Avro 文件的输出文件名后缀。
outputShardTemplate (可选)输出文件的分片模板。它被指定为字母 SN 的重复序列。例如 SSS-NNN。这些字母会分别替换成分片编号或分片总数。如果未指定此参数,则默认模板格式为 W-P-SS-of-NN

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to Avro Files on Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_PubSub_to_Avro \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
avroTempDirectory=gs://BUCKET_NAME/temp/

请替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • TOPIC_NAME:Pub/Sub 主题名称
  • BUCKET_NAME - Cloud Storage 存储桶的名称。
  • FILENAME_PREFIX:首选输出文件名前缀
  • FILENAME_SUFFIX:首选输出文件名后缀
  • SHARD_TEMPLATE:首选输出分片模板

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE"
   }
}

请替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • TOPIC_NAME:Pub/Sub 主题名称
  • BUCKET_NAME - Cloud Storage 存储桶的名称。
  • FILENAME_PREFIX:首选输出文件名前缀
  • FILENAME_SUFFIX:首选输出文件名后缀
  • SHARD_TEMPLATE:首选输出分片模板

后续步骤