Pub/Sub Avro to BigQuery 模板是一种流处理流水线,用于将 Pub/Sub 订阅中的 Avro 数据提取到 BigQuery 表中。向 BigQuery 表写入数据时发生的任何错误都会流式传输到 Pub/Sub 未处理的主题。
流水线要求
- 用作输入来源的 Pub/Sub 订阅必须存在。
- Avro 记录的架构文件必须存在于 Cloud Storage 存储空间中。
- 未处理的 Pub/Sub 主题必须存在。
- 用作输出目标的 BigQuery 数据集必须已存在。
模板参数
必需参数
- schemaPath:Avro 架构文件的 Cloud Storage 位置。例如
gs://path/to/my/schema.avsc
。 - inputSubscription:要从中读取数据的 Pub/Sub 输入订阅。例如
projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_ID>
。 - outputTableSpec:要将输出写入的 BigQuery 输出表位置。例如
<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
。根据指定的createDisposition
,系统可能会使用用户提供的 Avro 架构自动创建输出表。 - outputTopic:要用于未处理的记录的 Pub/Sub 主题。例如
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
。
可选参数
- useStorageWriteApiAtLeastOnce:使用 Storage Write API 时,指定写入语义。如需使用“至少一次”语义 (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics),请将此参数设置为 true。如需使用“正好一次”语义,请将此参数设置为
false
。仅当useStorageWriteApi
为true
时,此参数才适用。默认值为false
。 - writeDisposition:BigQuery WriteDisposition (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload) 值。例如
WRITE_APPEND
、WRITE_EMPTY
或WRITE_TRUNCATE
。默认值为WRITE_APPEND
。 - createDisposition:BigQuery CreateDisposition (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload)。例如
CREATE_IF_NEEDED
和CREATE_NEVER
。 默认值为CREATE_IF_NEEDED
。 - useStorageWriteApi:如果为 true,则流水线使用 BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api)。默认值为
false
。如需了解详情,请参阅“使用 Storage Write API”(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)。 - numStorageWriteApiStreams:使用 Storage Write API 时,指定写入流的数量。如果
useStorageWriteApi
为true
且useStorageWriteApiAtLeastOnce
为false
,则必须设置此参数。默认值为 0。 - storageWriteApiTriggeringFrequencySec:使用 Storage Write API 时,指定触发频率(以秒为单位)。如果
useStorageWriteApi
为true
且useStorageWriteApiAtLeastOnce
为false
,则必须设置此参数。
运行模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Pub/Sub Avro to BigQuery template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
SCHEMA_PATH
:Avro 架构文件的 Cloud Storage 路径(例如gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
:Pub/Sub 输入订阅名称BIGQUERY_TABLE
:BigQuery 输出表名称DEADLETTER_TOPIC
:要用于未处理的队列的 Pub/Sub 主题
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
SCHEMA_PATH
:Avro 架构文件的 Cloud Storage 路径(例如gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
:Pub/Sub 输入订阅名称BIGQUERY_TABLE
:BigQuery 输出表名称DEADLETTER_TOPIC
:要用于未处理的队列的 Pub/Sub 主题
后续步骤
- 了解 Dataflow 模板。
- 参阅 Google 提供的模板列表。