Google 提供的批处理模板

Google 提供了一组开源 Dataflow 模板。有关模板的一般信息,请参阅概览页面。如需查看 Google 提供的所有模板的列表,请参阅开始使用 Google 提供的模板页面。

本页面介绍了批处理模板。

BigQuery to Cloud Storage TFRecords

BigQuery to Cloud Storage TFRecords 模板是一种流水线,可从 BigQuery 查询读取数据并以 TFRecord 格式将其写入 Cloud Storage 存储分区。您可以指定训练、测试和验证拆分百分比。默认情况下,训练集的拆分比例为 1 或 100%,测试和验证集的拆分比例为 0 或 0%。设置数据集拆分比例时,训练、测试和验证之和加起来必须为 1 或 100%(例如,0.6 + 0.2 + 0.2)。Dataflow 会自动确定每个输出数据集的最佳分片数。

对此流水线的要求

  • BigQuery 数据集和表必须已存在。
  • 输出 Cloud Storage 存储分区必须存在才能执行此流水线。训练、测试和验证子目录不需要预先存在,将会自动生成。

模板参数

参数 说明
readQuery 用于从来源中提取数据的 BigQuery SQL 查询。例如 select * from dataset1.sample_table
outputDirectory 在其中写入训练、测试和验证 TFRecord 文件的顶级 Cloud Storage 路径前缀。例如 gs://mybucket/output。生成的训练、测试和验证 TFRecord 文件的子目录根据 outputDirectory 自动生成。例如 gs://mybucket/output/train
trainingPercentage (可选)分配给训练 TFRecord 文件的查询数据所占的百分比。默认值为 1 或 100%。
testingPercentage (可选)分配给测试 TFRecord 文件的查询数据所占的百分比。默认值为 0 或 0%。
validationPercentage (可选)分配给验证 TFRecord 文件的查询数据所占的百分比。默认值为 0 或 0%。
outputSuffix (可选)写入的训练、测试和验证 TFRecord 文件的文件后缀。默认值为 .tfrecord

运行 BigQuery to Cloud Storage TFRecord files 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the BigQuery to TFRecords template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须具备 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_BigQuery_to_GCS_TensorFlow_Records
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_BigQuery_to_GCS_TensorFlow_Records \
    --parameters readQuery=READ_QUERY,outputDirectory=OUTPUT_DIRECTORY,trainingPercentage=TRAINING_PERCENTAGE,testingPercentage=TESTING_PERCENTAGE,validationPercentage=VALIDATION_PERCENTAGE,outputSuffix=OUTPUT_FILENAME_SUFFIX

替换以下值:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • READ_QUERY:要执行的 BigQuery 查询
  • OUTPUT_DIRECTORY:输出数据集的 Cloud Storage 路径前缀
  • TRAINING_PERCENTAGE:训练数据集的拆分小数百分比
  • TESTING_PERCENTAGE:测试数据集的拆分小数百分比
  • VALIDATION_PERCENTAGE:验证数据集的拆分小数百分比
  • OUTPUT_FILENAME_SUFFIX:首选输出 TensorFlow 记录文件后缀

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_BigQuery_to_GCS_TensorFlow_Records

如需使用 REST API 运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_BigQuery_to_GCS_TensorFlow_Records
{
   "jobName": "JOB_NAME",
   "parameters": {
       "readQuery":"READ_QUERY",
       "outputDirectory":"OUTPUT_DIRECTORY",
       "trainingPercentage":"TRAINING_PERCENTAGE",
       "testingPercentage":"TESTING_PERCENTAGE",
       "validationPercentage":"VALIDATION_PERCENTAGE",
       "outputSuffix":"OUTPUT_FILENAME_SUFFIX"
   },
   "environment": { "zone": "us-central1-f" }
}

替换以下值:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • READ_QUERY:要执行的 BigQuery 查询
  • OUTPUT_DIRECTORY:输出数据集的 Cloud Storage 路径前缀
  • TRAINING_PERCENTAGE:训练数据集的拆分小数百分比
  • TESTING_PERCENTAGE:测试数据集的拆分小数百分比
  • VALIDATION_PERCENTAGE:验证数据集的拆分小数百分比
  • OUTPUT_FILENAME_SUFFIX:首选输出 TensorFlow 记录文件后缀

BigQuery export to Parquet(通过 Storage API)

BigQuery export to Parquet 模板是一种批处理流水线,可从 BigQuery 表读取数据并以 Parquet 格式将其写入 Cloud Storage 存储分区。此模板利用 BigQuery Storage API 导出数据。

对此流水线的要求

  • 在运行此流水线之前,输入 BigQuery 表必须已存在。
  • 在运行此流水线之前,输出 Cloud Storage 存储分区必须已存在。

模板参数

参数 说明
tableRef BigQuery 输入表位置。例如 <my-project>:<my-dataset>.<my-table>
bucket 要在其中写入 Parquet 文件的 Cloud Storage 文件夹。例如 gs://mybucket/exports
numShards (可选)输出文件分片数。默认值为 1。
fields (可选)要从输入 BigQuery 表格选择的以英文逗号分隔的字段列表。

运行 BigQuery to Cloud Storage Parquet 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the BigQuery export to Parquet (via Storage API) template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行 Flex 模板,必须具有 Cloud SDK 284.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/flex/BigQuery_to_Parquet
gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/BigQuery_to_Parquet \
    --parameters \
tableRef=BIGQUERY_TABLE,\
bucket=OUTPUT_DIRECTORY,\
numShards=NUM_SHARDS,\
fields=FIELDS

替换以下值:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • OUTPUT_DIRECTORY:输出文件的 Cloud Storage 文件夹
  • NUM_SHARDS:所需的输出文件分片数
  • FIELDS:要从输入 BigQuery 表中选择的以英文逗号分隔的字段列表
  • LOCATION:执行区域,例如 us-central1

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/flex/BigQuery_to_Parquet

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "tableRef": "BIGQUERY_TABLE",
          "bucket": "OUTPUT_DIRECTORY",
          "numShards": "NUM_SHARDS",
          "fields": "FIELDS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/BigQuery_to_Parquet",
   }
}

替换以下值:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • OUTPUT_DIRECTORY:输出文件的 Cloud Storage 文件夹
  • NUM_SHARDS:所需的输出文件分片数
  • FIELDS:要从输入 BigQuery 表中选择的以英文逗号分隔的字段列表
  • LOCATION:执行区域,例如 us-central1

Bigtable to Cloud Storage Avro

Bigtable to Cloud Storage Avro 模板是一种流水线,可从 Bigtable 表中读取数据并以 Avro 格式将其写入 Cloud Storage 存取分区。您可以使用该模板将数据从 Bigtable 移动到 Cloud Storage。

对此流水线的要求

  • Bigtable 表必须已存在。
  • 在运行此流水线之前,输出 Cloud Storage 存储分区必须已存在。

模板参数

参数 说明
bigtableProjectId 您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
bigtableInstanceId 表所属的 Bigtable 实例的 ID。
bigtableTableId 要导出的 Bigtable 表的 ID。
outputDirectory 写入数据的 Cloud Storage 路径。例如 gs://mybucket/somefolder
filenamePrefix Avro 文件名的前缀。例如 output-

运行 Bigtable to Cloud Storage Avro file 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Bigtable to Avro Files on Cloud Storage template 。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_Avro
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_Bigtable_to_GCS_Avro \
    --parameters bigtableProjectId=PROJECT_ID,bigtableInstanceId=INSTANCE_ID,bigtableTableId=TABLE_ID,outputDirectory=OUTPUT_DIRECTORY,filenamePrefix=FILENAME_PREFIX

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • PROJECT_ID:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:表所属的 Bigtable 实例的 ID。
  • TABLE_ID:需要导出的 Bigtable 表的 ID。
  • OUTPUT_DIRECTORY:写入数据的 Cloud Storage 路径,例如 gs://mybucket/somefolder
  • FILENAME_PREFIX:Avro 文件名的前缀,例如 output-

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_Avro

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。请根据使用 REST API 中的说明使用以下示例请求。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_Bigtable_to_GCS_Avro
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProjectId": "PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "outputDirectory": "OUTPUT_DIRECTORY",
       "filenamePrefix": "FILENAME_PREFIX",
   },
   "environment": { "zone": "us-central1-f" }
}

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • PROJECT_ID:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:表所属的 Bigtable 实例的 ID。
  • TABLE_ID:需要导出的 Bigtable 表的 ID。
  • OUTPUT_DIRECTORY:写入数据的 Cloud Storage 路径,例如 gs://mybucket/somefolder
  • FILENAME_PREFIX:Avro 文件名的前缀,例如 output-

Bigtable to Cloud Storage Parquet

Bigtable to Cloud Storage Parquet 模板是一种流水线,可从 BigQuery 表读取数据并以 Parquet 格式将其写入 Cloud Storage 存储分区。您可以使用该模板将数据从 Bigtable 移动到 Cloud Storage。

对此流水线的要求

  • Bigtable 表必须已存在。
  • 在运行此流水线之前,输出 Cloud Storage 存储分区必须已存在。

模板参数

参数 说明
bigtableProjectId 您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
bigtableInstanceId 表所属的 Bigtable 实例的 ID。
bigtableTableId 要导出的 Bigtable 表的 ID。
outputDirectory 写入数据的 Cloud Storage 路径。例如 gs://mybucket/somefolder
filenamePrefix Parquet 文件名的前缀。例如 output-
numShards 输出文件分片数。例如 2

运行 Bigtable to Cloud Storage Parquet file 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Bigtable to Parquet Files on Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_Parquet
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_Bigtable_to_GCS_Parquet \
    --parameters bigtableProjectId=PROJECT_ID,bigtableInstanceId=INSTANCE_ID,bigtableTableId=TABLE_ID,outputDirectory=OUTPUT_DIRECTORY,filenamePrefix=FILENAME_PREFIX,numShards=NUM_SHARDS

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • PROJECT_ID:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:表所属的 Bigtable 实例的 ID。
  • TABLE_ID:需要导出的 Bigtable 表的 ID。
  • OUTPUT_DIRECTORY:写入数据的 Cloud Storage 路径,例如 gs://mybucket/somefolder
  • FILENAME_PREFIX:Parquet 文件名的前缀,例如 output-
  • NUM_SHARDS:要输出的 Parquet 文件的数量,例如 1

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_Parquet

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。请根据使用 REST API 中的说明使用以下示例请求。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_Bigtable_to_GCS_Parquet
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProjectId": "PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "outputDirectory": "OUTPUT_DIRECTORY",
       "filenamePrefix": "FILENAME_PREFIX",
       "numShards": "NUM_SHARDS"
   },
   "environment": { "zone": "us-central1-f" }
}

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • PROJECT_ID:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:表所属的 Bigtable 实例的 ID。
  • TABLE_ID:需要导出的 Bigtable 表的 ID。
  • OUTPUT_DIRECTORY:写入数据的 Cloud Storage 路径,例如 gs://mybucket/somefolder
  • FILENAME_PREFIX:Parquet 文件名的前缀,例如 output-
  • NUM_SHARDS:要输出的 Parquet 文件的数量,例如 1

Bigtable to Cloud Storage SequenceFile

Bigtable to Cloud Storage SequenceFile 模板是一种流水线,可从 Bigtable 表读取数据并以 SequenceFile 格式将其写入 Cloud Storage 存储分区。您可以使用该模板将数据从 Bigtable 复制到 Cloud Storage。

对此流水线的要求

  • Bigtable 表必须已存在。
  • 在运行此流水线之前,输出 Cloud Storage 存储分区必须已存在。

模板参数

参数 说明
bigtableProject 您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
bigtableInstanceId 表所属的 Bigtable 实例的 ID。
bigtableTableId 要导出的 Bigtable 表的 ID。
bigtableAppProfileId 要用于导出的 Bigtable 应用配置文件的 ID。如果您没有指定应用配置文件,则 Bigtable 将使用该实例的默认应用配置文件
destinationPath 写入数据的 Cloud Storage 路径。例如 gs://mybucket/somefolder
filenamePrefix SequenceFile 文件名的前缀。例如 output-

运行 Bigtable to Cloud Storage SequenceFile 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Bigtable to SequenceFile Files on Cloud Storage template 。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_SequenceFile
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_Bigtable_to_GCS_SequenceFile \
    --parameters bigtableProject=PROJECT_ID,bigtableInstanceId=INSTANCE_ID,bigtableTableId=TABLE_ID,bigtableAppProfileId=APPLICATION_PROFILE_ID,destinationPath=DESTINATION_PATH,filenamePrefix=FILENAME_PREFIX

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • PROJECT_ID:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:表所属的 Bigtable 实例的 ID。
  • TABLE_ID:需要导出的 Bigtable 表的 ID。
  • APPLICATION_PROFILE_ID:将用于导出的 Bigtable 应用配置文件的 ID。
  • DESTINATION_PATH:写入数据的 Cloud Storage 路径,例如 gs://mybucket/somefolder
  • FILENAME_PREFIX:SequenceFile 文件名的前缀,例如 output-

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_SequenceFile

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。请根据使用 REST API 中的说明使用以下示例请求。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_Bigtable_to_GCS_SequenceFile
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProject": "PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "bigtableAppProfileId": "APPLICATION_PROFILE_ID",
       "destinationPath": "DESTINATION_PATH",
       "filenamePrefix": "FILENAME_PREFIX",
   },
   "environment": { "zone": "us-central1-f" }
}

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • PROJECT_ID:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:表所属的 Bigtable 实例的 ID。
  • TABLE_ID:需要导出的 Bigtable 表的 ID。
  • APPLICATION_PROFILE_ID:将用于导出的 Bigtable 应用配置文件的 ID。
  • DESTINATION_PATH:写入数据的 Cloud Storage 路径,例如 gs://mybucket/somefolder
  • FILENAME_PREFIX:SequenceFile 文件名的前缀,例如 output-

Datastore to Cloud Storage Text

Datastore to Cloud Storage Text 模板是一种批处理流水线,可读取 Datastore 实体并以文本文件形式将其写入 Cloud Storage。您可以提供一个函数以将每个实体处理为 JSON 字符串。如果您未提供此类函数,则输出文件中的每一行都将是一个 JSON 序列化实体。

对此流水线的要求

在运行此流水线之前,必须先在项目中设置 Datastore。

模板参数

参数 说明
datastoreReadGqlQuery 一种 GQL 查询,用于指定要获取的实体。例如 SELECT * FROM MyKind
datastoreReadProjectId 您要从中读取数据的 Datastore 实例的 Google Cloud 项目 ID。
datastoreReadNamespace 所请求实体的命名空间。如需使用默认命名空间,请将此参数留空。
javascriptTextTransformGcsPath 包含所有 JavaScript 代码的 Cloud Storage 路径。例如 gs://mybucket/mytransforms/*.js。如果您不想提供函数,请将此参数留空。
javascriptTextTransformFunctionName 需要调用的 JavaScript 函数的名称。例如,如果您的 JavaScript 函数为 function myTransform(inJson) { ...dostuff...},则函数名称为 myTransform。如果您不想提供函数,请将此参数留空。
textWritePrefix Cloud Storage 路径前缀,用于指定写入数据的位置。例如 gs://mybucket/somefolder/

运行 Datastore to Cloud Storage Text 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Datastore to Text Files on Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Datastore_to_GCS_Text
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Datastore_to_GCS_Text \
    --parameters \
datastoreReadGqlQuery="SELECT * FROM DATASTORE_KIND",\
datastoreReadProjectId=PROJECT_ID,\
datastoreReadNamespace=DATASTORE_NAMESPACE,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
textWritePrefix=gs://BUCKET_NAME/output/

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • BUCKET_NAME - Cloud Storage 存储分区的名称。
  • DATASTORE_KIND:您的 Datastore 实体的类型
  • DATASTORE_NAMESPACE:Datastore 实体的命名空间
  • JAVASCRIPT_FUNCTION:您的 JavaScript 函数名称
  • PATH_TO_JAVASCRIPT_UDF_FILE:包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Datastore_to_GCS_Text

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Datastore_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "parameters": {
       "datastoreReadGqlQuery": "SELECT * FROM DATASTORE_KIND"
       "datastoreReadProjectId": "PROJECT_ID",
       "datastoreReadNamespace": "DATASTORE_NAMESPACE",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "textWritePrefix": "gs://BUCKET_NAME/output/"
   },
   "environment": { "zone": "us-central1-f" }
}

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • BUCKET_NAME - Cloud Storage 存储分区的名称。
  • DATASTORE_KIND:您的 Datastore 实体的类型
  • DATASTORE_NAMESPACE:Datastore 实体的命名空间
  • JAVASCRIPT_FUNCTION:您的 JavaScript 函数名称
  • PATH_TO_JAVASCRIPT_UDF_FILE:包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径

Cloud Spanner to Cloud Storage Avro

Cloud Spanner to Cloud Storage Avro on Cloud Storage 模板是一种批处理流水线,可将整个 Cloud Spanner 数据库以 Avro 格式导出到 Cloud Storage。导出 Cloud Spanner 数据库会在您选择的存储分区中创建一个文件夹。该文件夹包含以下内容:

  • spanner-export.json 文件。
  • 您导出的数据库中每个表的 TableName-manifest.json 文件。
  • 一个或多个 TableName.avro-#####-of-##### 文件。

例如,如果导出包含两个表 SingersAlbums 的数据库,则系统会创建以下文件集:

  • Albums-manifest.json
  • Albums.avro-00000-of-00002
  • Albums.avro-00001-of-00002
  • Singers-manifest.json
  • Singers.avro-00000-of-00003
  • Singers.avro-00001-of-00003
  • Singers.avro-00002-of-00003
  • spanner-export.json

对此流水线的要求

  • Cloud Spanner 数据库必须存在。
  • Cloud Storage 输出存储分区必须存在。
  • 除了运行 Dataflow 作业所需的 IAM 角色之外,您还必须具有适当的 IAM 角色才能读取 Cloud Spanner 数据并写入 Cloud Storage 存储分区。

模板参数

参数 说明
instanceId 需要导出的 Cloud Spanner 数据库的实例 ID。
databaseId 需要导出的 Cloud Spanner 数据库的数据库 ID。
outputDir 您期望的 Avro 文件导出位置的 Cloud Storage 路径。导出作业在此路径下创建一个包含导出文件的新目录。
snapshotTime (可选)与您要读取的 Cloud Spanner 数据库版本对应的时间戳。时间戳必须按照 RFC 3339 世界协调时间 (UTC)(即“祖鲁时”)格式指定。例如 1990-12-31T23:59:60Z。时间戳必须是过去的时间,并且必须遵循时间戳过时上限
spannerProjectId (可选)您要从中读取数据的 Cloud Spanner 数据库的 Google Cloud 项目 ID。

在 Cloud Storage 模板上运行 Cloud Spanner to Avro 文件

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。

    作业名称必须与以下格式匹配,作业才会显示在 Cloud Console 的 Spanner 实例页面中:

    cloud-spanner-export-SPANNER_INSTANCE_ID-SPANNER_DATABASE_NAME

    请替换以下内容:

    • SPANNER_INSTANCE_ID:Spanner 实例的 ID
    • SPANNER_DATABASE_NAME:Spanner 数据库的名称
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Spanner to Avro Files on Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_Spanner_to_GCS_Avro
gcloud dataflow jobs run JOB_NAME \
    --gcs-location='gs://dataflow-templates/VERSION/Cloud_Spanner_to_GCS_Avro' \
    --region=DATAFLOW_REGION \
    --parameters='instanceId=INSTANCE_ID,databaseId=DATABASE_ID,outputDir=GCS_DIRECTORY'

请替换以下内容:

  • JOB_NAME:您选择的作业名称
  • DATAFLOW_REGION:您希望运行 Dataflow 作业的区域(例如 us-central1
  • GCS_STAGING_LOCATION:写入临时文件的路径(例如 gs://mybucket/temp
  • INSTANCE_ID:您的 Cloud Spanner 实例 ID
  • DATABASE_ID:您的 Cloud Spanner 数据库 ID
  • GCS_DIRECTORY:Avro 文件导出到

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_Spanner_to_GCS_Avro
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/DATAFLOW_REGION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_Spanner_to_GCS_Avro
{
   "jobName": "JOB_NAME",
   "parameters": {
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "outputDir": "gs://GCS_DIRECTORY"
   }
}

请根据使用 REST API 中的说明使用以下示例请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • DATAFLOW_REGION:您希望运行 Dataflow 作业的区域(例如 us-central1
  • INSTANCE_ID:您的 Cloud Spanner 实例 ID
  • DATABASE_ID:您的 Cloud Spanner 数据库 ID
  • GCS_DIRECTORY:Avro 文件导出到
  • JOB_NAME:您选择的作业名称
    • 作业名称必须与 cloud-spanner-export-INSTANCE_ID-DATABASE_ID 格式匹配才会在 Cloud Console 的 Cloud Spanner 部分显示。

Cloud Spanner to Cloud Storage Text

Cloud Spanner to Cloud Storage Text 模板是一种批处理流水线,可从 Cloud Spanner 表中读取数据,然后将其作为 CSV 文本文件写入 Cloud Storage。

对此流水线的要求

  • 在运行此流水线之前,输入 Spanner 表必须已存在。

模板参数

参数 说明
spannerProjectId 需要从中读取数据的 Cloud Spanner 数据库的 Google Cloud 项目 ID。
spannerDatabaseId 所请求表的数据库 ID。
spannerInstanceId 所请求的表的实例 ID。
spannerTable 用于读取数据的表。
textWritePrefix 写入输出文本文件的目录。在末尾添加“/”。例如:gs://mybucket/somefolder/

运行 Cloud Spanner to Cloud Storage Text 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Spanner to Text Files on Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Spanner_to_GCS_Text
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Spanner_to_GCS_Text \
    --parameters \
spannerProjectId=PROJECT_ID,\
spannerDatabaseId=DATABASE_ID,\
spannerInstanceId=INSTANCE_ID,\
spannerTable=TABLE_ID,\
textWritePrefix=gs://BUCKET_NAME/output/

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • DATABASE_ID:Spanner 数据库 ID
  • BUCKET_NAME - Cloud Storage 存储分区的名称。
  • INSTANCE_ID:Spanner 实例 ID
  • TABLE_ID:Spanner 表 ID
  • PATH_TO_JAVASCRIPT_UDF_FILE:包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径
  • JAVASCRIPT_FUNCTION:您的 JavaScript 函数名称

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Spanner_to_GCS_Text

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Spanner_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "parameters": {
       "spannerProjectId": "PROJECT_ID",
       "spannerDatabaseId": "DATABASE_ID",
       "spannerInstanceId": "INSTANCE_ID",
       "spannerTable": "TABLE_ID",
       "textWritePrefix": "gs://BUCKET_NAME/output/"
   },
   "environment": { "zone": "us-central1-f" }
}

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • DATABASE_ID:Spanner 数据库 ID
  • BUCKET_NAME - Cloud Storage 存储分区的名称。
  • INSTANCE_ID:Spanner 实例 ID
  • TABLE_ID:Spanner 表 ID
  • PATH_TO_JAVASCRIPT_UDF_FILE:包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径
  • JAVASCRIPT_FUNCTION:您的 JavaScript 函数名称

Cloud Storage Avro to Bigtable

Cloud Storage Avro to Bigtable 模板是一种流水线,可从 Cloud Storage 存储分区中的 Avro 文件读取数据并将数据写入 Bigtable 表。您可以使用该模板将数据从 Cloud Storage 复制到 Bigtable。

对此流水线的要求

  • Bigtable 表必须已存在,并且列族必须与 Avro 文件中导出的列族相同。
  • 在运行此流水线之前,输入 Avro 文件必须已存在于 Cloud Storage 存储分区中。
  • Bigtable 需要输入 Avro 文件具有特定架构

模板参数

参数 说明
bigtableProjectId 您要将数据写入的 Bigtable 实例的 Google Cloud 项目 ID。
bigtableInstanceId 表所属的 Bigtable 实例的 ID。
bigtableTableId 要导入的 Bigtable 表的 ID。
inputFilePattern 数据所在的 Cloud Storage 路径模式,例如 gs://mybucket/somefolder/prefix*

运行 Cloud Storage Avro file to Bigtable 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Avro Files on Cloud Storage to Cloud Bigtable template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Avro_to_Cloud_Bigtable
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/GCS_Avro_to_Cloud_Bigtable \
    --parameters bigtableProjectId=PROJECT_ID,bigtableInstanceId=INSTANCE_ID,bigtableTableId=TABLE_ID,inputFilePattern=INPUT_FILE_PATTERN

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • PROJECT_ID:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:表所属的 Bigtable 实例的 ID。
  • TABLE_ID:需要导出的 Bigtable 表的 ID。
  • INPUT_FILE_PATTERN:数据所在的 Cloud Storage 路径模式,例如 gs://mybucket/somefolder/prefix*

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Avro_to_Cloud_Bigtable

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。请根据使用 REST API 中的说明使用以下示例请求。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/GCS_Avro_to_Cloud_Bigtable
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProjectId": "PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "inputFilePattern": "INPUT_FILE_PATTERN",
   },
   "environment": { "zone": "us-central1-f" }
}

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • PROJECT_ID:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:表所属的 Bigtable 实例的 ID。
  • TABLE_ID:需要导出的 Bigtable 表的 ID。
  • INPUT_FILE_PATTERN:数据所在的 Cloud Storage 路径模式,例如 gs://mybucket/somefolder/prefix*

Cloud Storage Avro to Cloud Spanner

Cloud Storage Avro files to Cloud Spanner 模板是一种批处理流水线,会读取从 Cloud Storage 中存储的 Cloud Spanner 导出的 Avro 文件,并将其导入 Cloud Spanner 数据库。

对此流水线的要求

  • 目标 Cloud Spanner 数据库必须已存在且必须为空。
  • 您必须拥有 Cloud Storage 存储分区的读取权限以及目标 Cloud Spanner 数据库的写入权限。
  • Cloud Storage 输入路径必须存在,并且必须包含 spanner-export.json 文件,且该文件包含要导入的文件的 JSON 描述。

模板参数

参数 说明
instanceId Cloud Spanner 数据库的实例 ID。
databaseId Cloud Spanner 数据库的 ID。
inputDir 导入 Avro 文件的 Cloud Storage 路径。

运行 Cloud Storage Avro to Cloud Spanner 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。

    作业名称必须与以下格式匹配,作业才会显示在 Cloud Console 的 Spanner 实例页面中:

    cloud-spanner-import-SPANNER_INSTANCE_ID-SPANNER_DATABASE_NAME

    请替换以下内容:

    • SPANNER_INSTANCE_ID:Spanner 实例的 ID
    • SPANNER_DATABASE_NAME:Spanner 数据库的名称
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Avro Files on Cloud Storage to Cloud Spanner template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Avro_to_Cloud_Spanner
gcloud dataflow jobs run JOB_NAME \
    --gcs-location='gs://dataflow-templates/VERSION/GCS_Avro_to_Cloud_Spanner' \
    --region=DATAFLOW_REGION \
    --staging-location=GCS_STAGING_LOCATION \
    --parameters='instanceId=INSTANCE_ID,databaseId=DATABASE_ID,inputDir=GCS_DIRECTORY'

请替换以下内容:

  • (仅限 API)PROJECT_ID:您的项目 ID
  • DATAFLOW_REGION:您希望运行 Dataflow 作业的区域(例如 us-central1
  • JOB_NAME:您选择的作业名称
  • INSTANCE_ID:包含数据库的 Spanner 实例的 ID
  • DATABASE_ID:需要导入到的 Spanner 数据库的 ID
  • (仅限 gcloud)GCS_STAGING_LOCATION:用于写入临时文件的路径,例如 gs://mybucket/temp
  • GCS_DIRECTORY:导入 Avro 文件的 Cloud Storage 路径,例如 gs://mybucket/somefolder

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Avro_to_Cloud_Spanner

此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。请根据使用 REST API 中的说明使用以下示例请求。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/DATAFLOW_REGION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Avro_to_Cloud_Spanner
{
   "jobName": "JOB_NAME",
   "parameters": {
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "inputDir": "gs://GCS_DIRECTORY"
   },
   "environment": {
       "machineType": "n1-standard-2"
   }
}

请替换以下内容:

  • (仅限 API)PROJECT_ID:您的项目 ID
  • DATAFLOW_REGION:您希望运行 Dataflow 作业的区域(例如 us-central1
  • JOB_NAME:您选择的作业名称
  • INSTANCE_ID:包含数据库的 Spanner 实例的 ID
  • DATABASE_ID:需要导入到的 Spanner 数据库的 ID
  • (仅限 gcloud)GCS_STAGING_LOCATION:用于写入临时文件的路径,例如 gs://mybucket/temp
  • GCS_DIRECTORY:导入 Avro 文件的 Cloud Storage 路径,例如 gs://mybucket/somefolder

Cloud Storage Parquet to Bigtable

Cloudtable Parquet to Bigtable 模板是一种流水线,可从 Cloud Storage 存储分区中的 Parquet 文件读取数据并将数据写入 Bigtable 表。您可以使用该模板将数据从 Cloud Storage 复制到 Bigtable。

对此流水线的要求

  • Bigtable 表必须已存在,并且列族必须与 Parquet 文件中导出的列族相同。
  • 在运行此流水线之前,输入 Parquet 文件必须已存在于 Cloud Storage 存储分区中。
  • Bigtable 需要输入 Parquet 文件具有特定架构

模板参数

参数 说明
bigtableProjectId 您要将数据写入的 Bigtable 实例的 Google Cloud 项目 ID。
bigtableInstanceId 表所属的 Bigtable 实例的 ID。
bigtableTableId 要导入的 Bigtable 表的 ID。
inputFilePattern 数据所在的 Cloud Storage 路径模式,例如 gs://mybucket/somefolder/prefix*

运行 Cloud Storage Parquet file to Bigtable 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Parquet Files on Cloud Storage to Cloud Bigtable template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Parquet_to_Cloud_Bigtable
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/GCS_Parquet_to_Cloud_Bigtable \
    --parameters bigtableProjectId=PROJECT_ID,bigtableInstanceId=INSTANCE_ID,bigtableTableId=TABLE_ID,inputFilePattern=INPUT_FILE_PATTERN

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • PROJECT_ID:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:表所属的 Bigtable 实例的 ID。
  • TABLE_ID:需要导出的 Bigtable 表的 ID。
  • INPUT_FILE_PATTERN:数据所在的 Cloud Storage 路径模式,例如 gs://mybucket/somefolder/prefix*

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Parquet_to_Cloud_Bigtable

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。请根据使用 REST API 中的说明使用以下示例请求。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/GCS_Parquet_to_Cloud_Bigtable
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProjectId": "PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "inputFilePattern": "INPUT_FILE_PATTERN",
   },
   "environment": { "zone": "us-central1-f" }
}

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • PROJECT_ID:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:表所属的 Bigtable 实例的 ID。
  • TABLE_ID:需要导出的 Bigtable 表的 ID。
  • INPUT_FILE_PATTERN:数据所在的 Cloud Storage 路径模式,例如 gs://mybucket/somefolder/prefix*

Cloud Storage SequenceFile to Bigtable

Cloud Storage SequenceFile to Bigtable 模板是一种流水线,可从 Cloud Storage 存储分区中的 SequenceFile 读取数据并将数据写入 Bigtable 表。您可以使用该模板将数据从 Cloud Storage 复制到 Bigtable。

对此流水线的要求

  • Bigtable 表必须已存在。
  • 在运行此流水线之前,输入 SequenceFiles 文件必须已存在于 Cloud Storage 存储分区中。
  • 输入 SequenceFile 必须已从 Bigtable 或 HBase 中导出。

模板参数

参数 说明
bigtableProject 您要将数据写入的 Bigtable 实例的 Google Cloud 项目 ID。
bigtableInstanceId 表所属的 Bigtable 实例的 ID。
bigtableTableId 要导入的 Bigtable 表的 ID。
bigtableAppProfileId 要用于导入的 Bigtable 应用配置文件的 ID。如果您没有指定应用配置文件,则 Bigtable 将使用该实例的默认应用配置文件
sourcePattern 数据所在的 Cloud Storage 路径模式,例如 gs://mybucket/somefolder/prefix*

运行 Cloud Storage SequenceFile to Bigtable 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Parquet Files on Cloud Storage to Cloud Bigtable template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_SequenceFile_to_Cloud_Bigtable
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/GCS_SequenceFile_to_Cloud_Bigtable \
    --parameters bigtableProject=PROJECT_ID,bigtableInstanceId=INSTANCE_ID,bigtableTableId=TABLE_ID,bigtableAppProfileId=APPLICATION_PROFILE_ID,sourcePattern=SOURCE_PATTERN

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • PROJECT_ID:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:表所属的 Bigtable 实例的 ID。
  • TABLE_ID:需要导出的 Bigtable 表的 ID。
  • APPLICATION_PROFILE_ID:将用于导出的 Bigtable 应用配置文件的 ID。
  • SOURCE_PATTERN:数据所在的 Cloud Storage 路径模式,例如 gs://mybucket/somefolder/prefix*

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_SequenceFile_to_Cloud_Bigtable

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。请根据使用 REST API 中的说明使用以下示例请求。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/GCS_SequenceFile_to_Cloud_Bigtable
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProject": "PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "bigtableAppProfileId": "APPLICATION_PROFILE_ID",
       "sourcePattern": "SOURCE_PATTERN",
   },
   "environment": { "zone": "us-central1-f" }
}

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • PROJECT_ID:您要从中读取数据的 Bigtable 实例的 Google Cloud 项目的 ID。
  • INSTANCE_ID:表所属的 Bigtable 实例的 ID。
  • TABLE_ID:需要导出的 Bigtable 表的 ID。
  • APPLICATION_PROFILE_ID:将用于导出的 Bigtable 应用配置文件的 ID。
  • SOURCE_PATTERN:数据所在的 Cloud Storage 路径模式,例如 gs://mybucket/somefolder/prefix*

Cloud Storage Text to BigQuery

Cloud Storage Text to BigQuery 流水线是一种批处理流水线,可用于读取 Cloud Storage 中存储的文本文件,使用您提供的 JavaScript 用户定义函数 (UDF) 转换这些文件,然后将结果附加到 BigQuery 表。

对此流水线的要求

  • 创建一个用于描述 BigQuery 架构的 JSON 文件。

    确保有一个名为 BigQuery Schema 的顶级 JSON 数组,且该数组的内容遵循 {"name": "COLUMN_NAME", "type": "DATA_TYPE"} 格式。

    Cloud Storage Text to BigQuery 批处理模板不支持将数据导入目标 BigQuery 表中的 STRUCT(记录)字段。

    下面的 JSON 描述了一个 BigQuery 架构示例:

    {
      "BigQuery Schema": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING"
        },
        {
          "name": "coffee",
          "type": "STRING"
        }
      ]
    }
    
  • 使用 UDF 函数(该函数提供转换文本行的逻辑)创建一个 JavaScript (.js) 文件。您的函数必须返回一个 JSON 字符串。

    例如,以下函数将拆分 CSV 文件的每行文本,并通过转换值返回 JSON 字符串。

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

模板参数

参数 说明
javascriptTextTransformFunctionName 要从 .js 文件中调用的函数的名称。
JSONPath 用于定义 BigQuery 架构的 JSON 文件(存储在 Cloud Storage 中)的 gs:// 路径。例如 gs://path/to/my/schema.json
javascriptTextTransformGcsPath 用于定义 UDF 的 JavaScript 文件的 gs:// 路径。例如 gs://path/to/my/javascript_function.js
inputFilePattern Cloud Storage 中待处理文本的 gs:// 路径。例如 gs://path/to/my/text/data.txt
outputTable 要创建用以存储已处理数据的 BigQuery 表名称。如果您重复使用现有 BigQuery 表,则数据将被附加到目标表。例如 my-project-name:my-dataset.my-table
bigQueryLoadingTemporaryDirectory BigQuery 加载进程的临时目录。例如 gs://my-bucket/my-files/temp_dir

运行 Cloud Storage Text to BigQuery 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Text Files on Cloud Storage to BigQuery (Batch) template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Text_to_BigQuery
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/GCS_Text_to_BigQuery \
    --parameters \
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • JAVASCRIPT_FUNCTION:UDF 的名称
  • PATH_TO_BIGQUERY_SCHEMA_JSON:包含架构定义的 JSON 文件的 Cloud Storage 路径
  • PATH_TO_JAVASCRIPT_UDF_FILE:包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径
  • PATH_TO_TEXT_DATA:文本数据集的 Cloud Storage 路径
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • PATH_TO_TEMP_DIR_ON_GCS:临时目录的 Cloud Storage 路径

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Text_to_BigQuery

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   },
   "environment": { "zone": "us-central1-f" }
}

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • JAVASCRIPT_FUNCTION:UDF 的名称
  • PATH_TO_BIGQUERY_SCHEMA_JSON:包含架构定义的 JSON 文件的 Cloud Storage 路径
  • PATH_TO_JAVASCRIPT_UDF_FILE:包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径
  • PATH_TO_TEXT_DATA:文本数据集的 Cloud Storage 路径
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • PATH_TO_TEMP_DIR_ON_GCS:临时目录的 Cloud Storage 路径

Cloud Storage Text to Datastore

Cloud Storage Text to Datastore 模板是一种批处理流水线,可从存储在 Cloud Storage 中的文本文件读取数据,并将采用 JSON 编码的实体写入 Datastore。输入文本文件中的所有行都必须采用 指定的 JSON 格式

对此流水线的要求

  • 必须在目标项目中启用 Datastore。

模板参数

参数 说明
textReadPattern 指定文本数据文件位置的 Cloud Storage 路径模式。例如 gs://mybucket/somepath/*.json
javascriptTextTransformGcsPath 包含所有 JavaScript 代码的 Cloud Storage 路径模式。例如 gs://mybucket/mytransforms/*.js。如果您不想提供函数,请将此参数留空。
javascriptTextTransformFunctionName 要调用的 JavaScript 函数的名称。例如,如果您的 JavaScript 函数为 function myTransform(inJson) { ...dostuff...},则函数名称为 myTransform。如果您不想提供函数,请将此参数留空。
datastoreWriteProjectId 要向其写入 Datastore 实体的位置的 Google Cloud 项目 ID
errorWritePath 要用于写入在处理期间发生的故障的错误日志输出文件。例如 gs://bucket-name/errors.txt

运行 Cloud Storage Text to Datastore 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Text Files on Cloud Storage to Datastore template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Text_to_Datastore
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/GCS_Text_to_Datastore \
    --parameters \
textReadPattern=PATH_TO_INPUT_TEXT_FILES,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
datastoreWriteProjectId=PROJECT_ID,\
errorWritePath=ERROR_FILE_WRITE_PATH

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • PATH_TO_INPUT_TEXT_FILES:Cloud Storage 上的输入文件模式
  • JAVASCRIPT_FUNCTION:您的 JavaScript 函数名称
  • PATH_TO_JAVASCRIPT_UDF_FILE:包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径
  • ERROR_FILE_WRITE_PATH:Cloud Storage 上错误文件所需的路径

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Text_to_Datastore

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/GCS_Text_to_Datastore
{
   "jobName": "JOB_NAME",
   "parameters": {
       "textReadPattern": "PATH_TO_INPUT_TEXT_FILES",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "datastoreWriteProjectId": "PROJECT_ID",
       "errorWritePath": "ERROR_FILE_WRITE_PATH"
   },
   "environment": { "zone": "us-central1-f" }
}

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • PATH_TO_INPUT_TEXT_FILES:Cloud Storage 上的输入文件模式
  • JAVASCRIPT_FUNCTION:您的 JavaScript 函数名称
  • PATH_TO_JAVASCRIPT_UDF_FILE:包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径
  • ERROR_FILE_WRITE_PATH:Cloud Storage 上错误文件所需的路径

Cloud Storage Text to Pub/Sub (Batch)

此模板会创建一种批处理流水线,该流水线可从存储在 Cloud Storage 中的文本文件读取记录,并将其发布到 Pub/Sub 主题。使用此模板,您可以将采用换行符分隔的文件中的 JSON 记录或 CSV 文件中的记录发布到 Pub/Sub 主题,以实现实时处理。您可以使用此模板将数据重放到 Pub/Sub。

此模板不会在各个记录上设置任何时间戳。事件时间等于执行期间的发布时间。如果您的流水线依赖准确的事件时间来执行处理,建议不要使用此流水线。

对此流水线的要求

  • 需要读取的文件必须采用换行符分隔 JSON 或 CSV 格式。在源文件中占多行的记录可能会导致下游问题,因为文件中的每一行都将以消息形式发布到 Pub/Sub。
  • 在运行此流水线之前,Pub/Sub 主题必须已存在。

模板参数

参数 说明
inputFilePattern 要从中读取数据的输入文件模式。例如 gs://bucket-name/files/*.json
outputTopic 要向其写入数据的 Pub/Sub 输入主题。名称必须采用 projects/<project-id>/topics/<topic-name> 格式。

运行 Cloud Storage Text to Pub/Sub (Batch) 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Text Files on Cloud Storage to Pub/Sub (Batch) template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Text_to_Cloud_PubSub
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/GCS_Text_to_Cloud_PubSub \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/files/*.json,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME:Cloud Storage 存储分区的名称

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Text_to_Cloud_PubSub

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/files/*.json",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   },
   "environment": { "zone": "us-central1-f" }
}

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME:Cloud Storage 存储分区的名称

Cloud Storage Text to Cloud Spanner

Cloud Storage Text to Cloud Spanner 模板是一种批处理流水线,用于从 Cloud Storage 读取 CSV 文本文件并将其导入到 Cloud Spanner 数据库。

对此流水线的要求

  • 目标 Cloud Spanner 数据库和表必须已存在。
  • 您必须拥有 Cloud Storage 存储分区的读取权限以及目标 Cloud Spanner 数据库的写入权限。
  • 包含 CSV 文件的 Cloud Storage 输入路径必须存在。
  • 您必须创建包含 CSV 文件的 JSON 说明的导入清单文件,并且必须将该清单存储在 Cloud Storage 中。
  • 如果目标 Cloud Spanner 数据库已有架构,则清单文件中指定的任何列都必须与目标数据库架构中的相应列具有相同的数据类型。
  • 采用 ASCII 或 UTF-8 编码的清单文件必须符合以下格式:

  • 要导入的文本文件必须采用 ASCII 或 UTF-8 编码的 CSV 格式。我们建议您不要在 UTF-8 编码文件中使用字节顺序标记 (BOM)。
  • 数据必须与下面的一种类型相匹配:
    • INT64
    • FLOAT64
    • BOOL
    • STRING
    • DATE
    • TIMESTAMP

模板参数

参数 说明
instanceId Cloud Spanner 数据库的实例 ID。
databaseId Cloud Spanner 数据库的 ID。
importManifest Cloud Storage 中导入清单文件的路径。
columnDelimiter 源文件使用的列分隔符。默认值为 ,
fieldQualifier 应放置在包含 columnDelimiter 的源文件中任何值前后的字符。默认值为 "
trailingDelimiter 指定源文件中的行是否带有末尾分隔符(即,在每行末尾的最后一列值之后,是否会出现 columnDelimiter 字符)。默认值为 true
escape 源文件使用的转义字符。默认情况下,此参数未设置,并且模板不使用转义字符。
nullString 表示 NULL 值的字符串。默认情况下,此参数未设置,并且模板不使用 null 字符串。
dateFormat 用于解析日期列的格式。默认情况下,流水线会尝试将日期列解析为 yyyy-M-d[' 00:00:00'],例如,解析为 2019-01-31 或 2019-1-1 00:00:00。如果您的日期格式有所不同,请使用 java.time.format.DateTimeFormatter 模式指定格式。
timestampFormat 用于解析时间戳列的格式。如果时间戳为长整数,则会解析为 Unix 时间。否则,时间戳会解析为 java.time.format.DateTimeFormatter.ISO_INSTANT 格式的字符串。对于其他情况,请指定您自己的模式字符串,例如,您可以对 "Jan 21 1998 01:02:03.456+08:00". 格式的时间戳使用 MMM dd yyyy HH:mm:ss.SSSVV

如果您需要使用自定义日期或时间戳格式,请确保这些格式是有效的 java.time.format.DateTimeFormatter 模式。下表显示了日期和时间戳列的自定义格式的其他示例:

类型 输入值 格式 备注
DATE 2011-3-31 默认情况下,模板可以解析此格式。您无需指定 dateFormat 参数。
DATE 2011-3-31 00:00:00 默认情况下,模板可以解析此格式。您无需指定格式。如果需要,您可以使用 yyyy-M-d' 00:00:00'
DATE 01 Apr, 18 dd MMM, yy
DATE Wednesday, April 3, 2019 AD EEEE, LLLL d, yyyy G
TIMESTAMP 2019-01-02T11:22:33Z
2019-01-02T11:22:33.123Z
2019-01-02T11:22:33.12356789Z
默认格式 ISO_INSTANT 可以解析此类型的时间戳。您无需提供 timestampFormat 参数。
TIMESTAMP 1568402363 默认情况下,模板可以解析此类型的时间戳并将其视为 Unix 时间。
TIMESTAMP Tue, 3 Jun 2008 11:05:30 GMT EEE, d MMM yyyy HH:mm:ss VV
TIMESTAMP 2018/12/31 110530.123PST yyyy/MM/dd HHmmss.SSSz
TIMESTAMP 2019-01-02T11:22:33Z 或 2019-01-02T11:22:33.123Z yyyy-MM-dd'T'HH:mm:ss[.SSS]VV 如果输入列为 2019-01-02T11:22:33Z 和 2019-01-02T11:22:33.123Z 的混合格式,则默认格式可以解析此类型的时间戳。您无需提供自己的格式参数。您可以使用 yyyy-MM-dd'T'HH:mm:ss[.SSS]VV 来处理这两种情况。您不能使用 yyyy-MM-dd'T'HH:mm:ss[.SSS]'Z',因为后缀“Z”必须解析为时区 ID(而不是字符字面量)。在内部,时间戳列会转换为 java.time.Instant。因此,您必须以 UTC 指定时间戳列,或者将时区信息与其相关联。本地日期时间(例如 2019-01-02 11:22:33)无法解析为有效的 java.time.Instant

在 Cloud Storage to Cloud Spanner 模板上运行文本文件

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Text Files on Cloud Storage to Cloud Spanner template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

使用 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Text_to_Cloud_Spanner
gcloud dataflow jobs run JOB_NAME \
    --gcs-location='gs://dataflow-templates/VERSION/GCS_Text_to_Cloud_Spanner' \
    --region=DATAFLOW_REGION \
    --parameters='instanceId=INSTANCE_ID,databaseId=DATABASE_ID,importManifest=GCS_PATH_TO_IMPORT_MANIFEST'

请替换以下内容:

  • DATAFLOW_REGION:您希望运行 Dataflow 作业的区域(例如 us-central1
  • INSTANCE_ID:您的 Cloud Spanner 实例 ID
  • DATABASE_ID:您的 Cloud Spanner 数据库 ID
  • GCS_PATH_TO_IMPORT_MANIFEST:导入清单文件的 Cloud Storage 路径
  • JOB_NAME:您选择的作业名称

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Text_to_Cloud_Spanner

此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。请根据使用 REST API 中的说明使用以下示例请求。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/DATAFLOW_REGION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/GCS_Text_to_Cloud_Spanner

{
   "jobName": "JOB_NAME",
   "parameters": {
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "importManifest": "GCS_PATH_TO_IMPORT_MANIFEST"
   },
   "environment": {
       "machineType": "n1-standard-2"
   }
}

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • DATAFLOW_REGION:您希望运行 Dataflow 作业的区域(例如 us-central1
  • INSTANCE_ID:您的 Cloud Spanner 实例 ID
  • DATABASE_ID:您的 Cloud Spanner 数据库 ID
  • GCS_PATH_TO_IMPORT_MANIFEST:导入清单文件的 Cloud Storage 路径
  • JOB_NAME:您选择的作业名称

Java Database Connectivity (JDBC) to BigQuery

JDBC to BigQuery 模板是一种批处理流水线,可将数据从关系数据库表中复制到现有的 BigQuery 表中。此流水线使用 JDBC 连接到关系数据库。您可以使用此模板将数据从任何具有可用 JDBC 驱动程序的关系数据库复制到 BigQuery 中。为了增加一项保护措施,您还可以在传入使用 Cloud KMS 密钥加密的 Base64 编码用户名、密码和连接字符串参数的同时,传入该 Cloud KMS 密钥。如需详细了解如何对用户名、密码和连接字符串参数进行加密,请参阅 Cloud KMS API 加密端点

对此流水线的要求

  • 关系数据库的 JDBC 驱动程序必须可用。
  • 在运行此流水线之前,BigQuery 表必须已存在。
  • BigQuery 表必须具有兼容的架构。
  • 必须能够从 Dataflow 运行的子网访问关系数据库。

模板参数

参数 说明
driverJars 以逗号分隔的驱动程序 JAR 文件列表。例如 gs://<my-bucket>/driver_jar1.jar,gs://<my-bucket>/driver_jar2.jar
driverClassName JDBC 驱动程序类名称。例如 com.mysql.jdbc.Driver
connectionURL JDBC 连接网址字符串。例如 jdbc:mysql://some-host:3306/sampledb。可作为 Base64 编码,然后使用 Cloud KMS 密钥加密的字符串传入。
query 要在提取数据的源上运行的查询。例如 select * from sampledb.sample_table
outputTable BigQuery 输出表位置,采用 <my-project>:<my-dataset>.<my-table> 格式。
bigQueryLoadingTemporaryDirectory BigQuery 加载进程的临时目录。例如 gs://<my-bucket>/my-files/temp_dir
connectionProperties (可选)用于 JDBC 连接的属性字符串。例如 unicode=true&characterEncoding=UTF-8
username (可选)用于 JDBC 连接的用户名。该参数可以作为使用 Cloud KMS 密钥加密的 Base64 编码字符串传入。
password (可选)用于 JDBC 连接的密码。该参数可以作为使用 Cloud KMS 密钥加密的 Base64 编码字符串传入。
KMSEncryptionKey (可选)用于对用户名、密码和连接字符串进行解密的 Cloud KMS 加密密钥。如果传入了 Cloud KMS 密钥,则用户名、密码和连接字符串都必须以加密方式进行传递。

运行 JDBC to BigQuery 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the JDBC to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Jdbc_to_BigQuery
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Jdbc_to_BigQuery \
    --parameters \
driverJars=DRIVER_PATHS,\
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
query=SOURCE_SQL_QUERY,\
outputTable=PROJECT_ID:DATASET.TABLE_NAME,
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\
connectionProperties=CONNECTION_PROPERTIES,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • DRIVER_PATHS:JDBC 驱动程序以英文逗号分隔的 Cloud Storage 路径
  • DRIVER_CLASS_NAME:驱动器类名称
  • JDBC_CONNECTION_URL:JDBC 连接网址
  • SOURCE_SQL_QUERY:需要在源数据库上运行的 SQL 查询
  • DATASET:您的 BigQuery 数据集,并替换 TABLE_NAME:您的 BigQuery 表名称
  • PATH_TO_TEMP_DIR_ON_GCS:临时目录的 Cloud Storage 路径
  • CONNECTION_PROPERTIES:JDBC 连接属性(如有需要)
  • CONNECTION_USERNAME:JDBC 连接用户名
  • CONNECTION_PASSWORD:JDBC 连接密码
  • KMS_ENCRYPTION_KEY:Cloud KMS 加密密钥

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Jdbc_to_BigQuery

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Jdbc_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "driverJars": "DRIVER_PATHS",
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "JDBC_CONNECTION_URL",
       "query": "SOURCE_SQL_QUERY",
       "outputTable": "PROJECT_ID:DATASET.TABLE_NAME",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
       "connectionProperties": "CONNECTION_PROPERTIES",
       "username": "CONNECTION_USERNAME",
       "password": "CONNECTION_PASSWORD",
       "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
   },
   "environment": { "zone": "us-central1-f" }
}

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • DRIVER_PATHS:JDBC 驱动程序以英文逗号分隔的 Cloud Storage 路径
  • DRIVER_CLASS_NAME:驱动器类名称
  • JDBC_CONNECTION_URL:JDBC 连接网址
  • SOURCE_SQL_QUERY:需要在源数据库上运行的 SQL 查询
  • DATASET:您的 BigQuery 数据集,并替换 TABLE_NAME:您的 BigQuery 表名称
  • PATH_TO_TEMP_DIR_ON_GCS:临时目录的 Cloud Storage 路径
  • CONNECTION_PROPERTIES:JDBC 连接属性(如有需要)
  • CONNECTION_USERNAME:JDBC 连接用户名
  • CONNECTION_PASSWORD:JDBC 连接密码
  • KMS_ENCRYPTION_KEY:Cloud KMS 加密密钥

Apache Cassandra to Cloud Bigtable

“从 Apache Cassandra 到 Cloud Bigtable”模板会将一个表从 Apache Cassandra 复制到 Cloud Bigtable。此模板需要最低限度的配置,可在 Cloud Bigtable 中尽可能准确地复制 Cassandra 中的表结构。

“从 Apache Cassandra 到 Cloud Bigtable”模板适用于以下情况:

  • 在可以接受短时间停机的情况下,迁移 Apache Cassandra 数据库。
  • 定期将 Cassandra 表复制到 Cloud Bigtable 以便向全球用户传送数据。

对此流水线的要求

  • 在运行此流水线之前,目标 Bigtable 表必须已存在。
  • Dataflow 工作器与 Apache Cassandra 节点之间建立了网络连接。

类型转换

“从 Apache Cassandra 到 Cloud Bigtable”模板会自动将 Apache Cassandra 数据类型转换为 Cloud Bigtable 的数据类型。

大多数原语在 Cloud Bigtable 和 Apache Cassandra 中的表示方式相同;但以下原语的表示方式有所不同。

  • DateTimestamp 会转化为 DateTime 对象
  • UUID 被转换为 String
  • Varint 被转换为 BigDecimal

Apache Cassandra 还原生支持更复杂的类型,例如 TupleListSetMap。此流水线不支持元组,因为 Apache Beam 中没有相应的类型。

例如,在 Apache Cassandra 中,您有一个 List 类型的列,名为“mylist”,还有一些类似下表内容的值。

row mylist
1 (a,b,c)

流水线会直接将此 List 列扩展为三个不同的列(在 Cloud Bigtable 中称为列限定符)。列名称为“mylist”,但流水线还会附加上列表中各项的索引编号,例如“mylist[0]”。

row mylist[0] mylist[1] mylist[2]
1 a b c

流水线按照处理列表的方式来处理集合,但添加了后缀来指明单元是键还是值。

row mymap
1 {"first_key":"first_value","another_key":"different_value"}

转换后,表如下所示:

row mymap[0].key mymap[0].value mymap[1].key mymap[1].value
1 first_key first_value another_key different_value

主键转换

在 Apache Cassandra 中,主键是使用数据定义语言定义的。主键可以是简单的、复合的,或者是与聚簇列复合的。Cloud Bigtable 支持手动行键构造,按字典顺序对字节数组进行排序。此流水线会收集有关键类型的信息,并遵循基于多个值构建行键的最佳做法构建键。

模板参数

参数 说明
cassandraHosts 以英文逗号分隔的列表中的 Apache Cassandra 节点的主机。
cassandraPort (可选)节点上用于访问 Apache Cassandra 的 TCP 端口(默认为 9042)。
cassandraKeyspace 表格所在的 Apache Cassandra 键空间。
cassandraTable 要复制的 Apache Cassandra 表格。
bigtableProjectId 从中复制 Apache Cassandra 表的 Bigtable 实例的 Google Cloud 项目 ID。
bigtableInstanceId 要复制 Apache Cassandra 表格的 Bigtable 实例 ID。
bigtableTableId 要复制 Apache Cassandra 表格的 Bigtable 表格的名称。
defaultColumnFamily (可选)Bigtable 表格的列族名称(默认为 default)。
rowKeySeparator (可选)用于构建行键的分隔符(默认为 #)。

运行 Apache Cassandra to Cloud Bigtable 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cassandra to Cloud Bigtable template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cassandra_To_Cloud_Bigtable
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cassandra_To_Cloud_Bigtable \
    --parameters\
bigtableProjectId=PROJECT_ID,\
bigtableInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableTableId=BIGTABLE_TABLE_ID,\
cassandraHosts=CASSANDRA_HOSTS,\
cassandraKeyspace=CASSANDRA_KEYSPACE,\
cassandraTable=CASSANDRA_TABLE

请替换以下内容:

  • PROJECT_ID:Cloud Bigtable 所在的项目 ID
  • JOB_NAME:您选择的作业名称
  • BIGTABLE_INSTANCE_ID:Cloud Bigtable 实例 ID
  • BIGTABLE_TABLE_ID:您的 Cloud Bigtable 表名称
  • CASSANDRA_HOSTS:Apache Cassandra 主机列表;如果提供了多个主机,请按照此说明了解如何转义逗号
  • CASSANDRA_KEYSPACE:表格所在的 Apache Cassandra 键空间
  • CASSANDRA_TABLE:需要迁移的 Apache Cassandra 表

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cassandra_To_Cloud_Bigtable

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Cassandra_To_Cloud_Bigtable
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProjectId": "PROJECT_ID",
       "bigtableInstanceId": "BIGTABLE_INSTANCE_ID",
       "bigtableTableId": "BIGTABLE_TABLE_ID",
       "cassandraHosts": "CASSANDRA_HOSTS",
       "cassandraKeyspace": "CASSANDRA_KEYSPACE",
       "cassandraTable": "CASSANDRA_TABLE"
   },
   "environment": { "zone": "us-central1-f" }
}

请替换以下内容:

  • PROJECT_ID:Cloud Bigtable 所在的项目 ID
  • JOB_NAME:您选择的作业名称
  • BIGTABLE_INSTANCE_ID:Cloud Bigtable 实例 ID
  • BIGTABLE_TABLE_ID:您的 Cloud Bigtable 表名称
  • CASSANDRA_HOSTS:Apache Cassandra 主机列表;如果提供了多个主机,请按照此说明了解如何转义逗号
  • CASSANDRA_KEYSPACE:表格所在的 Apache Cassandra 键空间
  • CASSANDRA_TABLE:需要迁移的 Apache Cassandra 表

Apache Hive to BigQuery

Apache Hive to BigQuery 模板是一种批处理流水线,可从 Apache Hive 表中读取该表,然后将其写入 BigQuery 表中。

对此流水线的要求

  • 在运行此流水线之前,目标 BigQuery 表必须已存在。
  • Dataflow 工作器与 Apache Hive 节点之间必须存在网络连接。
  • Dataflow 工作器与 Apache Thrift 服务器节点之间必须存在网络连接。
  • 在运行此流水线之前,BigQuery 数据集必须已存在。

模板参数

参数 说明
metastoreUri Apache Thrift 服务器 URI,例如 thrift://thrift-server-host:port
hiveDatabaseName 包含要导出的表的 Apache Hive 数据库名称。
hiveTableName 您要导出的 Apache Hive 表名称。
outputTableSpec BigQuery 输出表位置,格式为 <my-project>:<my-dataset>.<my-table>
hivePartitionCols (可选)Apache Hive 分区列的英文逗号分隔列表。
filterString (可选)输入 Apache Hive 表的过滤条件字符串。
partitionType (可选)BigQuery 中的分区类型。目前只支持 Time
partitionCol (可选)输出 BigQuery 表中的分区列名称。

运行 Apache Hive to BigQuery 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Hive to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Hive_To_BigQuery
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Hive_To_BigQuery \
    --parameters\
metastoreUri=METASTORE_URI,\
hiveDatabaseName=HIVE_DATABASE_NAME,\
hiveTableName=HIVE_TABLE_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
hivePartitionCols=HIVE_PARTITION_COLS,\
filterString=FILTER_STRING,\
partitionType=PARTITION_TYPE,\
partitionCol=PARTITION_COL

请替换以下内容:

  • PROJECT_ID:BigQuery 所在的项目 ID
  • JOB_NAME:您选择的作业名称
  • DATASET:您的 BigQuery 数据集
  • TABLE_NAME:您的 BigQuery 表名称
  • METASTORE_URI:Apache Thrift 服务器 URI
  • HIVE_DATABASE_NAME:包含您要导出的表的 Apache Hive 数据库名称
  • HIVE_TABLE_NAME:您要导出的 Apache Hive 表名称
  • HIVE_PARTITION_COLS:以英文逗号分隔的 Apache Hive 分区列列表
  • FILTER_STRING:Apache Hive 输入表的过滤条件字符串
  • PARTITION_TYPE:BigQuery 中的分区类型
  • PARTITION_COL:BigQuery 分区列的名称

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Hive_To_BigQuery

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Hive_To_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "metastoreUri": "METASTORE_URI",
       "hiveDatabaseName": "HIVE_DATABASE_NAME",
       "hiveTableName": "HIVE_TABLE_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME",
       "hivePartitionCols": "HIVE_PARTITION_COLS",
       "filterString": "FILTER_STRING",
       "partitionType": "PARTITION_TYPE",
       "partitionCol": "PARTITION_COL"
   },
   "environment": { "zone": "us-central1-f" }
}

请替换以下内容:

  • PROJECT_ID:BigQuery 所在的项目 ID
  • JOB_NAME:您选择的作业名称
  • DATASET:您的 BigQuery 数据集
  • TABLE_NAME:您的 BigQuery 表名称
  • METASTORE_URI:Apache Thrift 服务器 URI
  • HIVE_DATABASE_NAME:包含您要导出的表的 Apache Hive 数据库名称
  • HIVE_TABLE_NAME:您要导出的 Apache Hive 表名称
  • HIVE_PARTITION_COLS:以英文逗号分隔的 Apache Hive 分区列列表
  • FILTER_STRING:Apache Hive 输入表的过滤条件字符串
  • PARTITION_TYPE:BigQuery 中的分区类型
  • PARTITION_COL:BigQuery 分区列的名称