Pub/Sub Topic or Subscription to Text Files on Cloud Storage 模板

Pub/Sub Topic or Subscription to Cloud Storage Text 模板是一种流处理流水线,可从 Pub/Sub 读取记录并将其保存为一系列文本格式的 Cloud Storage 文件。使用此模板,您可以快速地保存 Pub/Sub 中的数据以留待将来使用。默认情况下,此模板每 5 分钟生成一个新文件。

流水线要求

  • Pub/Sub 主题或订阅必须已存在才能执行此流水线。
  • 发布到主题的消息必须采用文本格式。
  • 发布到主题的消息不得包含任何换行符。请注意,每条 Pub/Sub 消息在输出文件中均会保存为一行。

模板参数

必需参数

  • outputDirectory:用于写入输出文件的路径和文件名前缀。该值必须以斜杠结尾。 (示例:gs://your-bucket/your-path)。

可选参数

  • inputTopic:要从中读取输入的 Pub/Sub 主题。主题名称应采用 projects/<PROJECT_ID>/topics/<TOPIC_NAME> 格式。如果提供了此参数,请勿使用 inputSubscription。(示例:projects/your-project-id/topics/your-topic-name)。
  • inputSubscription:要从中读取输入的 Pub/Sub 订阅。订阅名称采用 projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_NAME> 格式。如果提供了此参数,请勿使用 inputTopic。(示例:projects/your-project-id/subscriptions/your-subscription-name)。
  • userTempLocation:临时文件输出到的用户提供的目录。必须以斜杠结尾。
  • outputFilenamePrefix:要在各窗口文件上放置的前缀。(示例:output-)。默认值为:output。
  • outputFilenameSuffix:要在各窗口文件上放置的后缀,通常是文件扩展名,例如 .txt.csv。(示例:.txt)。默认值为空。
  • outputShardTemplate:分片式模板定义每个窗口文件的动态部分。默认情况下,该流水线使用单一分片输出到各窗口内的文件系统。这意味着每个窗口的所有数据都会输出到单个文件中。outputShardTemplate 默认为 W-P-SS-of-NN,其中 W 是窗口日期范围,P 是窗格信息,S 是分片编号,而 N 是分片数。对于单个文件,outputShardTemplateSS-of-NN 部分为 00-of-01
  • numShards:写入时生成的输出分片数上限。分片数越多,写入 Cloud Storage 的吞吐量越高,但处理输出 Cloud Storage 文件时跨分片聚合数据的费用也可能更高。默认值为 0。
  • windowDuration:窗口时长是将数据写入输出目录的时间间隔。请根据流水线的吞吐量配置时长。例如,较高的吞吐量可能需要较短的窗口时长,以便数据适应内存。默认值为 5m(5 分钟),最少为 1s(1 秒)。允许的格式为 [int](以秒为单位,例如 5s)、[int]m(以分钟为单位,例如 12m)、[int]h(以小时为单位,例如 2h)。 (示例:5m)。
  • yearPattern:用于设置年份格式的模式。必须是一个或多个“y”或“Y”。大小写在年份中没有区别。该模式可以视情况用非字母数字字符或非目录 ('/') 字符括起来。默认值为“YYYY”。
  • monthPattern:用于设置月份格式的模式。必须是一个或多个“M”字符。该模式可以视情况用非字母数字字符或非目录 ('/') 字符括起来。默认值为“MM”。
  • dayPattern:用于设置某天的模式。必须是一个或多个“d”(表示一个月中的某一天)或“D”(表示一年中的某一天)。大小写在年份中没有区别。该模式可以视情况用非字母数字字符或非目录 ('/') 字符括起来。默认值为“dd”。
  • hourPattern:用于设置小时格式的模式。必须是一个或多个“H”字符。该模式可以视情况用非字母数字字符或非目录 ('/') 字符括起来。默认值为“HH”。
  • minutePattern:用于设置分钟格式的模式。必须是一个或多个“m”字符。该模式可以视情况用非字母数字字符或非目录 ('/') 字符括起来。默认值为“mm”。

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region REGION_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • SUBSCRIPTION_NAME:您的 Pub/Sub 订阅名称
  • BUCKET_NAME:Cloud Storage 存储桶的名称

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex",
  }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • SUBSCRIPTION_NAME:您的 Pub/Sub 订阅名称
  • BUCKET_NAME:Cloud Storage 存储桶的名称

后续步骤