Google 提供了一组开源 Dataflow 模板。
这些 Dataflow 模板可帮助您处理大型数据任务,包括数据导入、数据导出、数据备份、数据恢复和批量 API 操作,所有这些均无需使用专用开发环境。这些模板基于 Apache Beam 构建,并使用 Dataflow 转换数据。
如需了解有关模板的一般信息,请参阅 Dataflow 模板。如需查看 Google 提供的所有模板的列表,请参阅开始使用 Google 提供的模板。本指南介绍了批量模板。
BigQuery to Cloud Storage TFRecords
BigQuery to Cloud Storage TFRecords 模板是一种流水线,可从 BigQuery 查询读取数据并以 TFRecord 格式将其写入 Cloud Storage 存储桶。您可以指定训练、测试和验证拆分百分比。默认情况下,训练集的拆分比例为 1 或 100%,测试和验证集的拆分比例为 0 或 0%。设置数据集拆分比例时,训练、测试和验证之和加起来必须为 1 或 100%(例如,0.6 + 0.2 + 0.2)。Dataflow 会自动确定每个输出数据集的最佳分片数。
对此流水线的要求:
- BigQuery 数据集和表必须已存在。
- 输出 Cloud Storage 存储桶必须存在才能执行此流水线。训练、测试和验证子目录不需要预先存在,将会自动生成。
模板参数
参数 | 说明 |
---|---|
readQuery |
用于从来源中提取数据的 BigQuery SQL 查询。例如 select * from dataset1.sample_table 。 |
outputDirectory |
在其中写入训练、测试和验证 TFRecord 文件的顶级 Cloud Storage 路径前缀。例如 gs://mybucket/output 。生成的训练、测试和验证 TFRecord 文件的子目录根据 outputDirectory 自动生成。例如 gs://mybucket/output/train |
trainingPercentage |
(可选)分配给训练 TFRecord 文件的查询数据所占的百分比。默认值为 1 或 100%。 |
testingPercentage |
(可选)分配给测试 TFRecord 文件的查询数据所占的百分比。 默认值为 0 或 0%。 |
validationPercentage |
(可选)分配给验证 TFRecord 文件的查询数据所占的百分比。 默认值为 0 或 0%。 |
outputSuffix |
(可选)写入的训练、测试和验证 TFRecord 文件的文件后缀。默认值为 .tfrecord 。 |
运行 BigQuery to Cloud Storage TFRecord files 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the BigQuery to TFRecords template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_BigQuery_to_GCS_TensorFlow_Records \ --region REGION_NAME \ --parameters \ readQuery=READ_QUERY,\ outputDirectory=OUTPUT_DIRECTORY,\ trainingPercentage=TRAINING_PERCENTAGE,\ testingPercentage=TESTING_PERCENTAGE,\ validationPercentage=VALIDATION_PERCENTAGE,\ outputSuffix=OUTPUT_FILENAME_SUFFIX
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
READ_QUERY
:要运行的 BigQuery 查询OUTPUT_DIRECTORY
:输出数据集的 Cloud Storage 路径前缀TRAINING_PERCENTAGE
:训练数据集的拆分小数百分比TESTING_PERCENTAGE
:测试数据集的拆分小数百分比VALIDATION_PERCENTAGE
:验证数据集的拆分小数百分比OUTPUT_FILENAME_SUFFIX
:首选输出 TensorFlow 记录文件后缀
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_BigQuery_to_GCS_TensorFlow_Records { "jobName": "JOB_NAME", "parameters": { "readQuery":"READ_QUERY", "outputDirectory":"OUTPUT_DIRECTORY", "trainingPercentage":"TRAINING_PERCENTAGE", "testingPercentage":"TESTING_PERCENTAGE", "validationPercentage":"VALIDATION_PERCENTAGE", "outputSuffix":"OUTPUT_FILENAME_SUFFIX" }, "environment": { "zone": "us-central1-f" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
READ_QUERY
:要运行的 BigQuery 查询OUTPUT_DIRECTORY
:输出数据集的 Cloud Storage 路径前缀TRAINING_PERCENTAGE
:训练数据集的拆分小数百分比TESTING_PERCENTAGE
:测试数据集的拆分小数百分比VALIDATION_PERCENTAGE
:验证数据集的拆分小数百分比OUTPUT_FILENAME_SUFFIX
:首选输出 TensorFlow 记录文件后缀
BigQuery export to Parquet(通过 Storage API)
BigQuery export to Parquet 模板是一种批处理流水线,可从 BigQuery 表读取数据并以 Parquet 格式将其写入 Cloud Storage 存储桶。此模板利用 BigQuery Storage API 导出数据。
对此流水线的要求:
- 在运行此流水线之前,输入 BigQuery 表必须已存在。
- 在运行此流水线之前,输出 Cloud Storage 存储桶必须已存在。
模板参数
参数 | 说明 |
---|---|
tableRef |
BigQuery 输入表位置。例如 <my-project>:<my-dataset>.<my-table> 。 |
bucket |
要在其中写入 Parquet 文件的 Cloud Storage 文件夹。例如 gs://mybucket/exports 。 |
numShards |
(可选)输出文件分片数。默认值为 1。 |
fields |
(可选)要从输入 BigQuery 表格选择的以英文逗号分隔的字段列表。 |
运行 BigQuery to Cloud Storage Parquet 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the BigQuery export to Parquet (via Storage API) template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/BigQuery_to_Parquet \ --region=REGION_NAME \ --parameters \ tableRef=BIGQUERY_TABLE,\ bucket=OUTPUT_DIRECTORY,\ numShards=NUM_SHARDS,\ fields=FIELDS
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
BIGQUERY_TABLE
:您的 BigQuery 表名称OUTPUT_DIRECTORY
:输出文件的 Cloud Storage 文件夹NUM_SHARDS
:所需的输出文件分片数FIELDS
:要从输入 BigQuery 表中选择的以英文逗号分隔的字段列表
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "tableRef": "BIGQUERY_TABLE", "bucket": "OUTPUT_DIRECTORY", "numShards": "NUM_SHARDS", "fields": "FIELDS" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/BigQuery_to_Parquet", } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
BIGQUERY_TABLE
:您的 BigQuery 表名称OUTPUT_DIRECTORY
:输出文件的 Cloud Storage 文件夹NUM_SHARDS
:所需的输出文件分片数FIELDS
:要从输入 BigQuery 表中选择的以英文逗号分隔的字段列表
BigQuery to Elasticsearch
BigQuery to Elasticsearch 模板是一种批处理流水线,用于将 BigQuery 表中的数据作为文档注入到 Elasticsearch 中。该模板可以读取整个表,也可以使用提供的查询读取特定记录。
对此流水线的要求
- 源 BigQuery 表必须存在。
- 必须具有在 Google Cloud 实例上或 Elastic Cloud 上运行的使用 Elasticsearch 7.0 版或更高版本的 Elasticsearch 主机,并且该主机应能够从 Dataflow 工作器机器进行访问。
模板参数
参数 | 说明 |
---|---|
connectionUrl |
Elasticsearch 网址,格式为 https://hostname:[port] 或指定 CloudID(如果使用 Elastic Cloud)。 |
apiKey |
用于身份验证的 Base64 编码 API 密钥。 |
index |
将向其发出请求的 Elasticsearch 索引,例如 my-index 。 |
inputTableSpec |
(可选)要读取并插入到 Elasticsearch 中的 BigQuery 表。必须提供表或查询。例如 projectId:datasetId.tablename 。 |
query |
(可选)用于从 BigQuery 拉取数据的 SQL 查询。必须提供表或查询。 |
useLegacySql |
(可选)设置为 true 即可使用旧版 SQL(仅在提供查询时适用)。默认值:false 。 |
batchSize |
(可选)文档数量中的批次大小。默认值:1000 。 |
batchSizeBytes |
(可选)批次大小(以字节为单位)。默认值:5242880 (5mb)。 |
maxRetryAttempts |
(可选)尝试次数上限,必须大于 0。默认值:no retries 。 |
maxRetryDuration |
(可选)重试时长上限(以毫秒为单位),必须大于 0。默认值:no retries 。 |
propertyAsIndex |
(可选)要编入索引的文档中的一个属性,其值将指定批量请求要包含在文档中的 _index 元数据(优先于 _index UDF)。默认值:none。 |
propertyAsId |
(可选)要编入索引的文档中的一个属性,其值将指定批量请求要包含在文档中的 _id 元数据(优先于 _id UDF)。默认值:none。 |
javaScriptIndexFnGcsPath |
(可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _index 元数据。默认值:none。 |
javaScriptIndexFnName |
(可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _index 元数据。默认值:none。 |
javaScriptIdFnGcsPath |
(可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _id 元数据。默认值:none。 |
javaScriptIdFnName |
(可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _id 元数据。默认值:none。 |
javaScriptTypeFnGcsPath |
(可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _type 元数据。默认值:none。 |
javaScriptTypeFnName |
(可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _type 元数据。默认值:none。 |
javaScriptIsDeleteFnGcsPath |
(可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将确定是否应删除文档,而不是插入或更新文档。该函数应返回字符串值 "true" 或 "false" 。默认值:none。 |
javaScriptIsDeleteFnName |
(可选)函数的 UDF JavaScript 函数名称,该函数将确定是否应删除文档,而不是插入或更新文档。该函数应返回字符串值 "true" 或 "false" 。默认值:none。 |
usePartialUpdate |
(可选)是否在 Elasticsearch 请求中使用部分更新(更新而不是创建或索引,允许部分文档)。默认值:false 。 |
bulkInsertMethod |
(可选)在 Elasticsearch 批量请求中使用 INDEX (索引,允许执行更新插入操作)还是 CREATE (创建,会对重复 _id 报错)。默认值:CREATE 。 |
运行 BigQuery to Elasticsearch 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the BigQuery to Elasticsearch template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/BigQuery_to_Elasticsearch \ --parameters \ inputTableSpec=INPUT_TABLE_SPEC,\ connectionUrl=CONNECTION_URL,\ apiKey=APIKEY,\ index=INDEX
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
INPUT_TABLE_SPEC
:您的 BigQuery 表名称。CONNECTION_URL
:您的 Elasticsearch 网址。APIKEY
:用于身份验证的 base64 编码 API 密钥。INDEX
:您的 Elasticsearch 索引。
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputTableSpec": "INPUT_TABLE_SPEC", "connectionUrl": "CONNECTION_URL", "apiKey": "APIKEY", "index": "INDEX" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/BigQuery_to_Elasticsearch", } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
INPUT_TABLE_SPEC
:您的 BigQuery 表名称。CONNECTION_URL
:您的 Elasticsearch 网址。APIKEY
:用于身份验证的 base64 编码 API 密钥。INDEX
:您的 Elasticsearch 索引。
BigQuery to MongoDB
BigQuery to MongoDB 模板是一种批处理流水线,可从 BigQuery 读取行数据并将其作为文档写入 MongoDB。目前,每一行数据都被存储为一个文档。
对此流水线的要求
- 源 BigQuery 表必须存在。
- 应该能够从 Dataflow 工作器机器访问目标 MongoDB 实例。
模板参数
参数 | 说明 |
---|---|
mongoDbUri |
MongoDB 连接 URI,格式为 mongodb+srv://:@ 。 |
database |
存储集合的 MongoDB 数据库。例如:my-db 。 |
collection |
MongoDB 数据库中集合的名称。例如:my-collection 。 |
inputTableSpec |
要读取的 BigQuery 表。例如 bigquery-project:dataset.input_table 。 |
运行 BigQuery to MongoDB 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the BigQuery to MongoDB template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/BigQuery_to_MongoDB \ --parameters \ inputTableSpec=INPUT_TABLE_SPEC,\ mongoDbUri=MONGO_DB_URI,\ database=DATABASE,\ collection=COLLECTION
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
INPUT_TABLE_SPEC
:您的 BigQuery 源表的名称。MONGO_DB_URI
:您的 MongoDB URI。DATABASE
:您的 MongoDB 数据库。COLLECTION
:您的 MongoDB 集合。
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputTableSpec": "INPUT_TABLE_SPEC", "mongoDbUri": "MONGO_DB_URI", "database": "DATABASE", "collection": "COLLECTION" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/BigQuery_to_MongoDB", } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
INPUT_TABLE_SPEC
:您的 BigQuery 源表的名称。MONGO_DB_URI
:您的 MongoDB URI。DATABASE
:您的 MongoDB 数据库。COLLECTION
:您的 MongoDB 集合。
Bigtable to Cloud Storage Avro
Bigtable to Cloud Storage Avro 模板是一种流水线,可从 Bigtable 表中读取数据并以 Avro 格式将其写入 Cloud Storage 存取分区。您可以使用该模板将数据从 Bigtable 移动到 Cloud Storage。
对此流水线的要求:
- Bigtable 表必须已存在。
- 在运行此流水线之前,输出 Cloud Storage 存储桶必须已存在。
模板参数
参数 | 说明 |
---|---|
bigtableProjectId |
您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。 |
bigtableInstanceId |
表所属的 Bigtable 实例的 ID。 |
bigtableTableId |
要导出的 Bigtable 表的 ID。 |
outputDirectory |
写入数据的 Cloud Storage 路径。例如 gs://mybucket/somefolder 。 |
filenamePrefix |
Avro 文件名的前缀。例如 output- 。 |
运行 Bigtable to Cloud Storage Avro file 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Cloud Bigtable to Avro Files on Cloud Storage template 。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_Avro \ --region REGION_NAME \ --parameters \ bigtableProjectId=BIGTABLE_PROJECT_ID,\ bigtableInstanceId=INSTANCE_ID,\ bigtableTableId=TABLE_ID,\ outputDirectory=OUTPUT_DIRECTORY,\ filenamePrefix=FILENAME_PREFIX
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
BIGTABLE_PROJECT_ID
:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。INSTANCE_ID
:表所属的 Bigtable 实例的 ID。TABLE_ID
:需要导出的 Bigtable 表的 ID。OUTPUT_DIRECTORY
:写入数据的 Cloud Storage 路径,例如gs://mybucket/somefolder
FILENAME_PREFIX
:Avro 文件名的前缀,例如output-
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_Avro { "jobName": "JOB_NAME", "parameters": { "bigtableProjectId": "BIGTABLE_PROJECT_ID", "bigtableInstanceId": "INSTANCE_ID", "bigtableTableId": "TABLE_ID", "outputDirectory": "OUTPUT_DIRECTORY", "filenamePrefix": "FILENAME_PREFIX", }, "environment": { "zone": "us-central1-f" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
BIGTABLE_PROJECT_ID
:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。INSTANCE_ID
:表所属的 Bigtable 实例的 ID。TABLE_ID
:需要导出的 Bigtable 表的 ID。OUTPUT_DIRECTORY
:写入数据的 Cloud Storage 路径,例如gs://mybucket/somefolder
FILENAME_PREFIX
:Avro 文件名的前缀,例如output-
Bigtable to Cloud Storage Parquet
Bigtable to Cloud Storage Parquet 模板是一种流水线,可从 BigQuery 表读取数据并以 Parquet 格式将其写入 Cloud Storage 存储桶。您可以使用该模板将数据从 Bigtable 移动到 Cloud Storage。
对此流水线的要求:
- Bigtable 表必须已存在。
- 在运行此流水线之前,输出 Cloud Storage 存储桶必须已存在。
模板参数
参数 | 说明 |
---|---|
bigtableProjectId |
您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。 |
bigtableInstanceId |
表所属的 Bigtable 实例的 ID。 |
bigtableTableId |
要导出的 Bigtable 表的 ID。 |
outputDirectory |
写入数据的 Cloud Storage 路径。例如 gs://mybucket/somefolder 。 |
filenamePrefix |
Parquet 文件名的前缀。例如 output- 。 |
numShards |
输出文件分片数。例如 2 。 |
运行 Bigtable to Cloud Storage Parquet file 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Cloud Bigtable to Parquet Files on Cloud Storage template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_Parquet \ --region REGION_NAME \ --parameters \ bigtableProjectId=BIGTABLE_PROJECT_ID,\ bigtableInstanceId=INSTANCE_ID,\ bigtableTableId=TABLE_ID,\ outputDirectory=OUTPUT_DIRECTORY,\ filenamePrefix=FILENAME_PREFIX,\ numShards=NUM_SHARDS
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
BIGTABLE_PROJECT_ID
:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。INSTANCE_ID
:表所属的 Bigtable 实例的 ID。TABLE_ID
:需要导出的 Bigtable 表的 ID。OUTPUT_DIRECTORY
:写入数据的 Cloud Storage 路径,例如gs://mybucket/somefolder
FILENAME_PREFIX
:Parquet 文件名的前缀,例如output-
NUM_SHARDS
:要输出的 Parquet 文件的数量,例如1
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_Parquet { "jobName": "JOB_NAME", "parameters": { "bigtableProjectId": "BIGTABLE_PROJECT_ID", "bigtableInstanceId": "INSTANCE_ID", "bigtableTableId": "TABLE_ID", "outputDirectory": "OUTPUT_DIRECTORY", "filenamePrefix": "FILENAME_PREFIX", "numShards": "NUM_SHARDS" }, "environment": { "zone": "us-central1-f" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
BIGTABLE_PROJECT_ID
:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。INSTANCE_ID
:表所属的 Bigtable 实例的 ID。TABLE_ID
:需要导出的 Bigtable 表的 ID。OUTPUT_DIRECTORY
:写入数据的 Cloud Storage 路径,例如gs://mybucket/somefolder
FILENAME_PREFIX
:Parquet 文件名的前缀,例如output-
NUM_SHARDS
:要输出的 Parquet 文件的数量,例如1
Bigtable to Cloud Storage SequenceFile
Bigtable to Cloud Storage SequenceFile 模板是一种流水线,可从 Bigtable 表读取数据并以 SequenceFile 格式将其写入 Cloud Storage 存储桶。您可以使用该模板将数据从 Bigtable 复制到 Cloud Storage。
对此流水线的要求:
- Bigtable 表必须已存在。
- 在运行此流水线之前,输出 Cloud Storage 存储桶必须已存在。
模板参数
参数 | 说明 |
---|---|
bigtableProject |
您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。 |
bigtableInstanceId |
表所属的 Bigtable 实例的 ID。 |
bigtableTableId |
要导出的 Bigtable 表的 ID。 |
bigtableAppProfileId |
要用于导出的 Bigtable 应用配置文件的 ID。如果您没有指定应用配置文件,则 Bigtable 将使用该实例的默认应用配置文件。 |
destinationPath |
写入数据的 Cloud Storage 路径。例如 gs://mybucket/somefolder 。 |
filenamePrefix |
SequenceFile 文件名的前缀。例如 output- 。 |
运行 Bigtable to Cloud Storage SequenceFile 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Cloud Bigtable to SequenceFile Files on Cloud Storage template 。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_SequenceFile \ --region REGION_NAME \ --parameters \ bigtableProject=BIGTABLE_PROJECT_ID,\ bigtableInstanceId=INSTANCE_ID,\ bigtableTableId=TABLE_ID,\ bigtableAppProfileId=APPLICATION_PROFILE_ID,\ destinationPath=DESTINATION_PATH,\ filenamePrefix=FILENAME_PREFIX
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
BIGTABLE_PROJECT_ID
:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。INSTANCE_ID
:表所属的 Bigtable 实例的 ID。TABLE_ID
:需要导出的 Bigtable 表的 ID。APPLICATION_PROFILE_ID
:将用于导出的 Bigtable 应用配置文件的 ID。DESTINATION_PATH
:写入数据的 Cloud Storage 路径,例如gs://mybucket/somefolder
FILENAME_PREFIX
:SequenceFile 文件名的前缀,例如output-
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_SequenceFile { "jobName": "JOB_NAME", "parameters": { "bigtableProject": "BIGTABLE_PROJECT_ID", "bigtableInstanceId": "INSTANCE_ID", "bigtableTableId": "TABLE_ID", "bigtableAppProfileId": "APPLICATION_PROFILE_ID", "destinationPath": "DESTINATION_PATH", "filenamePrefix": "FILENAME_PREFIX", }, "environment": { "zone": "us-central1-f" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
BIGTABLE_PROJECT_ID
:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。INSTANCE_ID
:表所属的 Bigtable 实例的 ID。TABLE_ID
:需要导出的 Bigtable 表的 ID。APPLICATION_PROFILE_ID
:将用于导出的 Bigtable 应用配置文件的 ID。DESTINATION_PATH
:写入数据的 Cloud Storage 路径,例如gs://mybucket/somefolder
FILENAME_PREFIX
:SequenceFile 文件名的前缀,例如output-
Datastore to Cloud Storage Text [已弃用]
此模板已弃用,将于 2022 年第一季度移除。请迁移到 Firestore to Cloud Storage Text 模板。
Datastore to Cloud Storage Text 模板是一种批处理流水线,可读取 Datastore 实体并以文本文件形式将其写入 Cloud Storage。您可以提供一个函数以将每个实体处理为 JSON 字符串。如果您未提供此类函数,则输出文件中的每一行都将是一个 JSON 序列化实体。
对此流水线的要求:
在运行此流水线之前,必须先在项目中设置 Datastore。
模板参数
参数 | 说明 |
---|---|
datastoreReadGqlQuery |
一种 GQL 查询,用于指定要获取的实体。例如 SELECT * FROM MyKind 。 |
datastoreReadProjectId |
您要从中读取数据的 Datastore 实例的 Google Cloud 项目 ID。 |
datastoreReadNamespace |
所请求实体的命名空间。如需使用默认命名空间,请将此参数留空。 |
javascriptTextTransformGcsPath |
(可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js 。 |
javascriptTextTransformFunctionName |
(可选)
您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。
例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ } ,则函数名称为 myTransform 。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。
|
textWritePrefix |
Cloud Storage 路径前缀,用于指定写入数据的位置。例如 gs://mybucket/somefolder/ 。 |
运行 Datastore to Cloud Storage Text 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Datastore to Text Files on Cloud Storage template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Datastore_to_GCS_Text \ --region REGION_NAME \ --parameters \ datastoreReadGqlQuery="SELECT * FROM DATASTORE_KIND",\ datastoreReadProjectId=DATASTORE_PROJECT_ID,\ datastoreReadNamespace=DATASTORE_NAMESPACE,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ textWritePrefix=gs://BUCKET_NAME/output/
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
BUCKET_NAME
:Cloud Storage 存储桶的名称DATASTORE_PROJECT_ID
:Datastore 实例所在的 Cloud 项目的 IDDATASTORE_KIND
:您的 Datastore 实体的类型DATASTORE_NAMESPACE
:Datastore 实体的命名空间JAVASCRIPT_FUNCTION
: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称例如,如果您的 JavaScript 函数代码为
myTransform(inJson) { /*...do stuff...*/ }
,则函数名称为myTransform
。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。PATH_TO_JAVASCRIPT_UDF_FILE
:.js
文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如gs://my-bucket/my-udfs/my_file.js
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Datastore_to_GCS_Text { "jobName": "JOB_NAME", "parameters": { "datastoreReadGqlQuery": "SELECT * FROM DATASTORE_KIND" "datastoreReadProjectId": "DATASTORE_PROJECT_ID", "datastoreReadNamespace": "DATASTORE_NAMESPACE", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "textWritePrefix": "gs://BUCKET_NAME/output/" }, "environment": { "zone": "us-central1-f" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
BUCKET_NAME
:Cloud Storage 存储桶的名称DATASTORE_PROJECT_ID
:Datastore 实例所在的 Cloud 项目的 IDDATASTORE_KIND
:您的 Datastore 实体的类型DATASTORE_NAMESPACE
:Datastore 实体的命名空间JAVASCRIPT_FUNCTION
: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称例如,如果您的 JavaScript 函数代码为
myTransform(inJson) { /*...do stuff...*/ }
,则函数名称为myTransform
。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。PATH_TO_JAVASCRIPT_UDF_FILE
:.js
文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如gs://my-bucket/my-udfs/my_file.js
Firestore to Cloud Storage Text
Firestore to Cloud Storage Text 模板是一种批处理流水线,可读取 Firestore 实体并以文本文件形式将其写入 Cloud Storage。您可以提供一个函数以将每个实体处理为 JSON 字符串。如果您未提供此类函数,则输出文件中的每一行都将是一个 JSON 序列化实体。
对此流水线的要求:
在运行此流水线之前,必须先在项目中设置 Firestore。
模板参数
参数 | 说明 |
---|---|
firestoreReadGqlQuery |
一种 GQL 查询,用于指定要获取的实体。例如 SELECT * FROM MyKind 。 |
firestoreReadProjectId |
您要从中读取数据的 Firestore 实例的 Google Cloud 项目 ID。 |
firestoreReadNamespace |
所请求实体的命名空间。如需使用默认命名空间,请将此参数留空。 |
javascriptTextTransformGcsPath |
(可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js 。 |
javascriptTextTransformFunctionName |
(可选)
您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。
例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ } ,则函数名称为 myTransform 。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。
|
textWritePrefix |
Cloud Storage 路径前缀,用于指定写入数据的位置。例如 gs://mybucket/somefolder/ 。 |
运行 Firestore to Cloud Storage Text 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Firestore to Text Files on Cloud Storage template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Firestore_to_GCS_Text \ --region REGION_NAME \ --parameters \ firestoreReadGqlQuery="SELECT * FROM FIRESTORE_KIND",\ firestoreReadProjectId=FIRESTORE_PROJECT_ID,\ firestoreReadNamespace=FIRESTORE_NAMESPACE,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ textWritePrefix=gs://BUCKET_NAME/output/
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
BUCKET_NAME
:Cloud Storage 存储桶的名称FIRESTORE_PROJECT_ID
:Firestore 实例所在的 Cloud 项目的 IDFIRESTORE_KIND
:Firestore 实体的类型FIRESTORE_NAMESPACE
:Firestore 实体的命名空间JAVASCRIPT_FUNCTION
: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称例如,如果您的 JavaScript 函数代码为
myTransform(inJson) { /*...do stuff...*/ }
,则函数名称为myTransform
。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。PATH_TO_JAVASCRIPT_UDF_FILE
:.js
文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如gs://my-bucket/my-udfs/my_file.js
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Firestore_to_GCS_Text { "jobName": "JOB_NAME", "parameters": { "firestoreReadGqlQuery": "SELECT * FROM FIRESTORE_KIND" "firestoreReadProjectId": "FIRESTORE_PROJECT_ID", "firestoreReadNamespace": "FIRESTORE_NAMESPACE", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "textWritePrefix": "gs://BUCKET_NAME/output/" }, "environment": { "zone": "us-central1-f" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
BUCKET_NAME
:Cloud Storage 存储桶的名称FIRESTORE_PROJECT_ID
:Firestore 实例所在的 Cloud 项目的 IDFIRESTORE_KIND
:Firestore 实体的类型FIRESTORE_NAMESPACE
:Firestore 实体的命名空间JAVASCRIPT_FUNCTION
: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称例如,如果您的 JavaScript 函数代码为
myTransform(inJson) { /*...do stuff...*/ }
,则函数名称为myTransform
。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。PATH_TO_JAVASCRIPT_UDF_FILE
:.js
文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如gs://my-bucket/my-udfs/my_file.js
Cloud Spanner to Cloud Storage Avro
Cloud Spanner to Cloud Storage Avro on Cloud Storage 模板是一种批处理流水线,可将整个 Cloud Spanner 数据库以 Avro 格式导出到 Cloud Storage。导出 Cloud Spanner 数据库会在您选择的存储桶中创建一个文件夹。该文件夹包含以下内容:
spanner-export.json
文件。- 您导出的数据库中每个表的
TableName-manifest.json
文件。 - 一个或多个
TableName.avro-#####-of-#####
文件。
例如,如果导出包含两个表 Singers
和 Albums
的数据库,则系统会创建以下文件集:
Albums-manifest.json
Albums.avro-00000-of-00002
Albums.avro-00001-of-00002
Singers-manifest.json
Singers.avro-00000-of-00003
Singers.avro-00001-of-00003
Singers.avro-00002-of-00003
spanner-export.json
对此流水线的要求:
- Cloud Spanner 数据库必须存在。
- Cloud Storage 输出存储桶必须存在。
- 除了运行 Dataflow 作业所需的 IAM 角色之外,您还必须具有适当的 IAM 角色才能读取 Cloud Spanner 数据并写入 Cloud Storage 存储桶。
模板参数
参数 | 说明 | |
---|---|---|
instanceId |
需要导出的 Cloud Spanner 数据库的实例 ID。 | |
databaseId |
需要导出的 Cloud Spanner 数据库的数据库 ID。 | |
outputDir |
您期望的 Avro 文件导出位置的 Cloud Storage 路径。导出作业在此路径下创建一个包含导出文件的新目录。 | |
snapshotTime |
(可选)与您要读取的 Cloud Spanner 数据库版本对应的时间戳。时间戳必须按照 RFC 3339 世界协调时间 (UTC)(即“祖鲁时”)格式指定。例如 1990-12-31T23:59:60Z 。时间戳必须是过去的时间,并且必须遵循时间戳过时上限。 |
|
tableNames |
(可选)英文逗号分隔列表,指定要导出的 Cloud Spanner 数据库子集。该列表必须包含所有相关表(父表、外键引用的表)。如果未明确列出,则必须设置“应该导出相关表”标志,才能成功导出。 | |
shouldExportRelatedTables |
(可选)与“tableNames”参数结合使用的标志,用于包括要导出的所有相关表。 | |
spannerProjectId |
(可选)您要从中读取数据的 Cloud Spanner 数据库的 Google Cloud 项目 ID。 |
在 Cloud Storage 模板上运行 Cloud Spanner to Avro 文件
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
作业名称必须与以下格式匹配,作业才会显示在 Google Cloud 控制台的 Spanner 实例页面中:
cloud-spanner-export-SPANNER_INSTANCE_ID-SPANNER_DATABASE_NAME
替换以下内容:
SPANNER_INSTANCE_ID
:Spanner 实例的 IDSPANNER_DATABASE_NAME
:Spanner 数据库的名称
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Cloud Spanner to Avro Files on Cloud Storage template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_Spanner_to_GCS_Avro \ --region REGION_NAME \ --staging-location GCS_STAGING_LOCATION \ --parameters \ instanceId=INSTANCE_ID,\ databaseId=DATABASE_ID,\ outputDir=GCS_DIRECTORY
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称要使作业显示在 Google Cloud 控制台的 Cloud Spanner 部分中,作业名称必须与
cloud-spanner-export-INSTANCE_ID-DATABASE_ID
格式匹配。VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
GCS_STAGING_LOCATION
:写入临时文件的位置;例如gs://mybucket/temp
INSTANCE_ID
:您的 Cloud Spanner 实例 IDDATABASE_ID
:您的 Cloud Spanner 数据库 IDGCS_DIRECTORY
:Avro 文件导出到
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_Spanner_to_GCS_Avro { "jobName": "JOB_NAME", "parameters": { "instanceId": "INSTANCE_ID", "databaseId": "DATABASE_ID", "outputDir": "gs://GCS_DIRECTORY" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称要使作业显示在 Google Cloud 控制台的 Cloud Spanner 部分中,作业名称必须与
cloud-spanner-export-INSTANCE_ID-DATABASE_ID
格式匹配。VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
GCS_STAGING_LOCATION
:写入临时文件的位置;例如gs://mybucket/temp
INSTANCE_ID
:您的 Cloud Spanner 实例 IDDATABASE_ID
:您的 Cloud Spanner 数据库 IDGCS_DIRECTORY
:Avro 文件导出到
Cloud Spanner to Cloud Storage Text
Cloud Spanner to Cloud Storage Text 模板是一种批处理流水线,可从 Cloud Spanner 表中读取数据,然后将其作为 CSV 文本文件写入 Cloud Storage。
对此流水线的要求:
- 在运行此流水线之前,输入 Spanner 表必须已存在。
模板参数
参数 | 说明 |
---|---|
spannerProjectId |
需要从中读取数据的 Cloud Spanner 数据库的 Google Cloud 项目 ID。 |
spannerDatabaseId |
所请求表的数据库 ID。 |
spannerInstanceId |
所请求的表的实例 ID。 |
spannerTable |
用于读取数据的表。 |
textWritePrefix |
写入输出文本文件的目录。在末尾添加“/”。例如:gs://mybucket/somefolder/ 。 |
spannerSnapshotTime |
(可选)与您要读取的 Cloud Spanner 数据库版本对应的时间戳。时间戳必须按照 RFC 3339 世界协调时间 (UTC)(即“祖鲁时”)格式指定。例如 1990-12-31T23:59:60Z 。时间戳必须是过去的时间,并且必须遵循时间戳过时上限。 |
运行 Cloud Spanner to Cloud Storage Text 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Cloud Spanner to Text Files on Cloud Storage template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Spanner_to_GCS_Text \ --region REGION_NAME \ --parameters \ spannerProjectId=SPANNER_PROJECT_ID,\ spannerDatabaseId=DATABASE_ID,\ spannerInstanceId=INSTANCE_ID,\ spannerTable=TABLE_ID,\ textWritePrefix=gs://BUCKET_NAME/output/
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
SPANNER_PROJECT_ID
:要从中读取数据的 Spanner 数据库的 Cloud 项目 IDDATABASE_ID
:Spanner 数据库 IDBUCKET_NAME
- Cloud Storage 存储桶的名称。INSTANCE_ID
:Spanner 实例 IDTABLE_ID
:Spanner 表 ID
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Spanner_to_GCS_Text { "jobName": "JOB_NAME", "parameters": { "spannerProjectId": "SPANNER_PROJECT_ID", "spannerDatabaseId": "DATABASE_ID", "spannerInstanceId": "INSTANCE_ID", "spannerTable": "TABLE_ID", "textWritePrefix": "gs://BUCKET_NAME/output/" }, "environment": { "zone": "us-central1-f" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
SPANNER_PROJECT_ID
:要从中读取数据的 Spanner 数据库的 Cloud 项目 IDDATABASE_ID
:Spanner 数据库 IDBUCKET_NAME
- Cloud Storage 存储桶的名称。INSTANCE_ID
:Spanner 实例 IDTABLE_ID
:Spanner 表 ID
Cloud Storage Avro to Bigtable
Cloud Storage Avro to Bigtable 模板是一种流水线,可从 Cloud Storage 存储桶中的 Avro 文件读取数据并将数据写入 Bigtable 表。您可以使用该模板将数据从 Cloud Storage 复制到 Bigtable。
对此流水线的要求:
- Bigtable 表必须已存在,并且列族必须与 Avro 文件中导出的列族相同。
- 在运行此流水线之前,输入 Avro 文件必须已存在于 Cloud Storage 存储桶中。
- Bigtable 需要输入 Avro 文件具有特定架构。
模板参数
参数 | 说明 |
---|---|
bigtableProjectId |
您要将数据写入的 Bigtable 实例的 Google Cloud 项目 ID。 |
bigtableInstanceId |
表所属的 Bigtable 实例的 ID。 |
bigtableTableId |
要导入的 Bigtable 表的 ID。 |
inputFilePattern |
数据所在的 Cloud Storage 路径模式,例如 gs://mybucket/somefolder/prefix* 。 |
运行 Cloud Storage Avro file to Bigtable 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Avro Files on Cloud Storage to Cloud Bigtable template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/GCS_Avro_to_Cloud_Bigtable \ --region REGION_NAME \ --parameters \ bigtableProjectId=BIGTABLE_PROJECT_ID,\ bigtableInstanceId=INSTANCE_ID,\ bigtableTableId=TABLE_ID,\ inputFilePattern=INPUT_FILE_PATTERN
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
BIGTABLE_PROJECT_ID
:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。INSTANCE_ID
:表所属的 Bigtable 实例的 ID。TABLE_ID
:需要导出的 Bigtable 表的 ID。INPUT_FILE_PATTERN
:数据所在的 Cloud Storage 路径模式,例如gs://mybucket/somefolder/prefix*
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Avro_to_Cloud_Bigtable { "jobName": "JOB_NAME", "parameters": { "bigtableProjectId": "BIGTABLE_PROJECT_ID", "bigtableInstanceId": "INSTANCE_ID", "bigtableTableId": "TABLE_ID", "inputFilePattern": "INPUT_FILE_PATTERN", }, "environment": { "zone": "us-central1-f" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
BIGTABLE_PROJECT_ID
:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。INSTANCE_ID
:表所属的 Bigtable 实例的 ID。TABLE_ID
:需要导出的 Bigtable 表的 ID。INPUT_FILE_PATTERN
:数据所在的 Cloud Storage 路径模式,例如gs://mybucket/somefolder/prefix*
Cloud Storage Avro to Cloud Spanner
Cloud Storage Avro files to Cloud Spanner 模板是一种批处理流水线,会读取从 Cloud Storage 中存储的 Cloud Spanner 导出的 Avro 文件,并将其导入 Cloud Spanner 数据库。
对此流水线的要求:
- 目标 Cloud Spanner 数据库必须已存在且必须为空。
- 您必须拥有 Cloud Storage 存储桶的读取权限以及目标 Cloud Spanner 数据库的写入权限。
- Cloud Storage 输入路径必须存在,并且必须包含
spanner-export.json
文件,且该文件包含要导入的文件的 JSON 描述。
模板参数
参数 | 说明 |
---|---|
instanceId |
Cloud Spanner 数据库的实例 ID。 |
databaseId |
Cloud Spanner 数据库的 ID。 |
inputDir |
导入 Avro 文件的 Cloud Storage 路径。 |
运行 Cloud Storage Avro to Cloud Spanner 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
作业名称必须与以下格式匹配,作业才会显示在 Google Cloud 控制台的 Spanner 实例页面中:
cloud-spanner-import-SPANNER_INSTANCE_ID-SPANNER_DATABASE_NAME
替换以下内容:
SPANNER_INSTANCE_ID
:Spanner 实例的 IDSPANNER_DATABASE_NAME
:Spanner 数据库的名称
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Avro Files on Cloud Storage to Cloud Spanner template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/GCS_Avro_to_Cloud_Spanner \ --region REGION_NAME \ --staging-location GCS_STAGING_LOCATION \ --parameters \ instanceId=INSTANCE_ID,\ databaseId=DATABASE_ID,\ inputDir=GCS_DIRECTORY
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
INSTANCE_ID
:包含数据库的 Spanner 实例的 IDDATABASE_ID
:需要导入到的 Spanner 数据库的 IDGCS_DIRECTORY
:导入 Avro 文件的 Cloud Storage 路径,例如gs://mybucket/somefolder
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Avro_to_Cloud_Spanner { "jobName": "JOB_NAME", "parameters": { "instanceId": "INSTANCE_ID", "databaseId": "DATABASE_ID", "inputDir": "gs://GCS_DIRECTORY" }, "environment": { "machineType": "n1-standard-2" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
INSTANCE_ID
:包含数据库的 Spanner 实例的 IDDATABASE_ID
:需要导入到的 Spanner 数据库的 IDGCS_DIRECTORY
:导入 Avro 文件的 Cloud Storage 路径,例如gs://mybucket/somefolder
Cloud Storage Parquet to Bigtable
Cloudtable Parquet to Bigtable 模板是一种流水线,可从 Cloud Storage 存储桶中的 Parquet 文件读取数据并将数据写入 Bigtable 表。您可以使用该模板将数据从 Cloud Storage 复制到 Bigtable。
对此流水线的要求:
- Bigtable 表必须已存在,并且列族必须与 Parquet 文件中导出的列族相同。
- 在运行此流水线之前,输入 Parquet 文件必须已存在于 Cloud Storage 存储桶中。
- Bigtable 需要输入 Parquet 文件具有特定架构。
模板参数
参数 | 说明 |
---|---|
bigtableProjectId |
您要将数据写入的 Bigtable 实例的 Google Cloud 项目 ID。 |
bigtableInstanceId |
表所属的 Bigtable 实例的 ID。 |
bigtableTableId |
要导入的 Bigtable 表的 ID。 |
inputFilePattern |
数据所在的 Cloud Storage 路径模式,例如 gs://mybucket/somefolder/prefix* 。 |
运行 Cloud Storage Parquet file to Bigtable 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Parquet Files on Cloud Storage to Cloud Bigtable template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/GCS_Parquet_to_Cloud_Bigtable \ --region REGION_NAME \ --parameters \ bigtableProjectId=BIGTABLE_PROJECT_ID,\ bigtableInstanceId=INSTANCE_ID,\ bigtableTableId=TABLE_ID,\ inputFilePattern=INPUT_FILE_PATTERN
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
BIGTABLE_PROJECT_ID
:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。INSTANCE_ID
:表所属的 Bigtable 实例的 ID。TABLE_ID
:需要导出的 Bigtable 表的 ID。INPUT_FILE_PATTERN
:数据所在的 Cloud Storage 路径模式,例如gs://mybucket/somefolder/prefix*
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Parquet_to_Cloud_Bigtable { "jobName": "JOB_NAME", "parameters": { "bigtableProjectId": "BIGTABLE_PROJECT_ID", "bigtableInstanceId": "INSTANCE_ID", "bigtableTableId": "TABLE_ID", "inputFilePattern": "INPUT_FILE_PATTERN", }, "environment": { "zone": "us-central1-f" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
BIGTABLE_PROJECT_ID
:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。INSTANCE_ID
:表所属的 Bigtable 实例的 ID。TABLE_ID
:需要导出的 Bigtable 表的 ID。INPUT_FILE_PATTERN
:数据所在的 Cloud Storage 路径模式,例如gs://mybucket/somefolder/prefix*
Cloud Storage SequenceFile to Bigtable
Cloud Storage SequenceFile to Bigtable 模板是一种流水线,可从 Cloud Storage 存储桶中的 SequenceFile 读取数据并将数据写入 Bigtable 表。您可以使用该模板将数据从 Cloud Storage 复制到 Bigtable。
对此流水线的要求:
- Bigtable 表必须已存在。
- 在运行此流水线之前,输入 SequenceFiles 文件必须已存在于 Cloud Storage 存储桶中。
- 输入 SequenceFile 必须已从 Bigtable 或 HBase 中导出。
模板参数
参数 | 说明 |
---|---|
bigtableProject |
您要将数据写入的 Bigtable 实例的 Google Cloud 项目 ID。 |
bigtableInstanceId |
表所属的 Bigtable 实例的 ID。 |
bigtableTableId |
要导入的 Bigtable 表的 ID。 |
bigtableAppProfileId |
要用于导入的 Bigtable 应用配置文件的 ID。如果您没有指定应用配置文件,则 Bigtable 将使用该实例的默认应用配置文件。 |
sourcePattern |
数据所在的 Cloud Storage 路径模式,例如 gs://mybucket/somefolder/prefix* 。 |
运行 Cloud Storage SequenceFile to Bigtable 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the SequenceFile Files on Cloud Storage to Cloud Bigtable template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/GCS_SequenceFile_to_Cloud_Bigtable \ --region REGION_NAME \ --parameters \ bigtableProject=BIGTABLE_PROJECT_ID,\ bigtableInstanceId=INSTANCE_ID,\ bigtableTableId=TABLE_ID,\ bigtableAppProfileId=APPLICATION_PROFILE_ID,\ sourcePattern=SOURCE_PATTERN
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
BIGTABLE_PROJECT_ID
:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。INSTANCE_ID
:表所属的 Bigtable 实例的 ID。TABLE_ID
:需要导出的 Bigtable 表的 ID。APPLICATION_PROFILE_ID
:将用于导出的 Bigtable 应用配置文件的 ID。SOURCE_PATTERN
:数据所在的 Cloud Storage 路径模式,例如gs://mybucket/somefolder/prefix*
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_SequenceFile_to_Cloud_Bigtable { "jobName": "JOB_NAME", "parameters": { "bigtableProject": "BIGTABLE_PROJECT_ID", "bigtableInstanceId": "INSTANCE_ID", "bigtableTableId": "TABLE_ID", "bigtableAppProfileId": "APPLICATION_PROFILE_ID", "sourcePattern": "SOURCE_PATTERN", }, "environment": { "zone": "us-central1-f" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
BIGTABLE_PROJECT_ID
:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。INSTANCE_ID
:表所属的 Bigtable 实例的 ID。TABLE_ID
:需要导出的 Bigtable 表的 ID。APPLICATION_PROFILE_ID
:将用于导出的 Bigtable 应用配置文件的 ID。SOURCE_PATTERN
:数据所在的 Cloud Storage 路径模式,例如gs://mybucket/somefolder/prefix*
Cloud Storage Text to BigQuery
Cloud Storage Text to BigQuery 流水线是一种批处理流水线,可用于读取 Cloud Storage 中存储的文本文件,使用您提供的 JavaScript 用户定义函数 (UDF) 转换这些文件,然后将结果附加到 BigQuery 表。
对此流水线的要求:
- 创建一个用于描述 BigQuery 架构的 JSON 文件。
确保有一个名为
BigQuery Schema
的顶级 JSON 数组,且该数组的内容遵循{"name": "COLUMN_NAME", "type": "DATA_TYPE"}
格式。Cloud Storage Text to BigQuery 批处理模板不支持将数据导入目标 BigQuery 表中的
STRUCT
(记录)字段。下面的 JSON 描述了一个 BigQuery 架构示例:
{ "BigQuery Schema": [ { "name": "location", "type": "STRING" }, { "name": "name", "type": "STRING" }, { "name": "age", "type": "STRING" }, { "name": "color", "type": "STRING" }, { "name": "coffee", "type": "STRING" } ] }
- 使用 UDF 函数(该函数提供转换文本行的逻辑)创建一个 JavaScript (
.js
) 文件。您的函数必须返回一个 JSON 字符串。例如,以下函数将拆分 CSV 文件的每行文本,并通过转换值返回 JSON 字符串。
function transform(line) { var values = line.split(','); var obj = new Object(); obj.location = values[0]; obj.name = values[1]; obj.age = values[2]; obj.color = values[3]; obj.coffee = values[4]; var jsonString = JSON.stringify(obj); return jsonString; }
模板参数
参数 | 说明 |
---|---|
javascriptTextTransformFunctionName |
您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ } ,则函数名称为 myTransform 。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。
|
JSONPath |
用于定义 BigQuery 架构的 JSON 文件(存储在 Cloud Storage 中)的 gs:// 路径。例如 gs://path/to/my/schema.json 。 |
javascriptTextTransformGcsPath |
.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js 。 |
inputFilePattern |
Cloud Storage 中待处理文本的 gs:// 路径。例如 gs://path/to/my/text/data.txt 。 |
outputTable |
要创建用以存储已处理数据的 BigQuery 表名称。如果您重复使用现有 BigQuery 表,则数据将被附加到目标表。例如 my-project-name:my-dataset.my-table 。 |
bigQueryLoadingTemporaryDirectory |
BigQuery 加载进程的临时目录。例如 gs://my-bucket/my-files/temp_dir 。 |
运行 Cloud Storage Text to BigQuery 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Text Files on Cloud Storage to BigQuery (Batch) template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/GCS_Text_to_BigQuery \ --region REGION_NAME \ --parameters \ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ inputFilePattern=PATH_TO_TEXT_DATA,\ outputTable=BIGQUERY_TABLE,\ bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
JAVASCRIPT_FUNCTION
: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称例如,如果您的 JavaScript 函数代码为
myTransform(inJson) { /*...do stuff...*/ }
,则函数名称为myTransform
。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。PATH_TO_BIGQUERY_SCHEMA_JSON
:包含架构定义的 JSON 文件的 Cloud Storage 路径PATH_TO_JAVASCRIPT_UDF_FILE
:.js
文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如gs://my-bucket/my-udfs/my_file.js
PATH_TO_TEXT_DATA
:文本数据集的 Cloud Storage 路径BIGQUERY_TABLE
:您的 BigQuery 表名称PATH_TO_TEMP_DIR_ON_GCS
:临时目录的 Cloud Storage 路径
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Text_to_BigQuery { "jobName": "JOB_NAME", "parameters": { "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "inputFilePattern":"PATH_TO_TEXT_DATA", "outputTable":"BIGQUERY_TABLE", "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS" }, "environment": { "zone": "us-central1-f" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
JAVASCRIPT_FUNCTION
: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称例如,如果您的 JavaScript 函数代码为
myTransform(inJson) { /*...do stuff...*/ }
,则函数名称为myTransform
。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。PATH_TO_BIGQUERY_SCHEMA_JSON
:包含架构定义的 JSON 文件的 Cloud Storage 路径PATH_TO_JAVASCRIPT_UDF_FILE
:.js
文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如gs://my-bucket/my-udfs/my_file.js
PATH_TO_TEXT_DATA
:文本数据集的 Cloud Storage 路径BIGQUERY_TABLE
:您的 BigQuery 表名称PATH_TO_TEMP_DIR_ON_GCS
:临时目录的 Cloud Storage 路径
Cloud Storage Text to Datastore [已弃用]
此模板已弃用,将于 2022 年第一季度移除。请迁移到 Cloud Storage Text to Firestore 模板。
Cloud Storage Text to Datastore 模板是一种批处理流水线,可从存储在 Cloud Storage 中的文本文件读取数据,并将采用 JSON 编码的实体写入 Datastore。输入文本文件中的所有行都必须采用 指定的 JSON 格式。
对此流水线的要求:
- 必须在目标项目中启用 Datastore。
模板参数
参数 | 说明 |
---|---|
textReadPattern |
指定文本数据文件位置的 Cloud Storage 路径模式。例如 gs://mybucket/somepath/*.json 。 |
javascriptTextTransformGcsPath |
(可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js 。 |
javascriptTextTransformFunctionName |
(可选)
您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。
例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ } ,则函数名称为 myTransform 。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。
|
datastoreWriteProjectId |
要向其写入 Datastore 实体的位置的 Google Cloud 项目 ID |
datastoreHintNumWorkers |
(可选)Datastore 逐步增加限制步骤中的预期工作器数量的提示。默认值为 500 。 |
errorWritePath |
错误日志输出文件,用于写入在处理期间发生的故障。例如 gs://bucket-name/errors.txt 。 |
运行 Cloud Storage Text to Datastore 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Text Files on Cloud Storage to Datastore template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/GCS_Text_to_Datastore \ --region REGION_NAME \ --parameters \ textReadPattern=PATH_TO_INPUT_TEXT_FILES,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ datastoreWriteProjectId=PROJECT_ID,\ errorWritePath=ERROR_FILE_WRITE_PATH
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
PATH_TO_INPUT_TEXT_FILES
:Cloud Storage 上的输入文件模式JAVASCRIPT_FUNCTION
: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称例如,如果您的 JavaScript 函数代码为
myTransform(inJson) { /*...do stuff...*/ }
,则函数名称为myTransform
。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。PATH_TO_JAVASCRIPT_UDF_FILE
:.js
文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如gs://my-bucket/my-udfs/my_file.js
ERROR_FILE_WRITE_PATH
:Cloud Storage 上错误文件所需的路径
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Text_to_Datastore { "jobName": "JOB_NAME", "parameters": { "textReadPattern": "PATH_TO_INPUT_TEXT_FILES", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "datastoreWriteProjectId": "PROJECT_ID", "errorWritePath": "ERROR_FILE_WRITE_PATH" }, "environment": { "zone": "us-central1-f" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
PATH_TO_INPUT_TEXT_FILES
:Cloud Storage 上的输入文件模式JAVASCRIPT_FUNCTION
: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称例如,如果您的 JavaScript 函数代码为
myTransform(inJson) { /*...do stuff...*/ }
,则函数名称为myTransform
。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。PATH_TO_JAVASCRIPT_UDF_FILE
:.js
文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如gs://my-bucket/my-udfs/my_file.js
ERROR_FILE_WRITE_PATH
:Cloud Storage 上错误文件所需的路径
Cloud Storage Text to Firestore
Cloud Storage Text to Firestore 模板是一种批处理流水线,可从存储在 Cloud Storage 中的文本文件读取数据,并将采用 JSON 编码的实体写入 Firestore。输入文本文件中的所有行都必须采用 指定的 JSON 格式。
对此流水线的要求:
- 必须在目标项目中启用 Firestore。
模板参数
参数 | 说明 |
---|---|
textReadPattern |
指定文本数据文件位置的 Cloud Storage 路径模式。例如 gs://mybucket/somepath/*.json 。 |
javascriptTextTransformGcsPath |
(可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js 。 |
javascriptTextTransformFunctionName |
(可选)
您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。
例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ } ,则函数名称为 myTransform 。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。
|
firestoreWriteProjectId |
要向其写入 Firestore 实体的位置的 Google Cloud 项目 ID |
firestoreHintNumWorkers |
(可选)Firestore 逐步增加限制步骤中的预期工作器数量的提示。默认值为 500 。 |
errorWritePath |
错误日志输出文件,用于写入在处理期间发生的故障。例如 gs://bucket-name/errors.txt 。 |
运行 Cloud Storage Text to Firestore 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Text Files on Cloud Storage to Firestore template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/GCS_Text_to_Firestore \ --region REGION_NAME \ --parameters \ textReadPattern=PATH_TO_INPUT_TEXT_FILES,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ firestoreWriteProjectId=PROJECT_ID,\ errorWritePath=ERROR_FILE_WRITE_PATH
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
PATH_TO_INPUT_TEXT_FILES
:Cloud Storage 上的输入文件模式JAVASCRIPT_FUNCTION
: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称例如,如果您的 JavaScript 函数代码为
myTransform(inJson) { /*...do stuff...*/ }
,则函数名称为myTransform
。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。PATH_TO_JAVASCRIPT_UDF_FILE
:.js
文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如gs://my-bucket/my-udfs/my_file.js
ERROR_FILE_WRITE_PATH
:Cloud Storage 上错误文件所需的路径
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Text_to_Firestore { "jobName": "JOB_NAME", "parameters": { "textReadPattern": "PATH_TO_INPUT_TEXT_FILES", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "firestoreWriteProjectId": "PROJECT_ID", "errorWritePath": "ERROR_FILE_WRITE_PATH" }, "environment": { "zone": "us-central1-f" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
PATH_TO_INPUT_TEXT_FILES
:Cloud Storage 上的输入文件模式JAVASCRIPT_FUNCTION
: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称例如,如果您的 JavaScript 函数代码为
myTransform(inJson) { /*...do stuff...*/ }
,则函数名称为myTransform
。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。PATH_TO_JAVASCRIPT_UDF_FILE
:.js
文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如gs://my-bucket/my-udfs/my_file.js
ERROR_FILE_WRITE_PATH
:Cloud Storage 上错误文件所需的路径
Cloud Storage Text to Pub/Sub (Batch)
此模板会创建一种批处理流水线,该流水线可从存储在 Cloud Storage 中的文本文件读取记录,并将其发布到 Pub/Sub 主题。使用此模板,您可以将采用换行符分隔的文件中的 JSON 记录或 CSV 文件中的记录发布到 Pub/Sub 主题,以实现实时处理。您可以使用此模板将数据重放到 Pub/Sub。
此模板不会在各个记录上设置任何时间戳。事件时间等于执行期间的发布时间。如果您的流水线依赖准确的事件时间来执行处理,建议不要使用此流水线。
对此流水线的要求:
- 需要读取的文件必须采用换行符分隔 JSON 或 CSV 格式。在源文件中占多行的记录可能会导致下游问题,因为文件中的每一行都将以消息形式发布到 Pub/Sub。
- 在运行此流水线之前,Pub/Sub 主题必须已存在。
模板参数
参数 | 说明 |
---|---|
inputFilePattern |
要从中读取数据的输入文件模式。例如 gs://bucket-name/files/*.json 。 |
outputTopic |
要向其写入数据的 Pub/Sub 输入主题。名称必须采用 projects/<project-id>/topics/<topic-name> 格式。 |
运行 Cloud Storage Text to Pub/Sub (Batch) 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Text Files on Cloud Storage to Pub/Sub (Batch) template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/GCS_Text_to_Cloud_PubSub \ --region REGION_NAME \ --parameters \ inputFilePattern=gs://BUCKET_NAME/files/*.json,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
TOPIC_NAME
:您的 Pub/Sub 主题名称BUCKET_NAME
:Cloud Storage 存储桶的名称
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Text_to_Cloud_PubSub { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "gs://BUCKET_NAME/files/*.json", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME" }, "environment": { "zone": "us-central1-f" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
TOPIC_NAME
:您的 Pub/Sub 主题名称BUCKET_NAME
:Cloud Storage 存储桶的名称
Cloud Storage Text to Cloud Spanner
Cloud Storage Text to Cloud Spanner 模板是一种批处理流水线,用于从 Cloud Storage 读取 CSV 文本文件并将其导入到 Cloud Spanner 数据库。
对此流水线的要求:
- 目标 Cloud Spanner 数据库和表必须已存在。
- 您必须拥有 Cloud Storage 存储桶的读取权限以及目标 Cloud Spanner 数据库的写入权限。
- 包含 CSV 文件的 Cloud Storage 输入路径必须存在。
- 您必须创建包含 CSV 文件的 JSON 说明的导入清单文件,并且必须将该清单存储在 Cloud Storage 中。
- 如果目标 Cloud Spanner 数据库已有架构,则清单文件中指定的任何列都必须与目标数据库架构中的相应列具有相同的数据类型。
-
采用 ASCII 或 UTF-8 编码的清单文件必须符合以下格式:
- 要导入的文本文件必须采用 ASCII 或 UTF-8 编码的 CSV 格式。我们建议您不要在 UTF-8 编码文件中使用字节顺序标记 (BOM)。
- 数据必须与下面的一种类型相匹配:
GoogleSQL
BOOL INT64 FLOAT64 NUMERIC STRING DATE TIMESTAMP BYPES JSON
PostgreSQL
boolean bigint double precision numeric character varying, text date timestamp with time zone bytea
模板参数
参数 | 说明 |
---|---|
instanceId |
Cloud Spanner 数据库的实例 ID。 |
databaseId |
Cloud Spanner 数据库的 ID。 |
importManifest |
Cloud Storage 中导入清单文件的路径。 |
columnDelimiter |
源文件使用的列分隔符。默认值为 , 。 |
fieldQualifier |
应放置在包含 columnDelimiter 的源文件中任何值前后的字符。默认值为 " 。 |
trailingDelimiter |
指定源文件中的行是否带有末尾分隔符(即,在每行末尾的最后一列值之后,是否会出现 columnDelimiter 字符)。默认值为 true 。 |
escape |
源文件使用的转义字符。默认情况下,此参数未设置,并且模板不使用转义字符。 |
nullString |
表示 NULL 值的字符串。默认情况下,此参数未设置,并且模板不使用 null 字符串。 |
dateFormat |
用于解析日期列的格式。默认情况下,流水线会尝试将日期列解析为 yyyy-M-d[' 00:00:00'] ,例如,解析为 2019-01-31 或 2019-1-1 00:00:00。如果您的日期格式有所不同,请使用 java.time.format.DateTimeFormatter 模式指定格式。 |
timestampFormat |
用于解析时间戳列的格式。如果时间戳为长整数,则会解析为 Unix 时间。否则,时间戳会解析为 java.time.format.DateTimeFormatter.ISO_INSTANT 格式的字符串。对于其他情况,请指定您自己的模式字符串,例如,您可以对 "Jan 21 1998 01:02:03.456+08:00" 格式的时间戳使用 MMM dd yyyy HH:mm:ss.SSSVV 。 |
如果您需要使用自定义日期或时间戳格式,请确保这些格式是有效的 java.time.format.DateTimeFormatter
模式。下表显示了日期和时间戳列的自定义格式的其他示例:
类型 | 输入值 | 格式 | 备注 |
---|---|---|---|
DATE |
2011-3-31 | 默认情况下,模板可以解析此格式。您无需指定 dateFormat 参数。 |
|
DATE |
2011-3-31 00:00:00 | 默认情况下,模板可以解析此格式。您无需指定格式。如果需要,您可以使用 yyyy-M-d' 00:00:00' 。 |
|
DATE |
01 Apr, 18 | dd MMM, yy | |
DATE |
Wednesday, April 3, 2019 AD | EEEE, LLLL d, yyyy G | |
TIMESTAMP |
2019-01-02T11:22:33Z 2019-01-02T11:22:33.123Z 2019-01-02T11:22:33.12356789Z |
默认格式 ISO_INSTANT 可以解析此类型的时间戳。您无需提供 timestampFormat 参数。 |
|
TIMESTAMP |
1568402363 | 默认情况下,模板可以解析此类型的时间戳并将其视为 Unix 时间。 | |
TIMESTAMP |
Tue, 3 Jun 2008 11:05:30 GMT | EEE, d MMM yyyy HH:mm:ss VV | |
TIMESTAMP |
2018/12/31 110530.123PST | yyyy/MM/dd HHmmss.SSSz | |
TIMESTAMP |
2019-01-02T11:22:33Z 或 2019-01-02T11:22:33.123Z | yyyy-MM-dd'T'HH:mm:ss[.SSS]VV | 如果输入列为 2019-01-02T11:22:33Z 和 2019-01-02T11:22:33.123Z 的混合格式,则默认格式可以解析此类型的时间戳。您无需提供自己的格式参数。您可以使用 yyyy-MM-dd'T'HH:mm:ss[.SSS]VV 来处理这两种情况。您不能使用 yyyy-MM-dd'T'HH:mm:ss[.SSS]'Z' ,因为后缀“Z”必须解析为时区 ID(而不是字符字面量)。在内部,时间戳列会转换为 java.time.Instant 。因此,您必须以 UTC 指定时间戳列,或者将时区信息与其相关联。本地日期时间(例如 2019-01-02 11:22:33)无法解析为有效的 java.time.Instant 。 |
在 Cloud Storage to Cloud Spanner 模板上运行文本文件
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Text Files on Cloud Storage to Cloud Spanner template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/GCS_Text_to_Cloud_Spanner \ --region REGION_NAME \ --parameters \ instanceId=INSTANCE_ID,\ databaseId=DATABASE_ID,\ importManifest=GCS_PATH_TO_IMPORT_MANIFEST
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
INSTANCE_ID
:您的 Cloud Spanner 实例 IDDATABASE_ID
:您的 Cloud Spanner 数据库 IDGCS_PATH_TO_IMPORT_MANIFEST
:导入清单文件的 Cloud Storage 路径
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Text_to_Cloud_Spanner { "jobName": "JOB_NAME", "parameters": { "instanceId": "INSTANCE_ID", "databaseId": "DATABASE_ID", "importManifest": "GCS_PATH_TO_IMPORT_MANIFEST" }, "environment": { "machineType": "n1-standard-2" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
INSTANCE_ID
:您的 Cloud Spanner 实例 IDDATABASE_ID
:您的 Cloud Spanner 数据库 IDGCS_PATH_TO_IMPORT_MANIFEST
:导入清单文件的 Cloud Storage 路径
Cloud Storage to Elasticsearch
Cloud Storage to Elasticsearch 模板是一种批处理流水线,可从存储在 Cloud Storage 存储桶中的 csv 文件读取数据,并将数据作为 JSON 文档注入到 Elasticsearch 中。
对此流水线的要求:
- Cloud Storage 存储桶必须存在。
- Google Cloud 实例或 Elasticsearch Cloud 上必须存在可通过 Dataflow 访问的 Elasticsearch 主机。
- 错误输出的 BigQuery 表必须存在。
模板参数
参数 | 说明 |
---|---|
inputFileSpec |
用于搜索 CSV 文件的 Cloud Storage 文件格式。示例:gs://mybucket/test-*.csv 。 |
connectionUrl |
Elasticsearch 网址,格式为 https://hostname:[port] 或指定 CloudID(如果使用 Elastic Cloud)。 |
apiKey |
用于身份验证的 Base64 编码 API 密钥。 |
index |
将对其发出请求的 Elasticsearch 索引,例如 my-index 。 |
deadletterTable |
将失败的插入发送到的 BigQuery Deadletter 表。示例:<your-project>:<your-dataset>.<your-table-name> 。 |
containsHeaders |
(可选)用于指明 CSV 中是否包含标题的布尔值。默认值:true 。 |
delimiter |
(可选)CSV 使用的分隔符。示例:, |
csvFormat |
(可选)基于 Apache Commons CSV 格式的 CSV 格式。默认值:Default 。 |
jsonSchemaPath |
(可选)JSON 架构的路径。默认值:null 。 |
largeNumFiles |
(可选)如果文件数达到数万个,则设置为 true。默认值:false 。 |
javascriptTextTransformGcsPath |
(可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js 。 |
javascriptTextTransformFunctionName |
(可选)
您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。
例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ } ,则函数名称为 myTransform 。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。
|
batchSize |
(可选)文档数量中的批次大小。默认值:1000 。 |
batchSizeBytes |
(可选)批次大小(以字节为单位)。默认值:5242880 (5mb)。 |
maxRetryAttempts |
(可选)尝试次数上限,必须大于 0。默认值:不重试。 |
maxRetryDuration |
(可选)重试时长上限(以毫秒为单位),必须大于 0。默认值:不重试。 |
csvFileEncoding |
(可选)CSV 文件编码。 |
propertyAsIndex |
(可选)要编入索引的文档中的一个属性,其值将指定批量请求要包含在文档中的 _index 元数据(优先于 _index UDF)。默认值:none。 |
propertyAsId |
(可选)要编入索引的文档中的一个属性,其值将指定批量请求要包含在文档中的 _id 元数据(优先于 _id UDF)。默认值:none。 |
javaScriptIndexFnGcsPath |
(可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _index 元数据。默认值:none。 |
javaScriptIndexFnName |
(可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _index 元数据。默认值:none。 |
javaScriptIdFnGcsPath |
(可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _id 元数据。默认值:none。 |
javaScriptIdFnName |
(可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _id 元数据。默认值:none。 |
javaScriptTypeFnGcsPath |
(可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _type 元数据。默认值:none。 |
javaScriptTypeFnName |
(可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _type 元数据。默认值:none。 |
javaScriptIsDeleteFnGcsPath |
(可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将确定是否应删除文档,而不是插入或更新文档。该函数应返回字符串值 "true" 或 "false" 。默认值:none。 |
javaScriptIsDeleteFnName |
(可选)函数的 UDF JavaScript 函数名称,该函数将确定是否应删除文档,而不是插入或更新文档。该函数应返回字符串值 "true" 或 "false" 。默认值:none。 |
usePartialUpdate |
(可选)是否在 Elasticsearch 请求中使用部分更新(更新而不是创建或索引,允许部分文档)。默认值:false 。 |
bulkInsertMethod |
(可选)在 Elasticsearch 批量请求中使用 INDEX (索引,允许执行更新插入操作)还是 CREATE (创建,会对重复 _id 报错)。默认值:CREATE 。 |
运行 Cloud Storage to Elasticsearch 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Cloud Storage to Elasticsearch template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID\ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/GCS_to_Elasticsearch \ --parameters \ inputFileSpec=INPUT_FILE_SPEC,\ connectionUrl=CONNECTION_URL,\ apiKey=APIKEY,\ index=INDEX,\ deadletterTable=DEADLETTER_TABLE,\
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
INPUT_FILE_SPEC
:您的 Cloud Storage 文件格式。CONNECTION_URL
:您的 Elasticsearch 网址。APIKEY
:用于身份验证的 base64 编码 API 密钥。INDEX
:您的 Elasticsearch 索引。DEADLETTER_TABLE
:您的 BigQuery 表。
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFileSpec": "INPUT_FILE_SPEC", "connectionUrl": "CONNECTION_URL", "apiKey": "APIKEY", "index": "INDEX", "deadletterTable": "DEADLETTER_TABLE" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/GCS_to_Elasticsearch", } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
INPUT_FILE_SPEC
:您的 Cloud Storage 文件格式。CONNECTION_URL
:您的 Elasticsearch 网址。APIKEY
:用于身份验证的 base64 编码 API 密钥。INDEX
:您的 Elasticsearch 索引。DEADLETTER_TABLE
:您的 BigQuery 表。
Java Database Connectivity (JDBC) to BigQuery
JDBC to BigQuery 模板是一种批处理流水线,可将数据从关系数据库表中复制到现有的 BigQuery 表中。此流水线使用 JDBC 连接到关系数据库。您可以使用此模板将数据从任何具有可用 JDBC 驱动程序的关系数据库复制到 BigQuery 中。为了增加一项保护措施,您还可以在传入使用 Cloud KMS 密钥加密的 Base64 编码用户名、密码和连接字符串参数的同时,传入该 Cloud KMS 密钥。如需详细了解如何对用户名、密码和连接字符串参数进行加密,请参阅 Cloud KMS API 加密端点。
对此流水线的要求:
- 关系数据库的 JDBC 驱动程序必须可用。
- 在运行此流水线之前,BigQuery 表必须已存在。
- BigQuery 表必须具有兼容的架构。
- 必须能够从 Dataflow 运行的子网访问关系数据库。
模板参数
参数 | 说明 |
---|---|
driverJars |
以逗号分隔的驱动程序 JAR 文件列表。例如 gs://<my-bucket>/driver_jar1.jar,gs://<my-bucket>/driver_jar2.jar 。 |
driverClassName |
JDBC 驱动程序类名称。例如 com.mysql.jdbc.Driver 。 |
connectionURL |
JDBC 连接网址字符串。例如 jdbc:mysql://some-host:3306/sampledb 。可作为 Base64 编码,然后使用 Cloud KMS 密钥加密的字符串传入。 |
query |
要在提取数据的源上运行的查询。例如 select * from sampledb.sample_table 。 |
outputTable |
BigQuery 输出表位置,采用 <my-project>:<my-dataset>.<my-table> 格式。 |
bigQueryLoadingTemporaryDirectory |
BigQuery 加载进程的临时目录。例如 gs://<my-bucket>/my-files/temp_dir 。 |
connectionProperties |
(可选)用于 JDBC 连接的属性字符串。字符串的格式必须为 [propertyName=property;]* 。例如 unicode=true;characterEncoding=UTF-8 。 |
username |
(可选)用于 JDBC 连接的用户名。该参数可以作为使用 Cloud KMS 密钥加密的 Base64 编码字符串传入。 |
password |
(可选)用于 JDBC 连接的密码。该参数可以作为使用 Cloud KMS 密钥加密的 Base64 编码字符串传入。 |
KMSEncryptionKey |
(可选)用于对用户名、密码和连接字符串进行解密的 Cloud KMS 加密密钥。如果传入了 Cloud KMS 密钥,则用户名、密码和连接字符串都必须以加密方式进行传递。 |
disabledAlgorithms |
(可选)要停用的以英文逗号分隔的算法。如果此值设置为 none ,则不会停用任何算法。请谨慎使用,因为默认停用的算法已知存在漏洞或性能问题。例如 SSLv3, RC4. |
extraFilesToStage |
用于将文件暂存在工作器中的 Cloud Storage 路径或 Secret Manager 密文,以逗号分隔。这些文件将保存在每个工作器的 /extra_files 目录下。例如 gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id> 。 |
运行 JDBC to BigQuery 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the JDBC to BigQuery template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Jdbc_to_BigQuery \ --region REGION_NAME \ --parameters \ driverJars=DRIVER_PATHS,\ driverClassName=DRIVER_CLASS_NAME,\ connectionURL=JDBC_CONNECTION_URL,\ query=SOURCE_SQL_QUERY,\ outputTable=PROJECT_ID:DATASET.TABLE_NAME, bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\ connectionProperties=CONNECTION_PROPERTIES,\ username=CONNECTION_USERNAME,\ password=CONNECTION_PASSWORD,\ KMSEncryptionKey=KMS_ENCRYPTION_KEY
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
DRIVER_PATHS
:JDBC 驱动程序以英文逗号分隔的 Cloud Storage 路径DRIVER_CLASS_NAME
:驱动器类名称JDBC_CONNECTION_URL
:JDBC 连接网址SOURCE_SQL_QUERY
:需要在源数据库上运行的 SQL 查询DATASET
:您的 BigQuery 数据集,并替换TABLE_NAME
:您的 BigQuery 表名称PATH_TO_TEMP_DIR_ON_GCS
:临时目录的 Cloud Storage 路径CONNECTION_PROPERTIES
:JDBC 连接属性(如有需要)CONNECTION_USERNAME
:JDBC 连接用户名CONNECTION_PASSWORD
:JDBC 连接密码KMS_ENCRYPTION_KEY
:Cloud KMS 加密密钥
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Jdbc_to_BigQuery { "jobName": "JOB_NAME", "parameters": { "driverJars": "DRIVER_PATHS", "driverClassName": "DRIVER_CLASS_NAME", "connectionURL": "JDBC_CONNECTION_URL", "query": "SOURCE_SQL_QUERY", "outputTable": "PROJECT_ID:DATASET.TABLE_NAME", "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS", "connectionProperties": "CONNECTION_PROPERTIES", "username": "CONNECTION_USERNAME", "password": "CONNECTION_PASSWORD", "KMSEncryptionKey":"KMS_ENCRYPTION_KEY" }, "environment": { "zone": "us-central1-f" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
DRIVER_PATHS
:JDBC 驱动程序以英文逗号分隔的 Cloud Storage 路径DRIVER_CLASS_NAME
:驱动器类名称JDBC_CONNECTION_URL
:JDBC 连接网址SOURCE_SQL_QUERY
:需要在源数据库上运行的 SQL 查询DATASET
:您的 BigQuery 数据集,并替换TABLE_NAME
:您的 BigQuery 表名称PATH_TO_TEMP_DIR_ON_GCS
:临时目录的 Cloud Storage 路径CONNECTION_PROPERTIES
:JDBC 连接属性(如有需要)CONNECTION_USERNAME
:JDBC 连接用户名CONNECTION_PASSWORD
:JDBC 连接密码KMS_ENCRYPTION_KEY
:Cloud KMS 加密密钥
Java Database Connectivity (JDBC) to Pub/Sub
Java Database Connectivity (JDBC) to Pub/Sub 模板是一个批处理流水线,可从 JDBC 源注入数据,并将生成的记录作为 JSON 字符串写入预先存在的 Pub/Sub 主题。
对此流水线的要求:
- 在运行流水线之前,JDBC 源必须已存在。
- 在运行流水线之前,Cloud Pub/Sub 输出主题必须已存在。
模板参数
参数 | 说明 |
---|---|
driverClassName |
JDBC 驱动程序类名称。例如 com.mysql.jdbc.Driver 。 |
connectionUrl |
JDBC 连接网址字符串。例如 jdbc:mysql://some-host:3306/sampledb 。可作为 Base64 编码,然后使用 Cloud KMS 密钥加密的字符串传入。 |
driverJars |
以英文逗号分隔的 JDBC 驱动程序 Cloud Storage 路径。例如 gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar 。 |
username |
(可选)用于 JDBC 连接的用户名。该参数可以作为使用 Cloud KMS 密钥加密的 Base64 编码字符串传入。 |
password |
(可选)用于 JDBC 连接的密码。该参数可以作为使用 Cloud KMS 密钥加密的 Base64 编码字符串传入。 |
connectionProperties |
(可选)用于 JDBC 连接的属性字符串。字符串的格式必须为 [propertyName=property;]* 。例如 unicode=true;characterEncoding=UTF-8 。 |
query |
要在提取数据的源上运行的查询。例如 select * from sampledb.sample_table 。 |
outputTopic |
要发布到的 Pub/Sub 主题,格式为 projects/<project>/topics/<topic> 。 |
KMSEncryptionKey |
(可选)用于对用户名、密码和连接字符串进行解密的 Cloud KMS 加密密钥。如果传入了 Cloud KMS 密钥,则用户名、密码和连接字符串都必须以加密方式进行传递。 |
extraFilesToStage |
用于将文件暂存在工作器中的 Cloud Storage 路径或 Secret Manager 密文,以逗号分隔。这些文件将保存在每个工作器的 /extra_files 目录下。例如 gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id> 。 |
运行 Java Database Connectivity (JDBC) to Pub/Sub 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the JDBC to Pub/Sub template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/flex/Jdbc_to_PubSub \ --region REGION_NAME \ --parameters \ driverClassName=DRIVER_CLASS_NAME,\ connectionURL=JDBC_CONNECTION_URL,\ driverJars=DRIVER_PATHS,\ username=CONNECTION_USERNAME,\ password=CONNECTION_PASSWORD,\ connectionProperties=CONNECTION_PROPERTIES,\ query=SOURCE_SQL_QUERY,\ outputTopic=OUTPUT_TOPIC,\ KMSEncryptionKey=KMS_ENCRYPTION_KEY
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
DRIVER_CLASS_NAME
:驱动程序类名称JDBC_CONNECTION_URL
:JDBC 连接网址DRIVER_PATHS
:JDBC 驱动程序以英文逗号分隔的 Cloud Storage 路径CONNECTION_USERNAME
:JDBC 连接用户名CONNECTION_PASSWORD
:JDBC 连接密码CONNECTION_PROPERTIES
:JDBC 连接属性(如有需要)SOURCE_SQL_QUERY
:需要在源数据库上运行的 SQL 查询OUTPUT_TOPIC
:要发布到的 Pub/SubKMS_ENCRYPTION_KEY
:Cloud KMS 加密密钥
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "jobName": "JOB_NAME", "parameters": { "driverClassName": "DRIVER_CLASS_NAME", "connectionURL": "JDBC_CONNECTION_URL", "driverJars": "DRIVER_PATHS", "username": "CONNECTION_USERNAME", "password": "CONNECTION_PASSWORD", "connectionProperties": "CONNECTION_PROPERTIES", "query": "SOURCE_SQL_QUERY", "outputTopic": "OUTPUT_TOPIC", "KMSEncryptionKey":"KMS_ENCRYPTION_KEY" }, "environment": { "zone": "us-central1-f" }, }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
DRIVER_CLASS_NAME
:驱动程序类名称JDBC_CONNECTION_URL
:JDBC 连接网址DRIVER_PATHS
:JDBC 驱动程序以英文逗号分隔的 Cloud Storage 路径CONNECTION_USERNAME
:JDBC 连接用户名CONNECTION_PASSWORD
:JDBC 连接密码CONNECTION_PROPERTIES
:JDBC 连接属性(如有需要)SOURCE_SQL_QUERY
:需要在源数据库上运行的 SQL 查询OUTPUT_TOPIC
:要发布到的 Pub/SubKMS_ENCRYPTION_KEY
:Cloud KMS 加密密钥
Apache Cassandra to Cloud Bigtable
“从 Apache Cassandra 到 Cloud Bigtable”模板会将一个表从 Apache Cassandra 复制到 Cloud Bigtable。此模板需要最低限度的配置,可在 Cloud Bigtable 中尽可能准确地复制 Cassandra 中的表结构。
“从 Apache Cassandra 到 Cloud Bigtable”模板适用于以下情况:
- 在可以接受短时间停机的情况下,迁移 Apache Cassandra 数据库。
- 定期将 Cassandra 表复制到 Cloud Bigtable 以便向全球用户传送数据。
对此流水线的要求:
- 在运行此流水线之前,目标 Bigtable 表必须已存在。
- Dataflow 工作器与 Apache Cassandra 节点之间建立了网络连接。
类型转换
“从 Apache Cassandra 到 Cloud Bigtable”模板会自动将 Apache Cassandra 数据类型转换为 Cloud Bigtable 的数据类型。
大多数原语在 Cloud Bigtable 和 Apache Cassandra 中的表示方式相同;但以下原语的表示方式有所不同。
Date
和Timestamp
会转化为DateTime
对象UUID
被转换为String
Varint
被转换为BigDecimal
Apache Cassandra 还原生支持更复杂的类型,例如 Tuple
、List
、Set
和 Map
。此流水线不支持元组,因为 Apache Beam 中没有相应的类型。
例如,在 Apache Cassandra 中,您有一个 List
类型的列,名为“mylist”,还有一些类似下表内容的值。
row | mylist |
---|---|
1 | (a,b,c) |
流水线会直接将此 List 列扩展为三个不同的列(在 Cloud Bigtable 中称为列限定符)。列名称为“mylist”,但流水线还会附加上列表中各项的索引编号,例如“mylist[0]”。
row | mylist[0] | mylist[1] | mylist[2] |
---|---|---|---|
1 | a | b | c |
流水线按照处理列表的方式来处理集合,但添加了后缀来指明单元是键还是值。
row | mymap |
---|---|
1 | {"first_key":"first_value","another_key":"different_value"} |
转换后,表如下所示:
row | mymap[0].key | mymap[0].value | mymap[1].key | mymap[1].value |
---|---|---|---|---|
1 | first_key | first_value | another_key | different_value |
主键转换
在 Apache Cassandra 中,主键是使用数据定义语言定义的。主键可以是简单的、复合的,或者是与聚簇列复合的。Cloud Bigtable 支持手动行键构造,按字典顺序对字节数组进行排序。此流水线会收集有关键类型的信息,并遵循基于多个值构建行键的最佳做法构建键。
模板参数
参数 | 说明 |
---|---|
cassandraHosts |
以英文逗号分隔的列表中的 Apache Cassandra 节点的主机。 |
cassandraPort |
(可选)节点上用于访问 Apache Cassandra 的 TCP 端口(默认为 9042 )。 |
cassandraKeyspace |
表格所在的 Apache Cassandra 键空间。 |
cassandraTable |
要复制的 Apache Cassandra 表格。 |
bigtableProjectId |
从中复制 Apache Cassandra 表的 Bigtable 实例的 Google Cloud 项目 ID。 |
bigtableInstanceId |
要复制 Apache Cassandra 表格的 Bigtable 实例 ID。 |
bigtableTableId |
要复制 Apache Cassandra 表格的 Bigtable 表格的名称。 |
defaultColumnFamily |
(可选)Bigtable 表格的列族名称(默认为 default )。 |
rowKeySeparator |
(可选)用于构建行键的分隔符(默认为 # )。 |
运行 Apache Cassandra to Cloud Bigtable 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Cassandra to Cloud Bigtable template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cassandra_To_Cloud_Bigtable \ --region REGION_NAME \ --parameters \ bigtableProjectId=BIGTABLE_PROJECT_ID,\ bigtableInstanceId=BIGTABLE_INSTANCE_ID,\ bigtableTableId=BIGTABLE_TABLE_ID,\ cassandraHosts=CASSANDRA_HOSTS,\ cassandraKeyspace=CASSANDRA_KEYSPACE,\ cassandraTable=CASSANDRA_TABLE
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
BIGTABLE_PROJECT_ID
:Cloud Bigtable 所在的项目 IDBIGTABLE_INSTANCE_ID
:Cloud Bigtable 实例 IDBIGTABLE_TABLE_ID
:您的 Cloud Bigtable 表名称CASSANDRA_HOSTS
:Apache Cassandra 主机列表;如果提供了多个主机,请按照此说明了解如何转义逗号CASSANDRA_KEYSPACE
:表格所在的 Apache Cassandra 键空间CASSANDRA_TABLE
:需要迁移的 Apache Cassandra 表
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cassandra_To_Cloud_Bigtable { "jobName": "JOB_NAME", "parameters": { "bigtableProjectId": "BIGTABLE_PROJECT_ID", "bigtableInstanceId": "BIGTABLE_INSTANCE_ID", "bigtableTableId": "BIGTABLE_TABLE_ID", "cassandraHosts": "CASSANDRA_HOSTS", "cassandraKeyspace": "CASSANDRA_KEYSPACE", "cassandraTable": "CASSANDRA_TABLE" }, "environment": { "zone": "us-central1-f" } }
替换以下内容:
PROJET_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
BIGTABLE_PROJECT_ID
:Cloud Bigtable 所在的项目 IDBIGTABLE_INSTANCE_ID
:Cloud Bigtable 实例 IDBIGTABLE_TABLE_ID
:您的 Cloud Bigtable 表名称CASSANDRA_HOSTS
:Apache Cassandra 主机列表;如果提供了多个主机,请按照此说明了解如何转义逗号CASSANDRA_KEYSPACE
:表格所在的 Apache Cassandra 键空间CASSANDRA_TABLE
:需要迁移的 Apache Cassandra 表
MongoDB to BigQuery
MongoDB to BigQuery 模板是一种批处理流水线,可从 MongoDB 读取文档并按照 userOption
参数指定的选项将其写入 BigQuery。
对此流水线的要求
- 目标 BigQuery 数据集必须已存在。
- 必须可从 Dataflow 工作器机器访问 MongoDB 源实例。
模板参数
参数 | 说明 |
---|---|
mongoDbUri |
MongoDB 连接 URI,格式为 mongodb+srv://:@ 。 |
database |
从中读取集合的 MongoDB 数据库。例如:my-db 。 |
collection |
MongoDB 数据库中集合的名称。例如:my-collection 。 |
outputTableSpec |
要写入的 BigQuery 表。例如 bigquery-project:dataset.output_table 。 |
userOption |
FLATTEN 或 NONE 。FLATTEN 将文档展平至第一级。NONE 将整个文档存储为 JSON 字符串。 |
运行 MongoDB to BigQuery 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the MongoDB to BigQuery template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/MongoDB_to_BigQuery \ --parameters \ outputTableSpec=OUTPUT_TABLE_SPEC,\ mongoDbUri=MONGO_DB_URI,\ database=DATABASE,\ collection=COLLECTION,\ userOption=USER_OPTION
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
OUTPUT_TABLE_SPEC
:您的 BigQuery 目标表的名称。MONGO_DB_URI
:您的 MongoDB URI。DATABASE
:您的 MongoDB 数据库。COLLECTION
:您的 MongoDB 集合。USER_OPTION
:FLATTEN 或 NONE。
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputTableSpec": "INPUT_TABLE_SPEC", "mongoDbUri": "MONGO_DB_URI", "database": "DATABASE", "collection": "COLLECTION", "userOption": "USER_OPTION" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/MongoDB_to_BigQuery", } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
OUTPUT_TABLE_SPEC
:您的 BigQuery 目标表的名称。MONGO_DB_URI
:您的 MongoDB URI。DATABASE
:您的 MongoDB 数据库。COLLECTION
:您的 MongoDB 集合。USER_OPTION
:FLATTEN 或 NONE。