Text Files on Cloud Storage to Pub/Sub (Stream) 模板

此模板可以创建一种流处理流水线,该流水线可持续轮询新上传到 Cloud Storage 的文本文件,并逐行读取每个文件,然后将字符串发布到 Pub/Sub 主题。此外,此模板还能以包含 JSON 记录的换行符分隔文件或 CSV 文件形式,将记录发布到 Pub/Sub 主题进行实时处理。您可以使用此模板将数据重放到 Pub/Sub。

流水线无限期运行,需要通过“取消”而非“排空”手动终止,原因是其使用“Watch”转换,该转换是一个不支持排空的“SplittableDoFn”。

目前,轮询间隔固定为 10 秒。此模板不会对个别记录设置任何时间戳,因此在执行期间,事件时间与发布时间相同。如果您的流水线依赖准确的事件时间来执行处理,您不应使用此流水线。

流水线要求

  • 输入文件必须采用换行符分隔的 JSON 或 CSV 格式。跨越源文件中多个行的记录会导致下游问题,因为文件中的每一行都以消息形式发布到 Pub/Sub。
  • Pub/Sub 主题必须已存在才能执行此流水线。
  • 流水线无限期运行,需要手动终止。

模板参数

必需参数

  • inputFilePattern:要从中读取数据的输入文件模式。例如 gs://bucket-name/files/*.json
  • outputTopic:要向其写入数据的 Pub/Sub 输入主题。名称必须采用 projects/<PROJECT_ID>/topics/<TOPIC_NAME> 格式。例如 projects/your-project-id/topics/your-topic-name

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Text Files on Cloud Storage to Pub/Sub (Stream) template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 可选:如需从“正好一次”处理切换到“至少一次”流处理模式,请选择至少一次
  8. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME\
    --staging-location STAGING_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME:Cloud Storage 存储桶的名称
  • FILE_PATTERN:要从 Cloud Storage 存储桶中读取的文件格式 glob(例如 path/*.csv

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME:Cloud Storage 存储桶的名称
  • FILE_PATTERN:要从 Cloud Storage 存储桶中读取的文件格式 glob(例如 path/*.csv

后续步骤