Google 提供了一组开源 Dataflow 模板。如需了解有关模板的一般信息,请参阅 Dataflow 模板。如需查看 Google 提供的所有模板的列表,请参阅开始使用 Google 提供的模板。
本指南介绍了实用程序模板。
文件格式转换(Avro、Parquet、CSV)
文件格式转换模板是批处理流水线,用于将 Cloud Storage 中存储的文件从一种受支持的格式转换为另一种格式。
支持以下格式转换:
- CSV 到 Avro
- CSV 到 Parquet
- Avro 到 Parquet
- Parquet 到 Avro
对此流水线的要求:
- 在运行此流水线之前,输出 Cloud Storage 存储桶必须已存在。
模板参数
参数 | 说明 |
---|---|
inputFileFormat |
输入文件格式。必须为 [csv, avro, parquet] 之一。 |
outputFileFormat |
输出文件格式。必须为 [avro, parquet] 之一。 |
inputFileSpec |
输入文件的 Cloud Storage 路径模式。例如 gs://bucket-name/path/*.csv 。 |
outputBucket |
用于写入输出文件的 Cloud Storage 文件夹。此路径应以斜杠结尾。例如 gs://bucket-name/output/ 。 |
schema |
Avro 架构文件的 Cloud Storage 路径(例如 gs://bucket-name/schema/my-schema.avsc ) |
containsHeaders |
(可选)输入 CSV 文件包含标题记录 (true/false)。默认值为 false 。仅在读取 CSV 文件时才需要。 |
csvFormat |
(可选)用于解析记录的 CSV 格式规范。默认值为 Default 。
如需了解详情,请参阅 Apache Commons CSV 格式。 |
delimiter |
(可选)输入 CSV 文件使用的字段分隔符。 |
outputFilePrefix |
(可选)输出文件前缀。默认值为 output 。 |
numShards |
(可选)输出文件分片数。 |
运行文件格式转换模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Convert file formats template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/File_Format_Conversion \ --parameters \ inputFileFormat=INPUT_FORMAT,\ outputFileFormat=OUTPUT_FORMAT,\ inputFileSpec=INPUT_FILES,\ schema=SCHEMA,\ outputBucket=OUTPUT_FOLDER
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
INPUT_FORMAT
:输入文件的文件格式;必须为[csv, avro, parquet]
中的一个OUTPUT_FORMAT
:输出文件的文件格式;必须为[avro, parquet]
中的一个INPUT_FILES
:输入文件的路径模式OUTPUT_FOLDER
:输出文件的 Cloud Storage 文件夹SCHEMA
:Avro 架构文件的路径
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFileFormat": "INPUT_FORMAT", "outputFileFormat": "OUTPUT_FORMAT", "inputFileSpec": "INPUT_FILES", "schema": "SCHEMA", "outputBucket": "OUTPUT_FOLDER" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/File_Format_Conversion", } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
INPUT_FORMAT
:输入文件的文件格式;必须为[csv, avro, parquet]
中的一个OUTPUT_FORMAT
:输出文件的文件格式;必须为[avro, parquet]
中的一个INPUT_FILES
:输入文件的路径模式OUTPUT_FOLDER
:输出文件的 Cloud Storage 文件夹SCHEMA
:Avro 架构文件的路径
Bulk Compress Cloud Storage Files
Bulk Compress Cloud Storage Files 模板是一种批处理流水线,用于将 Cloud Storage 上的文件压缩到指定位置。如果您需要在定期归档过程中压缩大批量文件,此模板非常有用。支持的压缩模式包括 BZIP2
、DEFLATE
、GZIP
。输出到目标位置的文件将遵循原始文件名附加压缩模式扩展名这一命名架构。可以附加以下扩展名之一:.bzip2
、.deflate
、.gz
。
对此流水线的要求:
- 必须采用以下压缩格式之一:
BZIP2
、DEFLATE
、GZIP
。 - 在运行流水线之前,必须已存在输出目录。
模板参数
参数 | 说明 |
---|---|
inputFilePattern |
要从中读取数据的输入文件模式。例如 gs://bucket-name/uncompressed/*.txt 。 |
outputDirectory |
要向其中写入内容的输出位置。例如 gs://bucket-name/compressed/ 。 |
outputFailureFile |
错误日志输出文件,用于写入在压缩过程中发生的故障。例如 gs://bucket-name/compressed/failed.csv 。如果没有发生故障,则系统仍会创建该文件,但其中不会包含任何内容。文件内容采用“文件名 + 错误”的 CSV 格式,每一行对应一个压缩失败的文件。 |
compression |
用于压缩匹配文件的压缩算法。必须为以下项之一:BZIP2 、DEFLATE 、GZIP |
运行 Bulk Compress Cloud Storage Files 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Bulk Compress Files on Cloud Storage template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files \ --region REGION_NAME \ --parameters \ inputFilePattern=gs://BUCKET_NAME/uncompressed/*.txt,\ outputDirectory=gs://BUCKET_NAME/compressed,\ outputFailureFile=gs://BUCKET_NAME/failed/failure.csv,\ compression=COMPRESSION
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
BUCKET_NAME
- Cloud Storage 存储桶的名称。COMPRESSION
:您选择的压缩算法
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "gs://BUCKET_NAME/uncompressed/*.txt", "outputDirectory": "gs://BUCKET_NAME/compressed", "outputFailureFile": "gs://BUCKET_NAME/failed/failure.csv", "compression": "COMPRESSION" }, "environment": { "zone": "us-central1-f" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
BUCKET_NAME
- Cloud Storage 存储桶的名称。COMPRESSION
:您选择的压缩算法
Bulk Decompress Cloud Storage Files
Bulk Decompress Cloud Storage Files 模板是一种批处理流水线,用于将 Cloud Storage 上的文件解压缩到指定位置。此功能适用于以下情况:您希望在迁移过程中使用压缩数据,以最大限度降低网络带宽费用,但在迁移后使用非压缩数据,以最大限度提高分析处理速度。此流水线会在一次运行期间自动处理多种压缩模式,并根据文件扩展名(.bzip2
、.deflate
、.gz
、.zip
)确定要使用的解压缩模式。
对此流水线的要求:
- 需要解压缩的文件必须是以下格式之一:
Bzip2
、Deflate
、Gzip
、Zip
。 - 在运行流水线之前,必须已存在输出目录。
模板参数
参数 | 说明 |
---|---|
inputFilePattern |
要从中读取数据的输入文件模式。例如 gs://bucket-name/compressed/*.gz 。 |
outputDirectory |
要向其中写入内容的输出位置。例如 gs://bucket-name/decompressed 。 |
outputFailureFile |
错误日志输出文件,用于写入在解压缩过程中发生的故障。例如 gs://bucket-name/decompressed/failed.csv 。如果没有发生故障,则系统仍会创建该文件,但其中不会包含任何内容。文件内容采用 CSV 格式(文件名, 错误),并且每行对应一个解压缩失败的文件。 |
运行 Bulk Decompress Cloud Storage Files 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Bulk Decompress Files on Cloud Storage template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files \ --region REGION_NAME \ --parameters \ inputFilePattern=gs://BUCKET_NAME/compressed/*.gz,\ outputDirectory=gs://BUCKET_NAME/decompressed,\ outputFailureFile=OUTPUT_FAILURE_FILE_PATH
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
BUCKET_NAME
- Cloud Storage 存储桶的名称。OUTPUT_FAILURE_FILE_PATH
:您选择的包含失败信息的文件的路径
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "gs://BUCKET_NAME/compressed/*.gz", "outputDirectory": "gs://BUCKET_NAME/decompressed", "outputFailureFile": "OUTPUT_FAILURE_FILE_PATH" }, "environment": { "zone": "us-central1-f" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
BUCKET_NAME
- Cloud Storage 存储桶的名称。OUTPUT_FAILURE_FILE_PATH
:您选择的包含失败信息的文件的路径
Datastore Bulk Delete [已弃用]
此模板已弃用,将于 2022 年第一季度移除。请迁移到 Firestore Bulk Delete 模板。
Datastore Bulk Delete 模板是一种流水线,它使用给定的 GQL 查询从 Datastore 中读取实体,然后删除所选目标项目中的所有匹配实体。此流水线可选择性地将 JSON 编码的 Datastore 实体传递给您的 JavaScript UDF,使您可以通过返回 null 值来过滤掉实体。
对此流水线的要求:
- 必须先在项目中设置 Datastore,然后才能运行模板。
- 如果从单独的 Datastore 实例中读取和删除,则 Dataflow 工作器服务帐号必须具有从一个实例读取并从另一个实例中删除的权限。
模板参数
参数 | 说明 |
---|---|
datastoreReadGqlQuery |
GQL 查询,指定要删除的匹配实体。使用仅限于键的查询可以提高性能。例如:“SELECT __key__ FROM MyKind”。 |
datastoreReadProjectId |
您要从中读取用于匹配的实体(使用 GQL 查询)的 Datastore 实例的项目 ID。 |
datastoreDeleteProjectId |
要从中删除匹配的实体的 Datastore 实例的项目 ID。如果您希望在同一个 Datastore 实例中执行读取和删除操作,则此 ID 可以与 datastoreReadProjectId 相同。 |
datastoreReadNamespace |
(可选)所请求实体的命名空间。默认名称空间设置为“”。 |
datastoreHintNumWorkers |
(可选)Datastore 逐步增加限制步骤中的预期工作器数量的提示。默认值为 500 。 |
javascriptTextTransformGcsPath |
(可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js 。 |
javascriptTextTransformFunctionName |
(可选)
您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。
例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ } ,则函数名称为 myTransform 。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。如果此函数为给定 Datastore 实体返回未定义的值或 null 值,则该实体不会被删除。 |
运行 Datastore Bulk Delete 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Bulk Delete Entities in Datastore template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete \ --region REGION_NAME \ --parameters \ datastoreReadGqlQuery="GQL_QUERY",\ datastoreReadProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID,\ datastoreDeleteProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
GQL_QUERY
:用于匹配要删除的实体的查询DATASTORE_READ_AND_DELETE_PROJECT_ID
:您的 Datastore 实例项目 ID。此示例从同一个 Datastore 实例中读取和删除。
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete { "jobName": "JOB_NAME", "parameters": { "datastoreReadGqlQuery": "GQL_QUERY", "datastoreReadProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID", "datastoreDeleteProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID" }, "environment": { "zone": "us-central1-f" } } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
GQL_QUERY
:用于匹配要删除的实体的查询DATASTORE_READ_AND_DELETE_PROJECT_ID
:您的 Datastore 实例项目 ID。此示例从同一个 Datastore 实例中读取和删除。
Firestore Bulk Delete
Firestore Bulk Delete 模板是一种流水线,它使用给定的 GQL 查询从 Firestore 中读取实体,然后删除所选目标项目中的所有匹配实体。此流水线可选择性地将 JSON 编码的 Firestore 实体传递给您的 JavaScript UDF,使您可以通过返回 null 值来过滤掉实体。
对此流水线的要求:
- 必须先在项目中设置 Firestore,然后才能运行模板。
- 如果从单独的 Firestore 实例中读取和删除,则 Dataflow 工作器服务帐号必须具有从一个实例读取并从另一个实例中删除的权限。
模板参数
参数 | 说明 |
---|---|
firestoreReadGqlQuery |
GQL 查询,指定要删除的匹配实体。使用仅限于键的查询可以提高性能。例如:“SELECT __key__ FROM MyKind”。 |
firestoreReadProjectId |
您要从中读取用于匹配的实体(使用 GQL 查询)的 Firestore 实例的项目 ID。 |
firestoreDeleteProjectId |
要从中删除匹配的实体的 Firestore 实例的项目 ID。如果您希望在同一个 Firestore 实例中执行读取和删除操作,则此 ID 可以与 firestoreReadProjectId 相同。 |
firestoreReadNamespace |
(可选)所请求实体的命名空间。默认名称空间设置为“”。 |
firestoreHintNumWorkers |
(可选)Firestore 逐步增加限制步骤中的预期工作器数量的提示。默认值为 500 。 |
javascriptTextTransformGcsPath |
(可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js 。 |
javascriptTextTransformFunctionName |
(可选)
您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。
例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ } ,则函数名称为 myTransform 。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。如果此函数为给定 Firestore 实体返回未定义的值或 null 值,则该实体不会被删除。 |
运行 Firestore Bulk Delete 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Bulk Delete Entities in Firestore template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Firestore_to_Firestore_Delete \ --region REGION_NAME \ --parameters \ firestoreReadGqlQuery="GQL_QUERY",\ firestoreReadProjectId=FIRESTORE_READ_AND_DELETE_PROJECT_ID,\ firestoreDeleteProjectId=FIRESTORE_READ_AND_DELETE_PROJECT_ID
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
GQL_QUERY
:用于匹配要删除的实体的查询FIRESTORE_READ_AND_DELETE_PROJECT_ID
:您的 Firestore 实例项目 ID。此示例从同一个 Firestore 实例中读取和删除。
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Firestore_to_Firestore_Delete { "jobName": "JOB_NAME", "parameters": { "firestoreReadGqlQuery": "GQL_QUERY", "firestoreReadProjectId": "FIRESTORE_READ_AND_DELETE_PROJECT_ID", "firestoreDeleteProjectId": "FIRESTORE_READ_AND_DELETE_PROJECT_ID" }, "environment": { "zone": "us-central1-f" } } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
GQL_QUERY
:用于匹配要删除的实体的查询FIRESTORE_READ_AND_DELETE_PROJECT_ID
:您的 Firestore 实例项目 ID。此示例从同一个 Firestore 实例中读取和删除。
Streaming Data Generator 到 Pub/Sub、BigQuery 和 Cloud Storage
Streaming Data Generator 模板用于根据用户提供架构以指定的速率生成不限数量或固定数量的综合记录或消息。 兼容的目标包括 Pub/Sub 主题、BigQuery 表和 Cloud Storage 存储桶。
以下是一些可能的使用场景:
- 模拟针对 Pub/Sub 主题的大规模实时事件发布,以衡量并确定处理发布事件所需的消费者数量和规模。
- 生成发送到 BigQuery 表或 Cloud Storage 存储桶的综合数据,以评估性能基准或用作概念验证。
支持的接收器和编码格式
下表说明了此模板支持的接收器和编码格式:JSON | Avro | Parquet | |
---|---|---|---|
Pub/Sub | 是 | 是 | 否 |
BigQuery | 是 | 否 | 否 |
Cloud Storage | 是 | 是 | 是 |
对此流水线的要求:
创建一个架构文件,其中包含所生成数据的 JSON 模板。此模板使用 JSON 数据生成器库,因此您可以为架构中的每个字段提供各种 faker 函数。如需了解详情,请参阅 json-data-generator 文档。
例如:
{ "id": {{integer(0,1000)}}, "name": "{{uuid()}}", "isInStock": {{bool()}} }
- 将架构文件上传到 Cloud Storage 存储桶。
- 输出目标必须已存在才能执行此流水线。目标必须是 Pub/Sub 主题、BigQuery 表或 Cloud Storage 存储桶,具体取决于接收器类型。
- 如果输出编码是 Avro 或 Parquet,请创建一个 Avro 架构文件并将其存储在 Cloud Storage 位置。
模板参数
参数 | 说明 |
---|---|
schemaLocation |
架构文件的位置。例如:gs://mybucket/filename.json 。 |
qps |
每秒要发布的消息数。例如:100 。 |
sinkType |
(可选)输出接收器类型。可能的值有 PUBSUB 、BIGQUERY 、GCS 。默认值为 PUBSUB。 |
outputType |
(可选)输出编码类型。可能的值有 JSON 、AVRO 、PARQUET 。默认值为 JSON。 |
avroSchemaLocation |
(可选)AVRO 架构文件的位置。outputType 为 AVRO 或 PARQUET 时必须提供此参数。例如:gs://mybucket/filename.avsc 。 |
topic |
(可选)流水线应向其发布数据的 Pub/Sub 主题的名称。sinkType 为 Pub/Sub 时必须提供此参数。例如: 。 |
outputTableSpec |
(可选)输出 BigQuery 表的名称。sinkType 为 BigQuery 时必须提供此参数。例如: 。 |
writeDisposition |
(可选)BigQuery 写入处置方式。可能的值有 WRITE_APPEND 、WRITE_EMPTY 或 WRITE_TRUNCATE 。默认值为 WRITE_APPEND。 |
outputDeadletterTable |
(可选)保存失败记录的输出 BigQuery 表的名称。如果未提供,流水线在执行期间会创建名为 {output_table_name}_error_records 的表。例如: 。 |
outputDirectory |
(可选)输出 Cloud Storage 位置的路径。sinkType 为 Cloud Storage 时必须提供此参数。例如:gs://mybucket/pathprefix/ 。 |
outputFilenamePrefix |
(可选)写入 Cloud Storage 的输出文件的文件名前缀。默认值为 output-。 |
windowDuration |
(可选)输出写入 Cloud Storage 的时段间隔。默认值为 1m(即 1 分钟)。 |
numShards |
[可选] 输出分片的数量上限。sinkType 为 Cloud Storage 时必须提供此参数,并且此参数应设置为 1 或更大的数。 |
messagesLimit |
(可选)输出消息的数量上限。默认值为 0,表示无限制。 |
autoscalingAlgorithm |
(可选)用于自动扩缩工作器的算法。可能的值为 THROUGHPUT_BASED (启用自动扩缩)或 NONE (停用)。 |
maxNumWorkers |
(可选)工作器机器数上限。例如:10 。 |
运行 Streaming Data Generator 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Streaming Data Generator template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator \ --parameters \ schemaLocation=SCHEMA_LOCATION,\ qps=QPS,\ topic=PUBSUB_TOPIC
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDREGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
JOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
SCHEMA_LOCATION
:Cloud Storage 中架构文件的路径。例如:gs://mybucket/filename.json
。QPS
:每秒要发布的消息数PUBSUB_TOPIC
:输出 Pub/Sub 主题。例如:projects/my-project-ID/topics/my-topic-ID
。
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "schemaLocation": "SCHEMA_LOCATION", "qps": "QPS", "topic": "PUBSUB_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator", } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDLOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
JOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
SCHEMA_LOCATION
:Cloud Storage 中架构文件的路径。例如:gs://mybucket/filename.json
。QPS
:每秒要发布的消息数PUBSUB_TOPIC
:输出 Pub/Sub 主题。例如:projects/my-project-ID/topics/my-topic-ID
。