Google 提供的 Dataflow 批处理模板

Google 提供了一组开源 Dataflow 模板。

这些 Dataflow 模板可帮助您处理大型数据任务,包括数据导入、数据导出、数据备份、数据恢复和批量 API 操作,所有这些均无需使用专用开发环境。这些模板基于 Apache Beam 构建,并使用 Dataflow 转换数据。

如需了解有关模板的一般信息,请参阅 Dataflow 模板。如需查看 Google 提供的所有模板的列表,请参阅开始使用 Google 提供的模板

本指南介绍了批量模板。

BigQuery to Cloud Storage TFRecords

BigQuery to Cloud Storage TFRecords 模板是一种流水线,可从 BigQuery 查询读取数据并以 TFRecord 格式将其写入 Cloud Storage 存储桶。您可以指定训练、测试和验证拆分百分比。默认情况下,训练集的拆分比例为 1 或 100%,测试和验证集的拆分比例为 0 或 0%。设置数据集拆分比例时,训练、测试和验证之和加起来必须为 1 或 100%(例如,0.6 + 0.2 + 0.2)。Dataflow 会自动确定每个输出数据集的最佳分片数。

对此流水线的要求

  • BigQuery 数据集和表必须已存在。
  • 输出 Cloud Storage 存储桶必须存在才能执行此流水线。训练、测试和验证子目录不需要预先存在,将会自动生成。

模板参数

参数 说明
readQuery 用于从来源中提取数据的 BigQuery SQL 查询。例如 select * from dataset1.sample_table
outputDirectory 在其中写入训练、测试和验证 TFRecord 文件的顶级 Cloud Storage 路径前缀。例如 gs://mybucket/output。生成的训练、测试和验证 TFRecord 文件的子目录根据 outputDirectory 自动生成。例如 gs://mybucket/output/train
trainingPercentage (可选)分配给训练 TFRecord 文件的查询数据所占的百分比。默认值为 1 或 100%。
testingPercentage (可选)分配给测试 TFRecord 文件的查询数据所占的百分比。 默认值为 0 或 0%。
validationPercentage (可选)分配给验证 TFRecord 文件的查询数据所占的百分比。 默认值为 0 或 0%。
outputSuffix (可选)写入的训练、测试和验证 TFRecord 文件的文件后缀。默认值为 .tfrecord

运行 BigQuery to Cloud Storage TFRecord files 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the BigQuery to TFRecords template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_BigQuery_to_GCS_TensorFlow_Records \
    --region REGION_NAME \
    --parameters \
readQuery=READ_QUERY,\
outputDirectory=OUTPUT_DIRECTORY,\
trainingPercentage=TRAINING_PERCENTAGE,\
testingPercentage=TESTING_PERCENTAGE,\
validationPercentage=VALIDATION_PERCENTAGE,\
outputSuffix=OUTPUT_FILENAME_SUFFIX

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • READ_QUERY:要运行的 BigQuery 查询
  • OUTPUT_DIRECTORY:输出数据集的 Cloud Storage 路径前缀
  • TRAINING_PERCENTAGE:训练数据集的拆分小数百分比
  • TESTING_PERCENTAGE:测试数据集的拆分小数百分比
  • VALIDATION_PERCENTAGE:验证数据集的拆分小数百分比
  • OUTPUT_FILENAME_SUFFIX:首选输出 TensorFlow 记录文件后缀

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_BigQuery_to_GCS_TensorFlow_Records
{
   "jobName": "JOB_NAME",
   "parameters": {
       "readQuery":"READ_QUERY",
       "outputDirectory":"OUTPUT_DIRECTORY",
       "trainingPercentage":"TRAINING_PERCENTAGE",
       "testingPercentage":"TESTING_PERCENTAGE",
       "validationPercentage":"VALIDATION_PERCENTAGE",
       "outputSuffix":"OUTPUT_FILENAME_SUFFIX"
   },
   "environment": { "zone": "us-central1-f" }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • READ_QUERY:要运行的 BigQuery 查询
  • OUTPUT_DIRECTORY:输出数据集的 Cloud Storage 路径前缀
  • TRAINING_PERCENTAGE:训练数据集的拆分小数百分比
  • TESTING_PERCENTAGE:测试数据集的拆分小数百分比
  • VALIDATION_PERCENTAGE:验证数据集的拆分小数百分比
  • OUTPUT_FILENAME_SUFFIX:首选输出 TensorFlow 记录文件后缀

BigQuery export to Parquet(通过 Storage API)

BigQuery export to Parquet 模板是一种批处理流水线,可从 BigQuery 表读取数据并以 Parquet 格式将其写入 Cloud Storage 存储桶。此模板利用 BigQuery Storage API 导出数据。

对此流水线的要求

  • 在运行此流水线之前,输入 BigQuery 表必须已存在。
  • 在运行此流水线之前,输出 Cloud Storage 存储桶必须已存在。

模板参数

参数 说明
tableRef BigQuery 输入表位置。例如 <my-project>:<my-dataset>.<my-table>
bucket 要在其中写入 Parquet 文件的 Cloud Storage 文件夹。例如 gs://mybucket/exports
numShards (可选)输出文件分片数。默认值为 1。
fields (可选)要从输入 BigQuery 表格选择的以英文逗号分隔的字段列表。

运行 BigQuery to Cloud Storage Parquet 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the BigQuery export to Parquet (via Storage API) template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/BigQuery_to_Parquet \
    --region=REGION_NAME \
    --parameters \
tableRef=BIGQUERY_TABLE,\
bucket=OUTPUT_DIRECTORY,\
numShards=NUM_SHARDS,\
fields=FIELDS

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • OUTPUT_DIRECTORY:输出文件的 Cloud Storage 文件夹
  • NUM_SHARDS:所需的输出文件分片数
  • FIELDS:要从输入 BigQuery 表中选择的以英文逗号分隔的字段列表

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "tableRef": "BIGQUERY_TABLE",
          "bucket": "OUTPUT_DIRECTORY",
          "numShards": "NUM_SHARDS",
          "fields": "FIELDS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/BigQuery_to_Parquet",
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • OUTPUT_DIRECTORY:输出文件的 Cloud Storage 文件夹
  • NUM_SHARDS:所需的输出文件分片数
  • FIELDS:要从输入 BigQuery 表中选择的以英文逗号分隔的字段列表

BigQuery to Elasticsearch

BigQuery to Elasticsearch 模板是一种批处理流水线,用于将 BigQuery 表中的数据作为文档注入到 Elasticsearch 中。该模板可以读取整个表,也可以使用提供的查询读取特定记录。

对此流水线的要求

  • 源 BigQuery 表必须存在。
  • 必须具有在 Google Cloud 实例上或 Elastic Cloud 上运行的使用 Elasticsearch 7.0 版或更高版本的 Elasticsearch 主机,并且该主机应能够从 Dataflow 工作器机器进行访问。

模板参数

参数 说明
connectionUrl Elasticsearch 网址,格式为 https://hostname:[port] 或指定 CloudID(如果使用 Elastic Cloud)。
apiKey 用于身份验证的 Base64 编码 API 密钥。
index 将向其发出请求的 Elasticsearch 索引,例如 my-index
inputTableSpec (可选)要读取并插入到 Elasticsearch 中的 BigQuery 表。必须提供表或查询。例如 projectId:datasetId.tablename
query (可选)用于从 BigQuery 拉取数据的 SQL 查询。必须提供表或查询。
useLegacySql (可选)设置为 true 即可使用旧版 SQL(仅在提供查询时适用)。默认值:false
batchSize (可选)文档数量中的批次大小。默认值:1000
batchSizeBytes (可选)批次大小(以字节为单位)。默认值:5242880 (5mb)。
maxRetryAttempts (可选)尝试次数上限,必须大于 0。默认值:no retries
maxRetryDuration (可选)重试时长上限(以毫秒为单位),必须大于 0。默认值:no retries
propertyAsIndex (可选)要编入索引的文档中的一个属性,其值将指定批量请求要包含在文档中的 _index 元数据(优先于 _index UDF)。默认值:none。
propertyAsId (可选)要编入索引的文档中的一个属性,其值将指定批量请求要包含在文档中的 _id 元数据(优先于 _id UDF)。默认值:none。
javaScriptIndexFnGcsPath (可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _index 元数据。默认值:none。
javaScriptIndexFnName (可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _index 元数据。默认值:none。
javaScriptIdFnGcsPath (可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _id 元数据。默认值:none。
javaScriptIdFnName (可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _id 元数据。默认值:none。
javaScriptTypeFnGcsPath (可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _type 元数据。默认值:none。
javaScriptTypeFnName (可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _type 元数据。默认值:none。
javaScriptIsDeleteFnGcsPath (可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将确定是否应删除文档,而不是插入或更新文档。该函数应返回字符串值 "true""false"。默认值:none。
javaScriptIsDeleteFnName (可选)函数的 UDF JavaScript 函数名称,该函数将确定是否应删除文档,而不是插入或更新文档。该函数应返回字符串值 "true""false"。默认值:none。
usePartialUpdate (可选)是否在 Elasticsearch 请求中使用部分更新(更新而不是创建或索引,允许部分文档)。默认值:false
bulkInsertMethod (可选)在 Elasticsearch 批量请求中使用 INDEX(索引,允许执行更新插入操作)还是 CREATE(创建,会对重复 _id 报错)。默认值:CREATE

运行 BigQuery to Elasticsearch 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the BigQuery to Elasticsearch template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/BigQuery_to_Elasticsearch \
    --parameters \
inputTableSpec=INPUT_TABLE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • INPUT_TABLE_SPEC:您的 BigQuery 表名称。
  • CONNECTION_URL:您的 Elasticsearch 网址。
  • APIKEY:用于身份验证的 base64 编码 API 密钥。
  • INDEX:您的 Elasticsearch 索引。

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/BigQuery_to_Elasticsearch",
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • INPUT_TABLE_SPEC:您的 BigQuery 表名称。
  • CONNECTION_URL:您的 Elasticsearch 网址。
  • APIKEY:用于身份验证的 base64 编码 API 密钥。
  • INDEX:您的 Elasticsearch 索引。

BigQuery to MongoDB

BigQuery to MongoDB 模板是一种批处理流水线,可从 BigQuery 读取行数据并将其作为文档写入 MongoDB。目前,每一行数据都被存储为一个文档。

对此流水线的要求

  • 源 BigQuery 表必须存在。
  • 应该能够从 Dataflow 工作器机器访问目标 MongoDB 实例。

模板参数

参数 说明
mongoDbUri MongoDB 连接 URI,格式为 mongodb+srv://:@
database 存储集合的 MongoDB 数据库。例如:my-db
collection MongoDB 数据库中集合的名称。例如:my-collection
inputTableSpec 要读取的 BigQuery 表。例如 bigquery-project:dataset.input_table

运行 BigQuery to MongoDB 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the BigQuery to MongoDB template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

  gcloud beta dataflow flex-template run JOB_NAME \
      --project=PROJECT_ID \
      --region=REGION_NAME \
      --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/BigQuery_to_MongoDB \
      --parameters \
  inputTableSpec=INPUT_TABLE_SPEC,\
  mongoDbUri=MONGO_DB_URI,\
  database=DATABASE,\
  collection=COLLECTION
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • INPUT_TABLE_SPEC:您的 BigQuery 源表的名称。
  • MONGO_DB_URI:您的 MongoDB URI。
  • DATABASE:您的 MongoDB 数据库。
  • COLLECTION:您的 MongoDB 集合。

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
     "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputTableSpec": "INPUT_TABLE_SPEC",
            "mongoDbUri": "MONGO_DB_URI",
            "database": "DATABASE",
            "collection": "COLLECTION"
        },
        "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/BigQuery_to_MongoDB",
     }
  }

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • INPUT_TABLE_SPEC:您的 BigQuery 源表的名称。
  • MONGO_DB_URI:您的 MongoDB URI。
  • DATABASE:您的 MongoDB 数据库。
  • COLLECTION:您的 MongoDB 集合。

Bigtable to Cloud Storage Avro

Bigtable to Cloud Storage Avro 模板是一种流水线,可从 Bigtable 表中读取数据并以 Avro 格式将其写入 Cloud Storage 存取分区。您可以使用该模板将数据从 Bigtable 移动到 Cloud Storage。

对此流水线的要求

  • Bigtable 表必须已存在。
  • 在运行此流水线之前,输出 Cloud Storage 存储桶必须已存在。

模板参数

参数 说明
bigtableProjectId 您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
bigtableInstanceId 表所属的 Bigtable 实例的 ID。
bigtableTableId 要导出的 Bigtable 表的 ID。
outputDirectory 写入数据的 Cloud Storage 路径。例如 gs://mybucket/somefolder
filenamePrefix Avro 文件名的前缀。例如 output-

运行 Bigtable to Cloud Storage Avro file 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Bigtable to Avro Files on Cloud Storage template 。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_Avro \
    --region REGION_NAME \
    --parameters \
bigtableProjectId=BIGTABLE_PROJECT_ID,\
bigtableInstanceId=INSTANCE_ID,\
bigtableTableId=TABLE_ID,\
outputDirectory=OUTPUT_DIRECTORY,\
filenamePrefix=FILENAME_PREFIX

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • BIGTABLE_PROJECT_ID:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:表所属的 Bigtable 实例的 ID。
  • TABLE_ID:需要导出的 Bigtable 表的 ID。
  • OUTPUT_DIRECTORY:写入数据的 Cloud Storage 路径,例如 gs://mybucket/somefolder
  • FILENAME_PREFIX:Avro 文件名的前缀,例如 output-

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_Avro
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProjectId": "BIGTABLE_PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "outputDirectory": "OUTPUT_DIRECTORY",
       "filenamePrefix": "FILENAME_PREFIX",
   },
   "environment": { "zone": "us-central1-f" }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • BIGTABLE_PROJECT_ID:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:表所属的 Bigtable 实例的 ID。
  • TABLE_ID:需要导出的 Bigtable 表的 ID。
  • OUTPUT_DIRECTORY:写入数据的 Cloud Storage 路径,例如 gs://mybucket/somefolder
  • FILENAME_PREFIX:Avro 文件名的前缀,例如 output-

Bigtable to Cloud Storage Parquet

Bigtable to Cloud Storage Parquet 模板是一种流水线,可从 BigQuery 表读取数据并以 Parquet 格式将其写入 Cloud Storage 存储桶。您可以使用该模板将数据从 Bigtable 移动到 Cloud Storage。

对此流水线的要求

  • Bigtable 表必须已存在。
  • 在运行此流水线之前,输出 Cloud Storage 存储桶必须已存在。

模板参数

参数 说明
bigtableProjectId 您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
bigtableInstanceId 表所属的 Bigtable 实例的 ID。
bigtableTableId 要导出的 Bigtable 表的 ID。
outputDirectory 写入数据的 Cloud Storage 路径。例如 gs://mybucket/somefolder
filenamePrefix Parquet 文件名的前缀。例如 output-
numShards 输出文件分片数。例如 2

运行 Bigtable to Cloud Storage Parquet file 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Bigtable to Parquet Files on Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_Parquet \
    --region REGION_NAME \
    --parameters \
bigtableProjectId=BIGTABLE_PROJECT_ID,\
bigtableInstanceId=INSTANCE_ID,\
bigtableTableId=TABLE_ID,\
outputDirectory=OUTPUT_DIRECTORY,\
filenamePrefix=FILENAME_PREFIX,\
numShards=NUM_SHARDS

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • BIGTABLE_PROJECT_ID:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:表所属的 Bigtable 实例的 ID。
  • TABLE_ID:需要导出的 Bigtable 表的 ID。
  • OUTPUT_DIRECTORY:写入数据的 Cloud Storage 路径,例如 gs://mybucket/somefolder
  • FILENAME_PREFIX:Parquet 文件名的前缀,例如 output-
  • NUM_SHARDS:要输出的 Parquet 文件的数量,例如 1

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_Parquet
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProjectId": "BIGTABLE_PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "outputDirectory": "OUTPUT_DIRECTORY",
       "filenamePrefix": "FILENAME_PREFIX",
       "numShards": "NUM_SHARDS"
   },
   "environment": { "zone": "us-central1-f" }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • BIGTABLE_PROJECT_ID:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:表所属的 Bigtable 实例的 ID。
  • TABLE_ID:需要导出的 Bigtable 表的 ID。
  • OUTPUT_DIRECTORY:写入数据的 Cloud Storage 路径,例如 gs://mybucket/somefolder
  • FILENAME_PREFIX:Parquet 文件名的前缀,例如 output-
  • NUM_SHARDS:要输出的 Parquet 文件的数量,例如 1

Bigtable to Cloud Storage SequenceFile

Bigtable to Cloud Storage SequenceFile 模板是一种流水线,可从 Bigtable 表读取数据并以 SequenceFile 格式将其写入 Cloud Storage 存储桶。您可以使用该模板将数据从 Bigtable 复制到 Cloud Storage。

对此流水线的要求

  • Bigtable 表必须已存在。
  • 在运行此流水线之前,输出 Cloud Storage 存储桶必须已存在。

模板参数

参数 说明
bigtableProject 您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
bigtableInstanceId 表所属的 Bigtable 实例的 ID。
bigtableTableId 要导出的 Bigtable 表的 ID。
bigtableAppProfileId 要用于导出的 Bigtable 应用配置文件的 ID。如果您没有指定应用配置文件,则 Bigtable 将使用该实例的默认应用配置文件
destinationPath 写入数据的 Cloud Storage 路径。例如 gs://mybucket/somefolder
filenamePrefix SequenceFile 文件名的前缀。例如 output-

运行 Bigtable to Cloud Storage SequenceFile 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Bigtable to SequenceFile Files on Cloud Storage template 。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_SequenceFile \
    --region REGION_NAME \
    --parameters \
bigtableProject=BIGTABLE_PROJECT_ID,\
bigtableInstanceId=INSTANCE_ID,\
bigtableTableId=TABLE_ID,\
bigtableAppProfileId=APPLICATION_PROFILE_ID,\
destinationPath=DESTINATION_PATH,\
filenamePrefix=FILENAME_PREFIX

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • BIGTABLE_PROJECT_ID:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:表所属的 Bigtable 实例的 ID。
  • TABLE_ID:需要导出的 Bigtable 表的 ID。
  • APPLICATION_PROFILE_ID:将用于导出的 Bigtable 应用配置文件的 ID。
  • DESTINATION_PATH:写入数据的 Cloud Storage 路径,例如 gs://mybucket/somefolder
  • FILENAME_PREFIX:SequenceFile 文件名的前缀,例如 output-

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_SequenceFile
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProject": "BIGTABLE_PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "bigtableAppProfileId": "APPLICATION_PROFILE_ID",
       "destinationPath": "DESTINATION_PATH",
       "filenamePrefix": "FILENAME_PREFIX",
   },
   "environment": { "zone": "us-central1-f" }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • BIGTABLE_PROJECT_ID:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:表所属的 Bigtable 实例的 ID。
  • TABLE_ID:需要导出的 Bigtable 表的 ID。
  • APPLICATION_PROFILE_ID:将用于导出的 Bigtable 应用配置文件的 ID。
  • DESTINATION_PATH:写入数据的 Cloud Storage 路径,例如 gs://mybucket/somefolder
  • FILENAME_PREFIX:SequenceFile 文件名的前缀,例如 output-

Datastore to Cloud Storage Text [已弃用]

此模板已弃用,将于 2022 年第一季度移除。请迁移到 Firestore to Cloud Storage Text 模板。

Datastore to Cloud Storage Text 模板是一种批处理流水线,可读取 Datastore 实体并以文本文件形式将其写入 Cloud Storage。您可以提供一个函数以将每个实体处理为 JSON 字符串。如果您未提供此类函数,则输出文件中的每一行都将是一个 JSON 序列化实体。

对此流水线的要求

在运行此流水线之前,必须先在项目中设置 Datastore。

模板参数

参数 说明
datastoreReadGqlQuery 一种 GQL 查询,用于指定要获取的实体。例如 SELECT * FROM MyKind
datastoreReadProjectId 您要从中读取数据的 Datastore 实例的 Google Cloud 项目 ID。
datastoreReadNamespace 所请求实体的命名空间。如需使用默认命名空间,请将此参数留空。
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
textWritePrefix Cloud Storage 路径前缀,用于指定写入数据的位置。例如 gs://mybucket/somefolder/

运行 Datastore to Cloud Storage Text 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Datastore to Text Files on Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Datastore_to_GCS_Text \
    --region REGION_NAME \
    --parameters \
datastoreReadGqlQuery="SELECT * FROM DATASTORE_KIND",\
datastoreReadProjectId=DATASTORE_PROJECT_ID,\
datastoreReadNamespace=DATASTORE_NAMESPACE,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
textWritePrefix=gs://BUCKET_NAME/output/

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • BUCKET_NAME:Cloud Storage 存储桶的名称
  • DATASTORE_PROJECT_ID:Datastore 实例所在的 Cloud 项目的 ID
  • DATASTORE_KIND:您的 Datastore 实体的类型
  • DATASTORE_NAMESPACE:Datastore 实体的命名空间
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Datastore_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "parameters": {
       "datastoreReadGqlQuery": "SELECT * FROM DATASTORE_KIND"
       "datastoreReadProjectId": "DATASTORE_PROJECT_ID",
       "datastoreReadNamespace": "DATASTORE_NAMESPACE",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "textWritePrefix": "gs://BUCKET_NAME/output/"
   },
   "environment": { "zone": "us-central1-f" }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • BUCKET_NAME:Cloud Storage 存储桶的名称
  • DATASTORE_PROJECT_ID:Datastore 实例所在的 Cloud 项目的 ID
  • DATASTORE_KIND:您的 Datastore 实体的类型
  • DATASTORE_NAMESPACE:Datastore 实体的命名空间
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js

Firestore to Cloud Storage Text

Firestore to Cloud Storage Text 模板是一种批处理流水线,可读取 Firestore 实体并以文本文件形式将其写入 Cloud Storage。您可以提供一个函数以将每个实体处理为 JSON 字符串。如果您未提供此类函数,则输出文件中的每一行都将是一个 JSON 序列化实体。

对此流水线的要求

在运行此流水线之前,必须先在项目中设置 Firestore。

模板参数

参数 说明
firestoreReadGqlQuery 一种 GQL 查询,用于指定要获取的实体。例如 SELECT * FROM MyKind
firestoreReadProjectId 您要从中读取数据的 Firestore 实例的 Google Cloud 项目 ID。
firestoreReadNamespace 所请求实体的命名空间。如需使用默认命名空间,请将此参数留空。
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
textWritePrefix Cloud Storage 路径前缀,用于指定写入数据的位置。例如 gs://mybucket/somefolder/

运行 Firestore to Cloud Storage Text 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Firestore to Text Files on Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Firestore_to_GCS_Text \
    --region REGION_NAME \
    --parameters \
firestoreReadGqlQuery="SELECT * FROM FIRESTORE_KIND",\
firestoreReadProjectId=FIRESTORE_PROJECT_ID,\
firestoreReadNamespace=FIRESTORE_NAMESPACE,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
textWritePrefix=gs://BUCKET_NAME/output/

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • BUCKET_NAME:Cloud Storage 存储桶的名称
  • FIRESTORE_PROJECT_ID:Firestore 实例所在的 Cloud 项目的 ID
  • FIRESTORE_KIND:Firestore 实体的类型
  • FIRESTORE_NAMESPACE:Firestore 实体的命名空间
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Firestore_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "parameters": {
       "firestoreReadGqlQuery": "SELECT * FROM FIRESTORE_KIND"
       "firestoreReadProjectId": "FIRESTORE_PROJECT_ID",
       "firestoreReadNamespace": "FIRESTORE_NAMESPACE",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "textWritePrefix": "gs://BUCKET_NAME/output/"
   },
   "environment": { "zone": "us-central1-f" }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • BUCKET_NAME:Cloud Storage 存储桶的名称
  • FIRESTORE_PROJECT_ID:Firestore 实例所在的 Cloud 项目的 ID
  • FIRESTORE_KIND:Firestore 实体的类型
  • FIRESTORE_NAMESPACE:Firestore 实体的命名空间
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js

Cloud Spanner to Cloud Storage Avro

Cloud Spanner to Cloud Storage Avro on Cloud Storage 模板是一种批处理流水线,可将整个 Cloud Spanner 数据库以 Avro 格式导出到 Cloud Storage。导出 Cloud Spanner 数据库会在您选择的存储桶中创建一个文件夹。该文件夹包含以下内容:

  • spanner-export.json 文件。
  • 您导出的数据库中每个表的 TableName-manifest.json 文件。
  • 一个或多个 TableName.avro-#####-of-##### 文件。

例如,如果导出包含两个表 SingersAlbums 的数据库,则系统会创建以下文件集:

  • Albums-manifest.json
  • Albums.avro-00000-of-00002
  • Albums.avro-00001-of-00002
  • Singers-manifest.json
  • Singers.avro-00000-of-00003
  • Singers.avro-00001-of-00003
  • Singers.avro-00002-of-00003
  • spanner-export.json

对此流水线的要求

  • Cloud Spanner 数据库必须存在。
  • Cloud Storage 输出存储桶必须存在。
  • 除了运行 Dataflow 作业所需的 IAM 角色之外,您还必须具有适当的 IAM 角色才能读取 Cloud Spanner 数据并写入 Cloud Storage 存储桶。

模板参数

参数 说明
instanceId 需要导出的 Cloud Spanner 数据库的实例 ID。
databaseId 需要导出的 Cloud Spanner 数据库的数据库 ID。
outputDir 您期望的 Avro 文件导出位置的 Cloud Storage 路径。导出作业在此路径下创建一个包含导出文件的新目录。
snapshotTime (可选)与您要读取的 Cloud Spanner 数据库版本对应的时间戳。时间戳必须按照 RFC 3339 世界协调时间 (UTC)(即“祖鲁时”)格式指定。例如 1990-12-31T23:59:60Z。时间戳必须是过去的时间,并且必须遵循时间戳过时上限
tableNames (可选)英文逗号分隔列表,指定要导出的 Cloud Spanner 数据库子集。该列表必须包含所有相关表(父表、外键引用的表)。如果未明确列出,则必须设置“应该导出相关表”标志,才能成功导出。
shouldExportRelatedTables (可选)与“tableNames”参数结合使用的标志,用于包括要导出的所有相关表。
spannerProjectId (可选)您要从中读取数据的 Cloud Spanner 数据库的 Google Cloud 项目 ID。

在 Cloud Storage 模板上运行 Cloud Spanner to Avro 文件

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。

    作业名称必须与以下格式匹配,作业才会显示在 Google Cloud 控制台的 Spanner 实例页面中:

    cloud-spanner-export-SPANNER_INSTANCE_ID-SPANNER_DATABASE_NAME

    替换以下内容:

    • SPANNER_INSTANCE_ID:Spanner 实例的 ID
    • SPANNER_DATABASE_NAME:Spanner 数据库的名称
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Spanner to Avro Files on Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_Spanner_to_GCS_Avro \
    --region REGION_NAME \
    --staging-location GCS_STAGING_LOCATION \
    --parameters \
instanceId=INSTANCE_ID,\
databaseId=DATABASE_ID,\
outputDir=GCS_DIRECTORY

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称

    要使作业显示在 Google Cloud 控制台的 Cloud Spanner 部分中,作业名称必须与 cloud-spanner-export-INSTANCE_ID-DATABASE_ID 格式匹配。

  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • GCS_STAGING_LOCATION:写入临时文件的位置;例如 gs://mybucket/temp
  • INSTANCE_ID:您的 Cloud Spanner 实例 ID
  • DATABASE_ID:您的 Cloud Spanner 数据库 ID
  • GCS_DIRECTORY:Avro 文件导出到

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_Spanner_to_GCS_Avro
{
   "jobName": "JOB_NAME",
   "parameters": {
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "outputDir": "gs://GCS_DIRECTORY"
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称

    要使作业显示在 Google Cloud 控制台的 Cloud Spanner 部分中,作业名称必须与 cloud-spanner-export-INSTANCE_ID-DATABASE_ID 格式匹配。

  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • GCS_STAGING_LOCATION:写入临时文件的位置;例如 gs://mybucket/temp
  • INSTANCE_ID:您的 Cloud Spanner 实例 ID
  • DATABASE_ID:您的 Cloud Spanner 数据库 ID
  • GCS_DIRECTORY:Avro 文件导出到

Cloud Spanner to Cloud Storage Text

Cloud Spanner to Cloud Storage Text 模板是一种批处理流水线,可从 Cloud Spanner 表中读取数据,然后将其作为 CSV 文本文件写入 Cloud Storage。

对此流水线的要求

  • 在运行此流水线之前,输入 Spanner 表必须已存在。

模板参数

参数 说明
spannerProjectId 需要从中读取数据的 Cloud Spanner 数据库的 Google Cloud 项目 ID。
spannerDatabaseId 所请求表的数据库 ID。
spannerInstanceId 所请求的表的实例 ID。
spannerTable 用于读取数据的表。
textWritePrefix 写入输出文本文件的目录。在末尾添加“/”。例如:gs://mybucket/somefolder/
spannerSnapshotTime (可选)与您要读取的 Cloud Spanner 数据库版本对应的时间戳。时间戳必须按照 RFC 3339 世界协调时间 (UTC)(即“祖鲁时”)格式指定。例如 1990-12-31T23:59:60Z。时间戳必须是过去的时间,并且必须遵循时间戳过时上限

运行 Cloud Spanner to Cloud Storage Text 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Spanner to Text Files on Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Spanner_to_GCS_Text \
    --region REGION_NAME \
    --parameters \
spannerProjectId=SPANNER_PROJECT_ID,\
spannerDatabaseId=DATABASE_ID,\
spannerInstanceId=INSTANCE_ID,\
spannerTable=TABLE_ID,\
textWritePrefix=gs://BUCKET_NAME/output/

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • SPANNER_PROJECT_ID:要从中读取数据的 Spanner 数据库的 Cloud 项目 ID
  • DATABASE_ID:Spanner 数据库 ID
  • BUCKET_NAME - Cloud Storage 存储桶的名称。
  • INSTANCE_ID:Spanner 实例 ID
  • TABLE_ID:Spanner 表 ID

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Spanner_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "parameters": {
       "spannerProjectId": "SPANNER_PROJECT_ID",
       "spannerDatabaseId": "DATABASE_ID",
       "spannerInstanceId": "INSTANCE_ID",
       "spannerTable": "TABLE_ID",
       "textWritePrefix": "gs://BUCKET_NAME/output/"
   },
   "environment": { "zone": "us-central1-f" }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • SPANNER_PROJECT_ID:要从中读取数据的 Spanner 数据库的 Cloud 项目 ID
  • DATABASE_ID:Spanner 数据库 ID
  • BUCKET_NAME - Cloud Storage 存储桶的名称。
  • INSTANCE_ID:Spanner 实例 ID
  • TABLE_ID:Spanner 表 ID

Cloud Storage Avro to Bigtable

Cloud Storage Avro to Bigtable 模板是一种流水线,可从 Cloud Storage 存储桶中的 Avro 文件读取数据并将数据写入 Bigtable 表。您可以使用该模板将数据从 Cloud Storage 复制到 Bigtable。

对此流水线的要求

  • Bigtable 表必须已存在,并且列族必须与 Avro 文件中导出的列族相同。
  • 在运行此流水线之前,输入 Avro 文件必须已存在于 Cloud Storage 存储桶中。
  • Bigtable 需要输入 Avro 文件具有特定架构

模板参数

参数 说明
bigtableProjectId 您要将数据写入的 Bigtable 实例的 Google Cloud 项目 ID。
bigtableInstanceId 表所属的 Bigtable 实例的 ID。
bigtableTableId 要导入的 Bigtable 表的 ID。
inputFilePattern 数据所在的 Cloud Storage 路径模式,例如 gs://mybucket/somefolder/prefix*

运行 Cloud Storage Avro file to Bigtable 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Avro Files on Cloud Storage to Cloud Bigtable template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/GCS_Avro_to_Cloud_Bigtable \
    --region REGION_NAME \
    --parameters \
bigtableProjectId=BIGTABLE_PROJECT_ID,\
bigtableInstanceId=INSTANCE_ID,\
bigtableTableId=TABLE_ID,\
inputFilePattern=INPUT_FILE_PATTERN

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • BIGTABLE_PROJECT_ID:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:表所属的 Bigtable 实例的 ID。
  • TABLE_ID:需要导出的 Bigtable 表的 ID。
  • INPUT_FILE_PATTERN:数据所在的 Cloud Storage 路径模式,例如 gs://mybucket/somefolder/prefix*

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Avro_to_Cloud_Bigtable
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProjectId": "BIGTABLE_PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "inputFilePattern": "INPUT_FILE_PATTERN",
   },
   "environment": { "zone": "us-central1-f" }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • BIGTABLE_PROJECT_ID:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:表所属的 Bigtable 实例的 ID。
  • TABLE_ID:需要导出的 Bigtable 表的 ID。
  • INPUT_FILE_PATTERN:数据所在的 Cloud Storage 路径模式,例如 gs://mybucket/somefolder/prefix*

Cloud Storage Avro to Cloud Spanner

Cloud Storage Avro files to Cloud Spanner 模板是一种批处理流水线,会读取从 Cloud Storage 中存储的 Cloud Spanner 导出的 Avro 文件,并将其导入 Cloud Spanner 数据库。

对此流水线的要求

  • 目标 Cloud Spanner 数据库必须已存在且必须为空。
  • 您必须拥有 Cloud Storage 存储桶的读取权限以及目标 Cloud Spanner 数据库的写入权限。
  • Cloud Storage 输入路径必须存在,并且必须包含 spanner-export.json 文件,且该文件包含要导入的文件的 JSON 描述。

模板参数

参数 说明
instanceId Cloud Spanner 数据库的实例 ID。
databaseId Cloud Spanner 数据库的 ID。
inputDir 导入 Avro 文件的 Cloud Storage 路径。

运行 Cloud Storage Avro to Cloud Spanner 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。

    作业名称必须与以下格式匹配,作业才会显示在 Google Cloud 控制台的 Spanner 实例页面中:

    cloud-spanner-import-SPANNER_INSTANCE_ID-SPANNER_DATABASE_NAME

    替换以下内容:

    • SPANNER_INSTANCE_ID:Spanner 实例的 ID
    • SPANNER_DATABASE_NAME:Spanner 数据库的名称
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Avro Files on Cloud Storage to Cloud Spanner template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/GCS_Avro_to_Cloud_Spanner \
    --region REGION_NAME \
    --staging-location GCS_STAGING_LOCATION \
    --parameters \
instanceId=INSTANCE_ID,\
databaseId=DATABASE_ID,\
inputDir=GCS_DIRECTORY

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • INSTANCE_ID:包含数据库的 Spanner 实例的 ID
  • DATABASE_ID:需要导入到的 Spanner 数据库的 ID
  • GCS_DIRECTORY:导入 Avro 文件的 Cloud Storage 路径,例如 gs://mybucket/somefolder

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Avro_to_Cloud_Spanner
{
   "jobName": "JOB_NAME",
   "parameters": {
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "inputDir": "gs://GCS_DIRECTORY"
   },
   "environment": {
       "machineType": "n1-standard-2"
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • INSTANCE_ID:包含数据库的 Spanner 实例的 ID
  • DATABASE_ID:需要导入到的 Spanner 数据库的 ID
  • GCS_DIRECTORY:导入 Avro 文件的 Cloud Storage 路径,例如 gs://mybucket/somefolder

Cloud Storage Parquet to Bigtable

Cloudtable Parquet to Bigtable 模板是一种流水线,可从 Cloud Storage 存储桶中的 Parquet 文件读取数据并将数据写入 Bigtable 表。您可以使用该模板将数据从 Cloud Storage 复制到 Bigtable。

对此流水线的要求

  • Bigtable 表必须已存在,并且列族必须与 Parquet 文件中导出的列族相同。
  • 在运行此流水线之前,输入 Parquet 文件必须已存在于 Cloud Storage 存储桶中。
  • Bigtable 需要输入 Parquet 文件具有特定架构

模板参数

参数 说明
bigtableProjectId 您要将数据写入的 Bigtable 实例的 Google Cloud 项目 ID。
bigtableInstanceId 表所属的 Bigtable 实例的 ID。
bigtableTableId 要导入的 Bigtable 表的 ID。
inputFilePattern 数据所在的 Cloud Storage 路径模式,例如 gs://mybucket/somefolder/prefix*

运行 Cloud Storage Parquet file to Bigtable 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Parquet Files on Cloud Storage to Cloud Bigtable template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/GCS_Parquet_to_Cloud_Bigtable \
    --region REGION_NAME \
    --parameters \
bigtableProjectId=BIGTABLE_PROJECT_ID,\
bigtableInstanceId=INSTANCE_ID,\
bigtableTableId=TABLE_ID,\
inputFilePattern=INPUT_FILE_PATTERN

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • BIGTABLE_PROJECT_ID:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:表所属的 Bigtable 实例的 ID。
  • TABLE_ID:需要导出的 Bigtable 表的 ID。
  • INPUT_FILE_PATTERN:数据所在的 Cloud Storage 路径模式,例如 gs://mybucket/somefolder/prefix*

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Parquet_to_Cloud_Bigtable
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProjectId": "BIGTABLE_PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "inputFilePattern": "INPUT_FILE_PATTERN",
   },
   "environment": { "zone": "us-central1-f" }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • BIGTABLE_PROJECT_ID:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:表所属的 Bigtable 实例的 ID。
  • TABLE_ID:需要导出的 Bigtable 表的 ID。
  • INPUT_FILE_PATTERN:数据所在的 Cloud Storage 路径模式,例如 gs://mybucket/somefolder/prefix*

Cloud Storage SequenceFile to Bigtable

Cloud Storage SequenceFile to Bigtable 模板是一种流水线,可从 Cloud Storage 存储桶中的 SequenceFile 读取数据并将数据写入 Bigtable 表。您可以使用该模板将数据从 Cloud Storage 复制到 Bigtable。

对此流水线的要求

  • Bigtable 表必须已存在。
  • 在运行此流水线之前,输入 SequenceFiles 文件必须已存在于 Cloud Storage 存储桶中。
  • 输入 SequenceFile 必须已从 Bigtable 或 HBase 中导出。

模板参数

参数 说明
bigtableProject 您要将数据写入的 Bigtable 实例的 Google Cloud 项目 ID。
bigtableInstanceId 表所属的 Bigtable 实例的 ID。
bigtableTableId 要导入的 Bigtable 表的 ID。
bigtableAppProfileId 要用于导入的 Bigtable 应用配置文件的 ID。如果您没有指定应用配置文件,则 Bigtable 将使用该实例的默认应用配置文件
sourcePattern 数据所在的 Cloud Storage 路径模式,例如 gs://mybucket/somefolder/prefix*

运行 Cloud Storage SequenceFile to Bigtable 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the SequenceFile Files on Cloud Storage to Cloud Bigtable template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/GCS_SequenceFile_to_Cloud_Bigtable \
    --region REGION_NAME \
    --parameters \
bigtableProject=BIGTABLE_PROJECT_ID,\
bigtableInstanceId=INSTANCE_ID,\
bigtableTableId=TABLE_ID,\
bigtableAppProfileId=APPLICATION_PROFILE_ID,\
sourcePattern=SOURCE_PATTERN

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • BIGTABLE_PROJECT_ID:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:表所属的 Bigtable 实例的 ID。
  • TABLE_ID:需要导出的 Bigtable 表的 ID。
  • APPLICATION_PROFILE_ID:将用于导出的 Bigtable 应用配置文件的 ID。
  • SOURCE_PATTERN:数据所在的 Cloud Storage 路径模式,例如 gs://mybucket/somefolder/prefix*

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_SequenceFile_to_Cloud_Bigtable
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProject": "BIGTABLE_PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "bigtableAppProfileId": "APPLICATION_PROFILE_ID",
       "sourcePattern": "SOURCE_PATTERN",
   },
   "environment": { "zone": "us-central1-f" }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • BIGTABLE_PROJECT_ID:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:表所属的 Bigtable 实例的 ID。
  • TABLE_ID:需要导出的 Bigtable 表的 ID。
  • APPLICATION_PROFILE_ID:将用于导出的 Bigtable 应用配置文件的 ID。
  • SOURCE_PATTERN:数据所在的 Cloud Storage 路径模式,例如 gs://mybucket/somefolder/prefix*

Cloud Storage Text to BigQuery

Cloud Storage Text to BigQuery 流水线是一种批处理流水线,可用于读取 Cloud Storage 中存储的文本文件,使用您提供的 JavaScript 用户定义函数 (UDF) 转换这些文件,然后将结果附加到 BigQuery 表。

对此流水线的要求

  • 创建一个用于描述 BigQuery 架构的 JSON 文件。

    确保有一个名为 BigQuery Schema 的顶级 JSON 数组,且该数组的内容遵循 {"name": "COLUMN_NAME", "type": "DATA_TYPE"} 格式。

    Cloud Storage Text to BigQuery 批处理模板不支持将数据导入目标 BigQuery 表中的 STRUCT(记录)字段。

    下面的 JSON 描述了一个 BigQuery 架构示例:

    {
      "BigQuery Schema": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING"
        },
        {
          "name": "coffee",
          "type": "STRING"
        }
      ]
    }
    
  • 使用 UDF 函数(该函数提供转换文本行的逻辑)创建一个 JavaScript (.js) 文件。您的函数必须返回一个 JSON 字符串。

    例如,以下函数将拆分 CSV 文件的每行文本,并通过转换值返回 JSON 字符串。

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }

模板参数

参数 说明
javascriptTextTransformFunctionName 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
JSONPath 用于定义 BigQuery 架构的 JSON 文件(存储在 Cloud Storage 中)的 gs:// 路径。例如 gs://path/to/my/schema.json
javascriptTextTransformGcsPath .js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
inputFilePattern Cloud Storage 中待处理文本的 gs:// 路径。例如 gs://path/to/my/text/data.txt
outputTable 要创建用以存储已处理数据的 BigQuery 表名称。如果您重复使用现有 BigQuery 表,则数据将被附加到目标表。例如 my-project-name:my-dataset.my-table
bigQueryLoadingTemporaryDirectory BigQuery 加载进程的临时目录。例如 gs://my-bucket/my-files/temp_dir

运行 Cloud Storage Text to BigQuery 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Text Files on Cloud Storage to BigQuery (Batch) template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --parameters \
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_BIGQUERY_SCHEMA_JSON:包含架构定义的 JSON 文件的 Cloud Storage 路径
  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA:文本数据集的 Cloud Storage 路径
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • PATH_TO_TEMP_DIR_ON_GCS:临时目录的 Cloud Storage 路径

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   },
   "environment": { "zone": "us-central1-f" }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_BIGQUERY_SCHEMA_JSON:包含架构定义的 JSON 文件的 Cloud Storage 路径
  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA:文本数据集的 Cloud Storage 路径
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • PATH_TO_TEMP_DIR_ON_GCS:临时目录的 Cloud Storage 路径

Cloud Storage Text to Datastore [已弃用]

此模板已弃用,将于 2022 年第一季度移除。请迁移到 Cloud Storage Text to Firestore 模板。

Cloud Storage Text to Datastore 模板是一种批处理流水线,可从存储在 Cloud Storage 中的文本文件读取数据,并将采用 JSON 编码的实体写入 Datastore。输入文本文件中的所有行都必须采用 指定的 JSON 格式

对此流水线的要求

  • 必须在目标项目中启用 Datastore。

模板参数

参数 说明
textReadPattern 指定文本数据文件位置的 Cloud Storage 路径模式。例如 gs://mybucket/somepath/*.json
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
datastoreWriteProjectId 要向其写入 Datastore 实体的位置的 Google Cloud 项目 ID
datastoreHintNumWorkers (可选)Datastore 逐步增加限制步骤中的预期工作器数量的提示。默认值为 500
errorWritePath 错误日志输出文件,用于写入在处理期间发生的故障。例如 gs://bucket-name/errors.txt

运行 Cloud Storage Text to Datastore 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Text Files on Cloud Storage to Datastore template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/GCS_Text_to_Datastore \
    --region REGION_NAME \
    --parameters \
textReadPattern=PATH_TO_INPUT_TEXT_FILES,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
datastoreWriteProjectId=PROJECT_ID,\
errorWritePath=ERROR_FILE_WRITE_PATH

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • PATH_TO_INPUT_TEXT_FILES:Cloud Storage 上的输入文件模式
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • ERROR_FILE_WRITE_PATH:Cloud Storage 上错误文件所需的路径

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Text_to_Datastore
{
   "jobName": "JOB_NAME",
   "parameters": {
       "textReadPattern": "PATH_TO_INPUT_TEXT_FILES",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "datastoreWriteProjectId": "PROJECT_ID",
       "errorWritePath": "ERROR_FILE_WRITE_PATH"
   },
   "environment": { "zone": "us-central1-f" }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • PATH_TO_INPUT_TEXT_FILES:Cloud Storage 上的输入文件模式
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • ERROR_FILE_WRITE_PATH:Cloud Storage 上错误文件所需的路径

Cloud Storage Text to Firestore

Cloud Storage Text to Firestore 模板是一种批处理流水线,可从存储在 Cloud Storage 中的文本文件读取数据,并将采用 JSON 编码的实体写入 Firestore。输入文本文件中的所有行都必须采用 指定的 JSON 格式

对此流水线的要求

  • 必须在目标项目中启用 Firestore。

模板参数

参数 说明
textReadPattern 指定文本数据文件位置的 Cloud Storage 路径模式。例如 gs://mybucket/somepath/*.json
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
firestoreWriteProjectId 要向其写入 Firestore 实体的位置的 Google Cloud 项目 ID
firestoreHintNumWorkers (可选)Firestore 逐步增加限制步骤中的预期工作器数量的提示。默认值为 500
errorWritePath 错误日志输出文件,用于写入在处理期间发生的故障。例如 gs://bucket-name/errors.txt

运行 Cloud Storage Text to Firestore 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Text Files on Cloud Storage to Firestore template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/GCS_Text_to_Firestore \
    --region REGION_NAME \
    --parameters \
textReadPattern=PATH_TO_INPUT_TEXT_FILES,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
firestoreWriteProjectId=PROJECT_ID,\
errorWritePath=ERROR_FILE_WRITE_PATH

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • PATH_TO_INPUT_TEXT_FILES:Cloud Storage 上的输入文件模式
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • ERROR_FILE_WRITE_PATH:Cloud Storage 上错误文件所需的路径

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Text_to_Firestore
{
   "jobName": "JOB_NAME",
   "parameters": {
       "textReadPattern": "PATH_TO_INPUT_TEXT_FILES",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "firestoreWriteProjectId": "PROJECT_ID",
       "errorWritePath": "ERROR_FILE_WRITE_PATH"
   },
   "environment": { "zone": "us-central1-f" }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • PATH_TO_INPUT_TEXT_FILES:Cloud Storage 上的输入文件模式
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • ERROR_FILE_WRITE_PATH:Cloud Storage 上错误文件所需的路径

Cloud Storage Text to Pub/Sub (Batch)

此模板会创建一种批处理流水线,该流水线可从存储在 Cloud Storage 中的文本文件读取记录,并将其发布到 Pub/Sub 主题。使用此模板,您可以将采用换行符分隔的文件中的 JSON 记录或 CSV 文件中的记录发布到 Pub/Sub 主题,以实现实时处理。您可以使用此模板将数据重放到 Pub/Sub。

此模板不会在各个记录上设置任何时间戳。事件时间等于执行期间的发布时间。如果您的流水线依赖准确的事件时间来执行处理,建议不要使用此流水线。

对此流水线的要求

  • 需要读取的文件必须采用换行符分隔 JSON 或 CSV 格式。在源文件中占多行的记录可能会导致下游问题,因为文件中的每一行都将以消息形式发布到 Pub/Sub。
  • 在运行此流水线之前,Pub/Sub 主题必须已存在。

模板参数

参数 说明
inputFilePattern 要从中读取数据的输入文件模式。例如 gs://bucket-name/files/*.json
outputTopic 要向其写入数据的 Pub/Sub 输入主题。名称必须采用 projects/<project-id>/topics/<topic-name> 格式。

运行 Cloud Storage Text to Pub/Sub (Batch) 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Text Files on Cloud Storage to Pub/Sub (Batch) template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/files/*.json,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME:Cloud Storage 存储桶的名称

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/files/*.json",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   },
   "environment": { "zone": "us-central1-f" }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME:Cloud Storage 存储桶的名称

Cloud Storage Text to Cloud Spanner

Cloud Storage Text to Cloud Spanner 模板是一种批处理流水线,用于从 Cloud Storage 读取 CSV 文本文件并将其导入到 Cloud Spanner 数据库。

对此流水线的要求

  • 目标 Cloud Spanner 数据库和表必须已存在。
  • 您必须拥有 Cloud Storage 存储桶的读取权限以及目标 Cloud Spanner 数据库的写入权限。
  • 包含 CSV 文件的 Cloud Storage 输入路径必须存在。
  • 您必须创建包含 CSV 文件的 JSON 说明的导入清单文件,并且必须将该清单存储在 Cloud Storage 中。
  • 如果目标 Cloud Spanner 数据库已有架构,则清单文件中指定的任何列都必须与目标数据库架构中的相应列具有相同的数据类型。
  • 采用 ASCII 或 UTF-8 编码的清单文件必须符合以下格式:

  • 要导入的文本文件必须采用 ASCII 或 UTF-8 编码的 CSV 格式。我们建议您不要在 UTF-8 编码文件中使用字节顺序标记 (BOM)。
  • 数据必须与下面的一种类型相匹配:

    GoogleSQL

        BOOL
        INT64
        FLOAT64
        NUMERIC
        STRING
        DATE
        TIMESTAMP
        BYPES
        JSON

    PostgreSQL

        boolean
        bigint
        double precision
        numeric
        character varying, text
        date
        timestamp with time zone
        bytea

模板参数

参数 说明
instanceId Cloud Spanner 数据库的实例 ID。
databaseId Cloud Spanner 数据库的 ID。
importManifest Cloud Storage 中导入清单文件的路径。
columnDelimiter 源文件使用的列分隔符。默认值为 ,
fieldQualifier 应放置在包含 columnDelimiter 的源文件中任何值前后的字符。默认值为 "
trailingDelimiter 指定源文件中的行是否带有末尾分隔符(即,在每行末尾的最后一列值之后,是否会出现 columnDelimiter 字符)。默认值为 true
escape 源文件使用的转义字符。默认情况下,此参数未设置,并且模板不使用转义字符。
nullString 表示 NULL 值的字符串。默认情况下,此参数未设置,并且模板不使用 null 字符串。
dateFormat 用于解析日期列的格式。默认情况下,流水线会尝试将日期列解析为 yyyy-M-d[' 00:00:00'],例如,解析为 2019-01-31 或 2019-1-1 00:00:00。如果您的日期格式有所不同,请使用 java.time.format.DateTimeFormatter 模式指定格式。
timestampFormat 用于解析时间戳列的格式。如果时间戳为长整数,则会解析为 Unix 时间。否则,时间戳会解析为 java.time.format.DateTimeFormatter.ISO_INSTANT 格式的字符串。对于其他情况,请指定您自己的模式字符串,例如,您可以对 "Jan 21 1998 01:02:03.456+08:00" 格式的时间戳使用 MMM dd yyyy HH:mm:ss.SSSVV

如果您需要使用自定义日期或时间戳格式,请确保这些格式是有效的 java.time.format.DateTimeFormatter 模式。下表显示了日期和时间戳列的自定义格式的其他示例:

类型 输入值 格式 备注
DATE 2011-3-31 默认情况下,模板可以解析此格式。您无需指定 dateFormat 参数。
DATE 2011-3-31 00:00:00 默认情况下,模板可以解析此格式。您无需指定格式。如果需要,您可以使用 yyyy-M-d' 00:00:00'
DATE 01 Apr, 18 dd MMM, yy
DATE Wednesday, April 3, 2019 AD EEEE, LLLL d, yyyy G
TIMESTAMP 2019-01-02T11:22:33Z
2019-01-02T11:22:33.123Z
2019-01-02T11:22:33.12356789Z
默认格式 ISO_INSTANT 可以解析此类型的时间戳。您无需提供 timestampFormat 参数。
TIMESTAMP 1568402363 默认情况下,模板可以解析此类型的时间戳并将其视为 Unix 时间。
TIMESTAMP Tue, 3 Jun 2008 11:05:30 GMT EEE, d MMM yyyy HH:mm:ss VV
TIMESTAMP 2018/12/31 110530.123PST yyyy/MM/dd HHmmss.SSSz
TIMESTAMP 2019-01-02T11:22:33Z 或 2019-01-02T11:22:33.123Z yyyy-MM-dd'T'HH:mm:ss[.SSS]VV 如果输入列为 2019-01-02T11:22:33Z 和 2019-01-02T11:22:33.123Z 的混合格式,则默认格式可以解析此类型的时间戳。您无需提供自己的格式参数。您可以使用 yyyy-MM-dd'T'HH:mm:ss[.SSS]VV 来处理这两种情况。您不能使用 yyyy-MM-dd'T'HH:mm:ss[.SSS]'Z',因为后缀“Z”必须解析为时区 ID(而不是字符字面量)。在内部,时间戳列会转换为 java.time.Instant。因此,您必须以 UTC 指定时间戳列,或者将时区信息与其相关联。本地日期时间(例如 2019-01-02 11:22:33)无法解析为有效的 java.time.Instant

在 Cloud Storage to Cloud Spanner 模板上运行文本文件

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Text Files on Cloud Storage to Cloud Spanner template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/GCS_Text_to_Cloud_Spanner \
    --region REGION_NAME \
    --parameters \
instanceId=INSTANCE_ID,\
databaseId=DATABASE_ID,\
importManifest=GCS_PATH_TO_IMPORT_MANIFEST

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • INSTANCE_ID:您的 Cloud Spanner 实例 ID
  • DATABASE_ID:您的 Cloud Spanner 数据库 ID
  • GCS_PATH_TO_IMPORT_MANIFEST:导入清单文件的 Cloud Storage 路径

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Text_to_Cloud_Spanner
{
   "jobName": "JOB_NAME",
   "parameters": {
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "importManifest": "GCS_PATH_TO_IMPORT_MANIFEST"
   },
   "environment": {
       "machineType": "n1-standard-2"
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • INSTANCE_ID:您的 Cloud Spanner 实例 ID
  • DATABASE_ID:您的 Cloud Spanner 数据库 ID
  • GCS_PATH_TO_IMPORT_MANIFEST:导入清单文件的 Cloud Storage 路径

Cloud Storage to Elasticsearch

Cloud Storage to Elasticsearch 模板是一种批处理流水线,可从存储在 Cloud Storage 存储桶中的 csv 文件读取数据,并将数据作为 JSON 文档注入到 Elasticsearch 中。

对此流水线的要求

  • Cloud Storage 存储桶必须存在。
  • Google Cloud 实例或 Elasticsearch Cloud 上必须存在可通过 Dataflow 访问的 Elasticsearch 主机。
  • 错误输出的 BigQuery 表必须存在。

模板参数

参数 说明
inputFileSpec 用于搜索 CSV 文件的 Cloud Storage 文件格式。示例:gs://mybucket/test-*.csv
connectionUrl Elasticsearch 网址,格式为 https://hostname:[port] 或指定 CloudID(如果使用 Elastic Cloud)。
apiKey 用于身份验证的 Base64 编码 API 密钥。
index 将对其发出请求的 Elasticsearch 索引,例如 my-index
deadletterTable 将失败的插入发送到的 BigQuery Deadletter 表。示例:<your-project>:<your-dataset>.<your-table-name>
containsHeaders (可选)用于指明 CSV 中是否包含标题的布尔值。默认值:true
delimiter (可选)CSV 使用的分隔符。示例:,
csvFormat (可选)基于 Apache Commons CSV 格式的 CSV 格式。默认值:Default
jsonSchemaPath (可选)JSON 架构的路径。默认值:null
largeNumFiles (可选)如果文件数达到数万个,则设置为 true。默认值:false
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
batchSize (可选)文档数量中的批次大小。默认值:1000
batchSizeBytes (可选)批次大小(以字节为单位)。默认值:5242880 (5mb)。
maxRetryAttempts (可选)尝试次数上限,必须大于 0。默认值:不重试。
maxRetryDuration (可选)重试时长上限(以毫秒为单位),必须大于 0。默认值:不重试。
csvFileEncoding (可选)CSV 文件编码。
propertyAsIndex (可选)要编入索引的文档中的一个属性,其值将指定批量请求要包含在文档中的 _index 元数据(优先于 _index UDF)。默认值:none。
propertyAsId (可选)要编入索引的文档中的一个属性,其值将指定批量请求要包含在文档中的 _id 元数据(优先于 _id UDF)。默认值:none。
javaScriptIndexFnGcsPath (可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _index 元数据。默认值:none。
javaScriptIndexFnName (可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _index 元数据。默认值:none。
javaScriptIdFnGcsPath (可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _id 元数据。默认值:none。
javaScriptIdFnName (可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _id 元数据。默认值:none。
javaScriptTypeFnGcsPath (可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _type 元数据。默认值:none。
javaScriptTypeFnName (可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _type 元数据。默认值:none。
javaScriptIsDeleteFnGcsPath (可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将确定是否应删除文档,而不是插入或更新文档。该函数应返回字符串值 "true""false"。默认值:none。
javaScriptIsDeleteFnName (可选)函数的 UDF JavaScript 函数名称,该函数将确定是否应删除文档,而不是插入或更新文档。该函数应返回字符串值 "true""false"。默认值:none。
usePartialUpdate (可选)是否在 Elasticsearch 请求中使用部分更新(更新而不是创建或索引,允许部分文档)。默认值:false
bulkInsertMethod (可选)在 Elasticsearch 批量请求中使用 INDEX(索引,允许执行更新插入操作)还是 CREATE(创建,会对重复 _id 报错)。默认值:CREATE

运行 Cloud Storage to Elasticsearch 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Storage to Elasticsearch template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID\
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/GCS_to_Elasticsearch \
    --parameters \
inputFileSpec=INPUT_FILE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX,\
deadletterTable=DEADLETTER_TABLE,\

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • INPUT_FILE_SPEC:您的 Cloud Storage 文件格式。
  • CONNECTION_URL:您的 Elasticsearch 网址。
  • APIKEY:用于身份验证的 base64 编码 API 密钥。
  • INDEX:您的 Elasticsearch 索引。
  • DEADLETTER_TABLE:您的 BigQuery 表。

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileSpec": "INPUT_FILE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX",
          "deadletterTable": "DEADLETTER_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/GCS_to_Elasticsearch",
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • INPUT_FILE_SPEC:您的 Cloud Storage 文件格式。
  • CONNECTION_URL:您的 Elasticsearch 网址。
  • APIKEY:用于身份验证的 base64 编码 API 密钥。
  • INDEX:您的 Elasticsearch 索引。
  • DEADLETTER_TABLE:您的 BigQuery 表。

Java Database Connectivity (JDBC) to BigQuery

JDBC to BigQuery 模板是一种批处理流水线,可将数据从关系数据库表中复制到现有的 BigQuery 表中。此流水线使用 JDBC 连接到关系数据库。您可以使用此模板将数据从任何具有可用 JDBC 驱动程序的关系数据库复制到 BigQuery 中。为了增加一项保护措施,您还可以在传入使用 Cloud KMS 密钥加密的 Base64 编码用户名、密码和连接字符串参数的同时,传入该 Cloud KMS 密钥。如需详细了解如何对用户名、密码和连接字符串参数进行加密,请参阅 Cloud KMS API 加密端点

对此流水线的要求

  • 关系数据库的 JDBC 驱动程序必须可用。
  • 在运行此流水线之前,BigQuery 表必须已存在。
  • BigQuery 表必须具有兼容的架构。
  • 必须能够从 Dataflow 运行的子网访问关系数据库。

模板参数

参数 说明
driverJars 以逗号分隔的驱动程序 JAR 文件列表。例如 gs://<my-bucket>/driver_jar1.jar,gs://<my-bucket>/driver_jar2.jar
driverClassName JDBC 驱动程序类名称。例如 com.mysql.jdbc.Driver
connectionURL JDBC 连接网址字符串。例如 jdbc:mysql://some-host:3306/sampledb。可作为 Base64 编码,然后使用 Cloud KMS 密钥加密的字符串传入。
query 要在提取数据的源上运行的查询。例如 select * from sampledb.sample_table
outputTable BigQuery 输出表位置,采用 <my-project>:<my-dataset>.<my-table> 格式。
bigQueryLoadingTemporaryDirectory BigQuery 加载进程的临时目录。例如 gs://<my-bucket>/my-files/temp_dir
connectionProperties (可选)用于 JDBC 连接的属性字符串。字符串的格式必须为 [propertyName=property;]*。例如 unicode=true;characterEncoding=UTF-8
username (可选)用于 JDBC 连接的用户名。该参数可以作为使用 Cloud KMS 密钥加密的 Base64 编码字符串传入。
password (可选)用于 JDBC 连接的密码。该参数可以作为使用 Cloud KMS 密钥加密的 Base64 编码字符串传入。
KMSEncryptionKey (可选)用于对用户名、密码和连接字符串进行解密的 Cloud KMS 加密密钥。如果传入了 Cloud KMS 密钥,则用户名、密码和连接字符串都必须以加密方式进行传递。
disabledAlgorithms (可选)要停用的以英文逗号分隔的算法。如果此值设置为 none,则不会停用任何算法。请谨慎使用,因为默认停用的算法已知存在漏洞或性能问题。例如 SSLv3, RC4.
extraFilesToStage 用于将文件暂存在工作器中的 Cloud Storage 路径或 Secret Manager 密文,以逗号分隔。这些文件将保存在每个工作器的 /extra_files 目录下。例如 gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id>

运行 JDBC to BigQuery 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the JDBC to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Jdbc_to_BigQuery \
    --region REGION_NAME \
    --parameters \
driverJars=DRIVER_PATHS,\
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
query=SOURCE_SQL_QUERY,\
outputTable=PROJECT_ID:DATASET.TABLE_NAME,
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\
connectionProperties=CONNECTION_PROPERTIES,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • DRIVER_PATHS:JDBC 驱动程序以英文逗号分隔的 Cloud Storage 路径
  • DRIVER_CLASS_NAME:驱动器类名称
  • JDBC_CONNECTION_URL:JDBC 连接网址
  • SOURCE_SQL_QUERY:需要在源数据库上运行的 SQL 查询
  • DATASET:您的 BigQuery 数据集,并替换 TABLE_NAME:您的 BigQuery 表名称
  • PATH_TO_TEMP_DIR_ON_GCS:临时目录的 Cloud Storage 路径
  • CONNECTION_PROPERTIES:JDBC 连接属性(如有需要)
  • CONNECTION_USERNAME:JDBC 连接用户名
  • CONNECTION_PASSWORD:JDBC 连接密码
  • KMS_ENCRYPTION_KEY:Cloud KMS 加密密钥

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Jdbc_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "driverJars": "DRIVER_PATHS",
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "JDBC_CONNECTION_URL",
       "query": "SOURCE_SQL_QUERY",
       "outputTable": "PROJECT_ID:DATASET.TABLE_NAME",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
       "connectionProperties": "CONNECTION_PROPERTIES",
       "username": "CONNECTION_USERNAME",
       "password": "CONNECTION_PASSWORD",
       "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
   },
   "environment": { "zone": "us-central1-f" }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • DRIVER_PATHS:JDBC 驱动程序以英文逗号分隔的 Cloud Storage 路径
  • DRIVER_CLASS_NAME:驱动器类名称
  • JDBC_CONNECTION_URL:JDBC 连接网址
  • SOURCE_SQL_QUERY:需要在源数据库上运行的 SQL 查询
  • DATASET:您的 BigQuery 数据集,并替换 TABLE_NAME:您的 BigQuery 表名称
  • PATH_TO_TEMP_DIR_ON_GCS:临时目录的 Cloud Storage 路径
  • CONNECTION_PROPERTIES:JDBC 连接属性(如有需要)
  • CONNECTION_USERNAME:JDBC 连接用户名
  • CONNECTION_PASSWORD:JDBC 连接密码
  • KMS_ENCRYPTION_KEY:Cloud KMS 加密密钥

Java Database Connectivity (JDBC) to Pub/Sub

Java Database Connectivity (JDBC) to Pub/Sub 模板是一个批处理流水线,可从 JDBC 源注入数据,并将生成的记录作为 JSON 字符串写入预先存在的 Pub/Sub 主题。

对此流水线的要求

  • 在运行流水线之前,JDBC 源必须已存在。
  • 在运行流水线之前,Cloud Pub/Sub 输出主题必须已存在。

模板参数

参数 说明
driverClassName JDBC 驱动程序类名称。例如 com.mysql.jdbc.Driver
connectionUrl JDBC 连接网址字符串。例如 jdbc:mysql://some-host:3306/sampledb。可作为 Base64 编码,然后使用 Cloud KMS 密钥加密的字符串传入。
driverJars 以英文逗号分隔的 JDBC 驱动程序 Cloud Storage 路径。例如 gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar
username (可选)用于 JDBC 连接的用户名。该参数可以作为使用 Cloud KMS 密钥加密的 Base64 编码字符串传入。
password (可选)用于 JDBC 连接的密码。该参数可以作为使用 Cloud KMS 密钥加密的 Base64 编码字符串传入。
connectionProperties (可选)用于 JDBC 连接的属性字符串。字符串的格式必须为 [propertyName=property;]*。例如 unicode=true;characterEncoding=UTF-8
query 要在提取数据的源上运行的查询。例如 select * from sampledb.sample_table
outputTopic 要发布到的 Pub/Sub 主题,格式为 projects/<project>/topics/<topic>
KMSEncryptionKey (可选)用于对用户名、密码和连接字符串进行解密的 Cloud KMS 加密密钥。如果传入了 Cloud KMS 密钥,则用户名、密码和连接字符串都必须以加密方式进行传递。
extraFilesToStage 用于将文件暂存在工作器中的 Cloud Storage 路径或 Secret Manager 密文,以逗号分隔。这些文件将保存在每个工作器的 /extra_files 目录下。例如 gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id>

运行 Java Database Connectivity (JDBC) to Pub/Sub 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the JDBC to Pub/Sub template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/flex/Jdbc_to_PubSub \
    --region REGION_NAME \
    --parameters \
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
driverJars=DRIVER_PATHS,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
connectionProperties=CONNECTION_PROPERTIES,\
query=SOURCE_SQL_QUERY,\
outputTopic=OUTPUT_TOPIC,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • DRIVER_CLASS_NAME:驱动程序类名称
  • JDBC_CONNECTION_URL:JDBC 连接网址
  • DRIVER_PATHS:JDBC 驱动程序以英文逗号分隔的 Cloud Storage 路径
  • CONNECTION_USERNAME:JDBC 连接用户名
  • CONNECTION_PASSWORD:JDBC 连接密码
  • CONNECTION_PROPERTIES:JDBC 连接属性(如有需要)
  • SOURCE_SQL_QUERY:需要在源数据库上运行的 SQL 查询
  • OUTPUT_TOPIC:要发布到的 Pub/Sub
  • KMS_ENCRYPTION_KEY:Cloud KMS 加密密钥

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "jobName": "JOB_NAME",
   "parameters": {
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "JDBC_CONNECTION_URL",
       "driverJars": "DRIVER_PATHS",
       "username": "CONNECTION_USERNAME",
       "password": "CONNECTION_PASSWORD",
       "connectionProperties": "CONNECTION_PROPERTIES",
       "query": "SOURCE_SQL_QUERY",
       "outputTopic": "OUTPUT_TOPIC",
       "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
   },
   "environment": { "zone": "us-central1-f" },
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • DRIVER_CLASS_NAME:驱动程序类名称
  • JDBC_CONNECTION_URL:JDBC 连接网址
  • DRIVER_PATHS:JDBC 驱动程序以英文逗号分隔的 Cloud Storage 路径
  • CONNECTION_USERNAME:JDBC 连接用户名
  • CONNECTION_PASSWORD:JDBC 连接密码
  • CONNECTION_PROPERTIES:JDBC 连接属性(如有需要)
  • SOURCE_SQL_QUERY:需要在源数据库上运行的 SQL 查询
  • OUTPUT_TOPIC:要发布到的 Pub/Sub
  • KMS_ENCRYPTION_KEY:Cloud KMS 加密密钥

Apache Cassandra to Cloud Bigtable

“从 Apache Cassandra 到 Cloud Bigtable”模板会将一个表从 Apache Cassandra 复制到 Cloud Bigtable。此模板需要最低限度的配置,可在 Cloud Bigtable 中尽可能准确地复制 Cassandra 中的表结构。

“从 Apache Cassandra 到 Cloud Bigtable”模板适用于以下情况:

  • 在可以接受短时间停机的情况下,迁移 Apache Cassandra 数据库。
  • 定期将 Cassandra 表复制到 Cloud Bigtable 以便向全球用户传送数据。

对此流水线的要求

  • 在运行此流水线之前,目标 Bigtable 表必须已存在。
  • Dataflow 工作器与 Apache Cassandra 节点之间建立了网络连接。

类型转换

“从 Apache Cassandra 到 Cloud Bigtable”模板会自动将 Apache Cassandra 数据类型转换为 Cloud Bigtable 的数据类型。

大多数原语在 Cloud Bigtable 和 Apache Cassandra 中的表示方式相同;但以下原语的表示方式有所不同。

  • DateTimestamp 会转化为 DateTime 对象
  • UUID 被转换为 String
  • Varint 被转换为 BigDecimal

Apache Cassandra 还原生支持更复杂的类型,例如 TupleListSetMap。此流水线不支持元组,因为 Apache Beam 中没有相应的类型。

例如,在 Apache Cassandra 中,您有一个 List 类型的列,名为“mylist”,还有一些类似下表内容的值。

row mylist
1 (a,b,c)

流水线会直接将此 List 列扩展为三个不同的列(在 Cloud Bigtable 中称为列限定符)。列名称为“mylist”,但流水线还会附加上列表中各项的索引编号,例如“mylist[0]”。

row mylist[0] mylist[1] mylist[2]
1 a b c

流水线按照处理列表的方式来处理集合,但添加了后缀来指明单元是键还是值。

row mymap
1 {"first_key":"first_value","another_key":"different_value"}

转换后,表如下所示:

row mymap[0].key mymap[0].value mymap[1].key mymap[1].value
1 first_key first_value another_key different_value

主键转换

在 Apache Cassandra 中,主键是使用数据定义语言定义的。主键可以是简单的、复合的,或者是与聚簇列复合的。Cloud Bigtable 支持手动行键构造,按字典顺序对字节数组进行排序。此流水线会收集有关键类型的信息,并遵循基于多个值构建行键的最佳做法构建键。

模板参数

参数 说明
cassandraHosts 以英文逗号分隔的列表中的 Apache Cassandra 节点的主机。
cassandraPort (可选)节点上用于访问 Apache Cassandra 的 TCP 端口(默认为 9042)。
cassandraKeyspace 表格所在的 Apache Cassandra 键空间。
cassandraTable 要复制的 Apache Cassandra 表格。
bigtableProjectId 从中复制 Apache Cassandra 表的 Bigtable 实例的 Google Cloud 项目 ID。
bigtableInstanceId 要复制 Apache Cassandra 表格的 Bigtable 实例 ID。
bigtableTableId 要复制 Apache Cassandra 表格的 Bigtable 表格的名称。
defaultColumnFamily (可选)Bigtable 表格的列族名称(默认为 default)。
rowKeySeparator (可选)用于构建行键的分隔符(默认为 #)。

运行 Apache Cassandra to Cloud Bigtable 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cassandra to Cloud Bigtable template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cassandra_To_Cloud_Bigtable \
    --region REGION_NAME \
    --parameters \
bigtableProjectId=BIGTABLE_PROJECT_ID,\
bigtableInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableTableId=BIGTABLE_TABLE_ID,\
cassandraHosts=CASSANDRA_HOSTS,\
cassandraKeyspace=CASSANDRA_KEYSPACE,\
cassandraTable=CASSANDRA_TABLE

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • BIGTABLE_PROJECT_ID:Cloud Bigtable 所在的项目 ID
  • BIGTABLE_INSTANCE_ID:Cloud Bigtable 实例 ID
  • BIGTABLE_TABLE_ID:您的 Cloud Bigtable 表名称
  • CASSANDRA_HOSTS:Apache Cassandra 主机列表;如果提供了多个主机,请按照此说明了解如何转义逗号
  • CASSANDRA_KEYSPACE:表格所在的 Apache Cassandra 键空间
  • CASSANDRA_TABLE:需要迁移的 Apache Cassandra 表

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cassandra_To_Cloud_Bigtable
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProjectId": "BIGTABLE_PROJECT_ID",
       "bigtableInstanceId": "BIGTABLE_INSTANCE_ID",
       "bigtableTableId": "BIGTABLE_TABLE_ID",
       "cassandraHosts": "CASSANDRA_HOSTS",
       "cassandraKeyspace": "CASSANDRA_KEYSPACE",
       "cassandraTable": "CASSANDRA_TABLE"
   },
   "environment": { "zone": "us-central1-f" }
}

替换以下内容:

  • PROJET_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • BIGTABLE_PROJECT_ID:Cloud Bigtable 所在的项目 ID
  • BIGTABLE_INSTANCE_ID:Cloud Bigtable 实例 ID
  • BIGTABLE_TABLE_ID:您的 Cloud Bigtable 表名称
  • CASSANDRA_HOSTS:Apache Cassandra 主机列表;如果提供了多个主机,请按照此说明了解如何转义逗号
  • CASSANDRA_KEYSPACE:表格所在的 Apache Cassandra 键空间
  • CASSANDRA_TABLE:需要迁移的 Apache Cassandra 表

MongoDB to BigQuery

MongoDB to BigQuery 模板是一种批处理流水线,可从 MongoDB 读取文档并按照 userOption 参数指定的选项将其写入 BigQuery。

对此流水线的要求

  • 目标 BigQuery 数据集必须已存在。
  • 必须可从 Dataflow 工作器机器访问 MongoDB 源实例。

模板参数

参数 说明
mongoDbUri MongoDB 连接 URI,格式为 mongodb+srv://:@
database 从中读取集合的 MongoDB 数据库。例如:my-db
collection MongoDB 数据库中集合的名称。例如:my-collection
outputTableSpec 要写入的 BigQuery 表。例如 bigquery-project:dataset.output_table
userOption FLATTENNONEFLATTEN 将文档展平至第一级。NONE 将整个文档存储为 JSON 字符串。

运行 MongoDB to BigQuery 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the MongoDB to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/MongoDB_to_BigQuery \
    --parameters \
outputTableSpec=OUTPUT_TABLE_SPEC,\
mongoDbUri=MONGO_DB_URI,\
database=DATABASE,\
collection=COLLECTION,\
userOption=USER_OPTION

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • OUTPUT_TABLE_SPEC:您的 BigQuery 目标表的名称。
  • MONGO_DB_URI:您的 MongoDB URI。
  • DATABASE:您的 MongoDB 数据库。
  • COLLECTION:您的 MongoDB 集合。
  • USER_OPTION:FLATTEN 或 NONE。

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "mongoDbUri": "MONGO_DB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "userOption": "USER_OPTION"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/MongoDB_to_BigQuery",
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • OUTPUT_TABLE_SPEC:您的 BigQuery 目标表的名称。
  • MONGO_DB_URI:您的 MongoDB URI。
  • DATABASE:您的 MongoDB 数据库。
  • COLLECTION:您的 MongoDB 集合。
  • USER_OPTION:FLATTEN 或 NONE。