Google 提供的批处理模板

Google 提供了一组开源 Dataflow 模板。有关模板的一般信息,请参阅概览页面。如需查看 Google 提供的所有模板的列表,请参阅开始使用 Google 提供的模板页面。

本页面介绍了批处理模板:

BigQuery to Cloud Storage TFRecords

BigQuery to Cloud Storage TFRecords 模板是一种流水线,可从 BigQuery 查询读取数据并以 TFRecord 格式将其写入 Cloud Storage 存储分区。您可以指定训练、测试和验证拆分百分比。默认情况下,训练集的拆分比例为 1 或 100%,测试和验证集的拆分比例为 0 或 0%。请注意,设置数据集拆分比例时,训练、测试和验证之和加起来必须为 1 或 100%(例如,0.6 + 0.2 + 0.2)。Dataflow 会自动确定每个输出数据集的最佳分片数。

对此流水线的要求

  • BigQuery 数据集和表必须已存在。
  • 在执行此流水线之前,输出 Cloud Storage 存储分区必须已存在。请注意,训练、测试和验证子目录不需要预先存在,将会自动生成。

模板参数

参数 说明
readQuery 用于从来源中提取数据的 BigQuery SQL 查询。例如 select * from dataset1.sample_table
outputDirectory 在其中写入训练、测试和验证 TFRecord 文件的顶级 Cloud Storage 路径前缀。例如 gs://mybucket/output。生成的训练、测试和验证 TFRecord 文件的子目录将根据 outputDirectory 自动生成。例如 gs://mybucket/output/train
trainingPercentage [可选] 分配给训练 TFRecord 文件的查询数据所占的百分比。默认值为 1 或 100%。
testingPercentage [可选] 分配给测试 TFRecord 文件的查询数据所占的百分比。默认值为 0 或 0%。
validationPercentage [可选] 分配给验证 TFRecord 文件的查询数据所占的百分比。默认值为 0 或 0%。
outputSuffix [可选] 写入的训练、测试和验证 TFRecord 文件的文件后缀。默认值为 .tfrecord

执行 BigQuery to Cloud Storage TFRecord files 模板

CONSOLE

通过 Google Cloud Console 执行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具执行

注意:如需使用 gcloud 命令行工具运行模板,您必须具备 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_BigQuery_to_GCS_TensorFlow_Records

如需发出 HTTP 请求,请替换以下值:

  • [YOUR_PROJECT_ID] 替换为您的项目 ID。
  • [JOB_NAME] 替换为您选择的作业名称。
  • [READ_QUERY] 替换为要执行的 BigQuery 查询。
  • [OUTPUT_DIRECTORY] 替换为输出数据集的 Cloud Storage 路径前缀。
  • [TRAINING_PERCENTAGE] 替换为训练数据集的拆分小数百分比。
  • [TESTING_PERCENTAGE] 替换为测试数据集的拆分小数百分比。
  • [VALIDATION_PERCENTAGE] 替换为验证数据集的拆分小数百分比。
  • [OUTPUT_SUFFIX] 替换为首选输出 TensorFlow 记录文件后缀。
gcloud dataflow jobs run [JOB_NAME] \
    --gcs-location gs://dataflow-templates/latest/Cloud_BigQuery_to_GCS_TensorFlow_Records \
    --parameters readQuery=[READ_QUERY],outputDirectory=[OUTPUT_DIRECTORY],trainingPercentage=[TRAINING_PERCENTAGE],testingPercentage=[TESTING_PERCENTAGE],validationPercentage=[VALIDATION_PERCENTAGE],outputSuffix=[FILENAME_SUFFIX]

API

通过 REST API 执行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_BigQuery_to_GCS_TensorFlow_Records

如需使用 REST API 运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要获得授权

如需发出 HTTP 请求,请替换以下值:

  • [YOUR_PROJECT_ID] 替换为您的项目 ID。
  • [JOB_NAME] 替换为您选择的作业名称。
  • [READ_QUERY] 替换为要执行的 BigQuery 查询。
  • [OUTPUT_DIRECTORY] 替换为输出数据集的 Cloud Storage 路径前缀。
  • [TRAINING_PERCENTAGE] 替换为训练数据集的拆分小数百分比。
  • [TESTING_PERCENTAGE] 替换为测试数据集的拆分小数百分比。
  • [VALIDATION_PERCENTAGE] 替换为验证数据集的拆分小数百分比。
  • [OUTPUT_SUFFIX] 替换为首选输出 TensorFlow 记录文件后缀。
POST https://dataflow.googleapis.com/v1b3/projects/[YOUR_PROJECT_ID]/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_BigQuery_to_GCS_TensorFlow_Records
{
   "jobName": "[JOB_NAME]",
   "parameters": {
       "readQuery":"[READ_QUERY]",
       "outputDirectory":"[OUTPUT_DIRECTORY]",
       "trainingPercentage":"[TRAINING_PERCENTAGE]",
       "testingPercentage":"[TESTING_PERCENTAGE]",
       "validationPercentage":"[VALIDATION_PERCENTAGE]",
       "outputSuffix":"[FILENAME_SUFFIX]"
   },
   "environment": { "zone": "us-central1-f" }
}

BigQuery to Cloud Storage Parquet (Storage API)

BigQuery to Parquet 模板是一种批处理流水线,可从 BigQuery 表读取数据并以 Parquet 格式将其写入 Cloud Storage 存储分区。此模板利用 BigQuery Storage API 导出数据。

对此流水线的要求

  • 在运行此流水线之前,输入 BigQuery 表必须已存在。
  • 在运行流水线之前,Cloud Storage 输出存储分区必须已存在。

模板参数

参数 说明
tableRef BigQuery 输入表位置。例如 <my-project>:<my-dataset>.<my-table>
bucket 要在其中写入 Parquet 文件的 Cloud Storage 文件夹。例如 gs://mybucket/exports
numShards [可选] 输出文件分片数。默认值 1。
fields [可选] 要从输入 BigQuery 表格选择的以英文逗号分隔的字段列表。

运行 BigQuery to Cloud Storage Parquet 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the BigQuery to Parquet template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行 Flex 模板,必须具有 Cloud SDK 284.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/flex/BigQuery_To_Parquet

您必须在此示例中替换以下值:

  • YOUR_PROJECT_ID 替换为您的项目 ID。
  • JOB_NAME 替换为您选择的作业名称。
  • BIGQUERY_TABLE 替换为 BigQuery 表名。
  • OUTPUT_DIRECTORY 替换为输出文件的 Cloud Storage 文件夹。
  • NUM_SHARDS 替换为所需的输出文件分片数。
  • FIELDS 替换为要从输入的 BigQuery 表格选择的以英文逗号分隔的字段列表。
  • LOCATION 替换为执行地区。例如 us-central1
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID} \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/BigQuery_To_Parquet \
    --parameters \
tableRef=BIGQUERY_TABLE,\
bucket=OUTPUT_DIRECTORY,\
numShards=NUM_SHARDS,\
fields=FIELDS

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/flex/BigQuery_To_Parquet

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要获得授权

您必须在此示例中替换以下值:

  • YOUR_PROJECT_ID 替换为您的项目 ID。
  • JOB_NAME 替换为您选择的作业名称。
  • BIGQUERY_TABLE 替换为 BigQuery 表名。
  • OUTPUT_DIRECTORY 替换为输出文件的 Cloud Storage 文件夹。
  • NUM_SHARDS 替换为所需的输出文件分片数。
  • FIELDS 替换为要从输入的 BigQuery 表格选择的以英文逗号分隔的字段列表。
  • LOCATION 替换为执行地区。例如 us-central1
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "tableRef": "BIGQUERY_TABLE",
          "bucket": "OUTPUT_DIRECTORY",
          "numShards": "NUM_SHARDS",
          "fields": "FIELDS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/BigQuery_To_Parquet",
   }
}

Cloud Bigtable to Cloud Storage Avro

Cloud Bigtable to Cloud Storage Avro 模板是一种流水线,可从 Cloud Bigtable 表中读取数据,然后将其以 Avro 格式写入 Cloud Storage 存储分区中。您可以使用此模板将数据从 Cloud Bigtable 转移到 Cloud Storage。

对此流水线的要求

  • Cloud Bigtable 表必须存在。
  • 在运行流水线之前,Cloud Storage 输出存储分区必须已存在。

模板参数

参数 说明
bigtableProjectId 您要从中读取数据的 Cloud Bigtable 实例的 Google Cloud 项目 ID。
bigtableInstanceId 包含表的 Cloud Bigtable 实例的 ID。
bigtableTableId 需要导出的 Cloud Bigtable 表的 ID。
outputDirectory 数据写入位置的 Cloud Storage 路径。例如 gs://mybucket/somefolder
filenamePrefix Avro 文件名的前缀。例如 output-

运行 Cloud Bigtable to Cloud Storage Avro file 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Cloud Bigtable to Avro template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_Avro

请根据使用 REST API 中的说明使用以下示例请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。您必须在此示例中替换以下值:

  • [YOUR_PROJECT_ID] 替换为您的项目 ID。
  • [JOB_NAME] 替换为您选择的作业名称。
  • [PROJECT_ID] 替换为您要从中读取数据的 Cloud Bigtable 实例的 Google Cloud 项目 ID。
  • [INSTANCE_ID] 替换为包含表的 Cloud Bigtable 实例的 ID。
  • [TABLE_ID] 替换为要导出的 Cloud Bigtable 表的 ID。
  • [OUTPUT_DIRECTORY] 替换为数据写入位置的 Cloud Storage 路径。例如 gs://mybucket/somefolder
  • [FILENAME_PREFIX] 替换为 Avro 文件名的前缀。例如 output-
gcloud dataflow jobs run [JOB_NAME] \
    --gcs-location gs://dataflow-templates/latest/Cloud_Bigtable_to_GCS_Avro \
    --parameters bigtableProjectId=[PROJECT_ID],bigtableInstanceId=[INSTANCE_ID],bigtableTableId=[TABLE_ID],outputDirectory=[OUTPUT_DIRECTORY],filenamePrefix=[FILENAME_PREFIX]

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_Avro

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请根据使用 REST API 中的说明使用以下示例请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。您必须在此示例中替换以下值:

  • [YOUR_PROJECT_ID] 替换为您的项目 ID。
  • [JOB_NAME] 替换为您选择的作业名称。
  • [PROJECT_ID] 替换为您要从中读取数据的 Cloud Bigtable 实例的 Google Cloud 项目 ID。
  • [INSTANCE_ID] 替换为包含表的 Cloud Bigtable 实例的 ID。
  • [TABLE_ID] 替换为要导出的 Cloud Bigtable 表的 ID。
  • [OUTPUT_DIRECTORY] 替换为数据写入位置的 Cloud Storage 路径。例如 gs://mybucket/somefolder
  • [FILENAME_PREFIX] 替换为 Avro 文件名的前缀。例如 output-
POST https://dataflow.googleapis.com/v1b3/projects/[YOUR_PROJECT_ID]/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_Bigtable_to_GCS_Avro
{
   "jobName": "[JOB_NAME]",
   "parameters": {
       "bigtableProjectId": "[PROJECT_ID]",
       "bigtableInstanceId": "[INSTANCE_ID]",
       "bigtableTableId": "[TABLE_ID]",
       "outputDirectory": "[OUTPUT_DIRECTORY]",
       "filenamePrefix": "[FILENAME_PREFIX]",
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Bigtable to Cloud Storage Parquet

Cloud Bigtable to Cloud Storage Parquet 模板是一种流水线,可从 Cloud Bigtable 表中读取数据,然后将其以 Parquet 格式写入 Cloud Storage 存储分区中。您可以使用此模板将数据从 Cloud Bigtable 转移到 Cloud Storage。

对此流水线的要求

  • Cloud Bigtable 表必须存在。
  • 在运行流水线之前,Cloud Storage 输出存储分区必须已存在。

模板参数

参数 说明
bigtableProjectId 您要从中读取数据的 Cloud Bigtable 实例的 Google Cloud 项目 ID。
bigtableInstanceId 包含表的 Cloud Bigtable 实例的 ID。
bigtableTableId 需要导出的 Cloud Bigtable 表的 ID。
outputDirectory 数据写入位置的 Cloud Storage 路径。例如 gs://mybucket/somefolder
filenamePrefix Parquet 文件名的前缀。例如 output-
numShards 要输出的 Parquet 文件数。例如 2

运行 Cloud Bigtable to Cloud Storage Parquet file 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Cloud Bigtable to Parquet template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_Parquet

请根据使用 REST API 中的说明使用以下示例请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。您必须在此示例中替换以下值:

  • [YOUR_PROJECT_ID] 替换为您的项目 ID。
  • [JOB_NAME] 替换为您选择的作业名称。
  • [PROJECT_ID] 替换为您要从中读取数据的 Cloud Bigtable 实例的 Google Cloud 项目 ID。
  • [INSTANCE_ID] 替换为包含表的 Cloud Bigtable 实例的 ID。
  • [TABLE_ID] 替换为要导出的 Cloud Bigtable 表的 ID。
  • [OUTPUT_DIRECTORY] 替换为数据写入位置的 Cloud Storage 路径。例如 gs://mybucket/somefolder
  • [FILENAME_PREFIX] 替换为 Parquet 文件名的前缀。例如 output-
  • [NUM_SHARDS] 替换为要输出的 Parquet 文件数。例如 1
gcloud dataflow jobs run [JOB_NAME] \
    --gcs-location gs://dataflow-templates/latest/Cloud_Bigtable_to_GCS_Parquet \
    --parameters bigtableProjectId=[PROJECT_ID],bigtableInstanceId=[INSTANCE_ID],bigtableTableId=[TABLE_ID],outputDirectory=[OUTPUT_DIRECTORY],filenamePrefix=[FILENAME_PREFIX],numShards=[NUM_SHARDS]

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_Parquet

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请根据使用 REST API 中的说明使用以下示例请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。您必须在此示例中替换以下值:

  • [YOUR_PROJECT_ID] 替换为您的项目 ID。
  • [JOB_NAME] 替换为您选择的作业名称。
  • [PROJECT_ID] 替换为您要从中读取数据的 Cloud Bigtable 实例的 Google Cloud 项目 ID。
  • [INSTANCE_ID] 替换为包含表的 Cloud Bigtable 实例的 ID。
  • [TABLE_ID] 替换为要导出的 Cloud Bigtable 表的 ID。
  • [OUTPUT_DIRECTORY] 替换为数据写入位置的 Cloud Storage 路径。例如 gs://mybucket/somefolder
  • [FILENAME_PREFIX] 替换为 Parquet 文件名的前缀。例如 output-
  • [NUM_SHARDS] 替换为要输出的 Parquet 文件数。例如 1
POST https://dataflow.googleapis.com/v1b3/projects/[YOUR_PROJECT_ID]/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_Bigtable_to_GCS_Parquet
{
   "jobName": "[JOB_NAME]",
   "parameters": {
       "bigtableProjectId": "[PROJECT_ID]",
       "bigtableInstanceId": "[INSTANCE_ID]",
       "bigtableTableId": "[TABLE_ID]",
       "outputDirectory": "[OUTPUT_DIRECTORY]",
       "filenamePrefix": "[FILENAME_PREFIX]",
       "numShards": "[NUM_SHARDS]"
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Bigtable to Cloud Storage SequenceFile

Cloud Bigtable to Cloud Storage SequenceFile 模板是一种流水线,可从 Cloud Bigtable 表中读取数据,然后将其以 SequenceFile 格式写入 Cloud Storage 存储分区中。您可以使用此模板将数据从 Cloud Bigtable 复制到 Cloud Storage。

对此流水线的要求

  • Cloud Bigtable 表必须存在。
  • 在运行流水线之前,Cloud Storage 输出存储分区必须已存在。

模板参数

参数 说明
bigtableProject 您要从中读取数据的 Cloud Bigtable 实例的 Google Cloud 项目 ID。
bigtableInstanceId 包含表的 Cloud Bigtable 实例的 ID。
bigtableTableId 需要导出的 Cloud Bigtable 表的 ID。
bigtableAppProfileId 将用于导出的 Cloud Bigtable 应用配置文件的 ID。如果您没有指定应用配置文件,则 Cloud Bigtable 将使用该实例的默认应用配置文件
destinationPath 数据写入位置的 Cloud Storage 路径。例如 gs://mybucket/somefolder
filenamePrefix SequenceFile 文件名的前缀。例如 output-

运行 Cloud Bigtable to Cloud Storage SequenceFile 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Cloud Bigtable to SequenceFile template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_SequenceFile

请根据使用 REST API 中的说明使用以下示例请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。您必须在此示例中替换以下值:

  • [YOUR_PROJECT_ID] 替换为您的项目 ID。
  • [JOB_NAME] 替换为您选择的作业名称。
  • [PROJECT_ID] 替换为您要从中读取数据的 Cloud Bigtable 实例的 Google Cloud 项目 ID。
  • [INSTANCE_ID] 替换为包含表的 Cloud Bigtable 实例的 ID。
  • [TABLE_ID] 替换为要导出的 Cloud Bigtable 表的 ID。
  • [APPLICATION_PROFILE_ID] 替换为用于导出的 Cloud Bigtable 应用配置文件的 ID。
  • [DESTINATION_PATH] 替换为数据写入位置的 Cloud Storage 路径。例如 gs://mybucket/somefolder
  • [FILENAME_PREFIX] 替换为 SequenceFile 文件名的前缀。例如 output-
gcloud dataflow jobs run [JOB_NAME] \
    --gcs-location gs://dataflow-templates/latest/Cloud_Bigtable_to_GCS_SequenceFile \
    --parameters bigtableProject=[PROJECT_ID],bigtableInstanceId=[INSTANCE_ID],bigtableTableId=[TABLE_ID],bigtableAppProfileId=[APPLICATION_PROFILE_ID],destinationPath=[DESTINATION_PATH],filenamePrefix=[FILENAME_PREFIX]

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_Bigtable_to_GCS_SequenceFile

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请根据使用 REST API 中的说明使用以下示例请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。您必须在此示例中替换以下值:

  • [YOUR_PROJECT_ID] 替换为您的项目 ID。
  • [JOB_NAME] 替换为您选择的作业名称。
  • [PROJECT_ID] 替换为您要从中读取数据的 Cloud Bigtable 实例的 Google Cloud 项目 ID。
  • [INSTANCE_ID] 替换为包含表的 Cloud Bigtable 实例的 ID。
  • [TABLE_ID] 替换为要导出的 Cloud Bigtable 表的 ID。
  • [APPLICATION_PROFILE_ID] 替换为用于导出的 Cloud Bigtable 应用配置文件的 ID。
  • [DESTINATION_PATH] 替换为数据写入位置的 Cloud Storage 路径。例如 gs://mybucket/somefolder
  • [FILENAME_PREFIX] 替换为 SequenceFile 文件名的前缀。例如 output-
POST https://dataflow.googleapis.com/v1b3/projects/[YOUR_PROJECT_ID]/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_Bigtable_to_GCS_SequenceFile
{
   "jobName": "[JOB_NAME]",
   "parameters": {
       "bigtableProject": "[PROJECT_ID]",
       "bigtableInstanceId": "[INSTANCE_ID]",
       "bigtableTableId": "[TABLE_ID]",
       "bigtableAppProfileId": "[APPLICATION_PROFILE_ID]",
       "destinationPath": "[DESTINATION_PATH]",
       "filenamePrefix": "[FILENAME_PREFIX]",
   },
   "environment": { "zone": "us-central1-f" }
}

Datastore to Cloud Storage Text

Datastore to Cloud Storage Text 模板是一种批处理流水线,可读取 Datastore 实体并以文本文件形式将其写入 Cloud Storage。您可以提供一个函数以将每个实体处理为 JSON 字符串。如果您未提供此类函数,则输出文件中的每一行都将是一个 JSON 序列化实体。

对此流水线的要求

在运行此流水线之前,必须先在项目中设置 Datastore。

模板参数

参数 说明
datastoreReadGqlQuery 一种 GQL 查询,用于指定要获取的实体。例如 SELECT * FROM MyKind
datastoreReadProjectId 您要从中读取数据的 Datastore 实例的 Google Cloud 项目 ID。
datastoreReadNamespace 所请求实体的命名空间。如需使用默认命名空间,请将此参数留空。
javascriptTextTransformGcsPath 包含所有 JavaScript 代码的 Cloud Storage 路径。例如 gs://mybucket/mytransforms/*.js。如果您不想提供函数,请将此参数留空。
javascriptTextTransformFunctionName 要调用的 JavaScript 函数的名称。例如,如果您的 JavaScript 函数为 function myTransform(inJson) { ...dostuff...},则函数名称为 myTransform。如果您不想提供函数,请将此参数留空。
textWritePrefix Cloud Storage 路径前缀,用于指定应将数据写入的位置。例如 gs://mybucket/somefolder/

运行 Datastore to Cloud Storage Text 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Datastore to Cloud Storage Text template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Datastore_to_GCS_Text

您必须在此示例中替换以下值:

  • YOUR_PROJECT_ID 替换为您的项目 ID。
  • JOB_NAME 替换为您选择的作业名称。
  • YOUR_BUCKET_NAME 替换为 Cloud Storage 存储分区的名称。
  • YOUR_DATASTORE_KIND 替换为 Datastore 实体的类型。
  • YOUR_DATASTORE_NAMESPACE 替换为 Datastore 实体的命名空间。
  • YOUR_JAVASCRIPT_FUNCTION 替换为 JavaScript 函数名称。
  • PATH_TO_JAVASCRIPT_UDF_FILE 替换为包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Datastore_to_GCS_Text \
    --parameters \
datastoreReadGqlQuery="SELECT * FROM YOUR_DATASTORE_KIND",\
datastoreReadProjectId=YOUR_PROJECT_ID,\
datastoreReadNamespace=YOUR_DATASTORE_NAMESPACE,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
textWritePrefix=gs://YOUR_BUCKET_NAME/output/

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Datastore_to_GCS_Text

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要获得授权

您必须在此示例中替换以下值:

  • YOUR_PROJECT_ID 替换为您的项目 ID。
  • JOB_NAME 替换为您选择的作业名称。
  • YOUR_BUCKET_NAME 替换为 Cloud Storage 存储分区的名称。
  • YOUR_DATASTORE_KIND 替换为 Datastore 实体的类型。
  • YOUR_DATASTORE_NAMESPACE 替换为 Datastore 实体的命名空间。
  • YOUR_JAVASCRIPT_FUNCTION 替换为 JavaScript 函数名称。
  • PATH_TO_JAVASCRIPT_UDF_FILE 替换为包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Datastore_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "parameters": {
       "datastoreReadGqlQuery": "SELECT * FROM YOUR_DATASTORE_KIND"
       "datastoreReadProjectId": "YOUR_PROJECT_ID",
       "datastoreReadNamespace": "YOUR_DATASTORE_NAMESPACE",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION",
       "textWritePrefix": "gs://YOUR_BUCKET_NAME/output/"
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Spanner to Cloud Storage Avro

Cloud Spanner to Cloud Storage 模板是一种批处理流水线,可将整个 Cloud Spanner 数据库以 Avro 格式导出到 Cloud Storage。导出 Cloud Spanner 数据库会在您选择的存储分区中创建一个文件夹。该文件夹包含以下内容:

  • spanner-export.json 文件。
  • 您导出的数据库中每个表的 TableName-manifest.json 文件。
  • 一个或多个 TableName.avro-#####-of-##### 文件。

例如,如果导出包含两个表 SingersAlbums 的数据库,则系统会创建以下文件集:

  • Albums-manifest.json
  • Albums.avro-00000-of-00002
  • Albums.avro-00001-of-00002
  • Singers-manifest.json
  • Singers.avro-00000-of-00003
  • Singers.avro-00001-of-00003
  • Singers.avro-00002-of-00003
  • spanner-export.json

对此流水线的要求

  • Cloud Spanner 数据库必须存在。
  • Cloud Storage 输出存储分区必须存在。
  • 除了运行 Dataflow 作业所需的 IAM 角色之外,您还必须具有适当的 IAM 角色才能读取 Cloud Spanner 数据并写入 Cloud Storage 存储分区。

模板参数

参数 说明
instanceId 需要导出的 Cloud Spanner 数据库的实例 ID。
databaseId 需要导出的 Cloud Spanner 数据库的数据库 ID。
outputDir 您期望的 Avro 文件导出位置的 Cloud Storage 路径。导出作业会在此路径下创建一个包含导出文件的新目录。

运行模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Spanner to Cloud Storage Avro template。
  6. 作业名称字段中输入作业名称。
    • 作业名称必须与 cloud-spanner-export-[YOUR_INSTANCE_ID]-[YOUR_DATABASE_ID] 格式匹配才会在 Cloud Console 的 Cloud Spanner 部分显示。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_Spanner_to_GCS_Avro

请根据使用 REST API 中的说明使用以下示例请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。必须如下所示替换此示例中的值:

  • [DATAFLOW_REGION] 替换为您要在其中运行 Dataflow 作业的区域(例如 us-central1)。
  • [YOUR_INSTANCE_ID] 替换为 Cloud Spanner 实例 ID。
  • [YOUR_DATABASE_ID] 替换为 Cloud Spanner 数据库 ID。
  • [YOUR_GCS_DIRECTORY] 替换为 Avro 文件应导出到的 Cloud Storage 路径。
  • [JOB_NAME] 替换为您选择的作业名称。
    • 作业名称必须与 cloud-spanner-export-[YOUR_INSTANCE_ID]-[YOUR_DATABASE_ID] 格式匹配才会在 Cloud Console 的 Cloud Spanner 部分显示。
gcloud dataflow jobs run [JOB_NAME] \
    --gcs-location='gs://dataflow-templates/[VERSION]/Cloud_Spanner_to_GCS_Avro' \
    --region=[DATAFLOW_REGION] \
    --parameters='instanceId=[YOUR_INSTANCE_ID],databaseId=[YOUR_DATABASE_ID],outputDir=[YOUR_GCS_DIRECTORY]

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_Spanner_to_GCS_Avro

请根据使用 REST API 中的说明使用以下示例请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。必须如下所示替换此示例中的值:

  • [YOUR_PROJECT_ID] 替换为您的项目 ID。
  • [DATAFLOW_REGION] 替换为您要在其中运行 Dataflow 作业的区域(例如 us-central1)。
  • [YOUR_INSTANCE_ID] 替换为 Cloud Spanner 实例 ID。
  • [YOUR_DATABASE_ID] 替换为 Cloud Spanner 数据库 ID。
  • [YOUR_GCS_DIRECTORY] 替换为 Avro 文件应导出到的 Cloud Storage 路径。
  • [JOB_NAME] 替换为您选择的作业名称。
    • 作业名称必须与 cloud-spanner-export-[YOUR_INSTANCE_ID]-[YOUR_DATABASE_ID] 格式匹配才会在 Cloud Console 的 Cloud Spanner 部分显示。
POST https://dataflow.googleapis.com/v1b3/projects/[YOUR_PROJECT_ID]/locations/[DATAFLOW_REGION]/templates:launch?gcsPath=gs://dataflow-templates/[VERSION]/Cloud_Spanner_to_GCS_Avro
{
   "jobName": "[JOB_NAME]",
   "parameters": {
       "instanceId": "[YOUR_INSTANCE_ID]",
       "databaseId": "[YOUR_DATABASE_ID]",
       "outputDir": "gs://[YOUR_GCS_DIRECTORY]"
   }
}

Cloud Spanner to Cloud Storage Text

Cloud Spanner to Cloud Storage Text 模板是一种批处理流水线,可从 Cloud Spanner 表中读取数据,视需要通过您提供的 JavaScript 用户定义函数 (UDF) 转换数据,然后以 CSV 文本文件形式将数据写入 Cloud Storage。

对此流水线的要求

  • 在运行此流水线之前,输入 Cloud Storage 表必须已存在。

模板参数

参数 说明
spannerProjectId 需要从中读取数据的 Cloud Spanner 数据库的 GCP 项目 ID。
spannerDatabaseId 所请求的表的数据库。
spannerInstanceId 所请求的表的实例。
spannerTable 要导出的表。
textWritePrefix 写入输出文本文件的输出目录。请在末尾添加 /。例如 gs://mybucket/somefolder/
javascriptTextTransformGcsPath [可选] 包含所有 JavaScript 代码的 Cloud Storage 路径。例如 gs://mybucket/mytransforms/*.js。如果您不想提供函数,请将此参数留空。
javascriptTextTransformFunctionName [可选] 要调用的 JavaScript 函数的名称。例如,如果您的 JavaScript 函数为 function myTransform(inJson) { ...dostuff...},则函数名称为 myTransform。如果您不想提供函数,请将此参数留空。

运行 Cloud Spanner to Cloud Storage Text 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Cloud Spanner to Cloud Storage Text template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Spanner_to_GCS_Text

您必须在此示例中替换以下值:

  • YOUR_PROJECT_ID 替换为您的项目 ID。
  • JOB_NAME 替换为您选择的作业名称。
  • YOUR_DATABASE_ID 替换为 Spanner 数据库 ID。
  • YOUR_BUCKET_NAME 替换为 Cloud Storage 存储分区的名称。
  • YOUR_INSTANCE_ID 替换为 Spanner 实例 ID。
  • YOUR_TABLE_ID 替换为 Spanner 表 ID。
  • PATH_TO_JAVASCRIPT_UDF_FILE 替换为包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径。
  • YOUR_JAVASCRIPT_FUNCTION 替换为 JavaScript 函数名称。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Spanner_to_GCS_Text \
    --parameters \
spannerProjectId=YOUR_PROJECT_ID,\
spannerDatabaseId=YOUR_DATABASE_ID,\
spannerInstanceId=YOUR_INSTANCE_ID,\
spannerTable=YOUR_TABLE_ID,\
textWritePrefix=gs://YOUR_BUCKET_NAME/output/,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Spanner_to_GCS_Text

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要获得授权

您必须在此示例中替换以下值:

  • YOUR_PROJECT_ID 替换为您的项目 ID。
  • JOB_NAME 替换为您选择的作业名称。
  • YOUR_DATABASE_ID 替换为 Spanner 数据库 ID。
  • YOUR_BUCKET_NAME 替换为 Cloud Storage 存储分区的名称。
  • YOUR_INSTANCE_ID 替换为 Spanner 实例 ID。
  • YOUR_TABLE_ID 替换为 Spanner 表 ID。
  • PATH_TO_JAVASCRIPT_UDF_FILE 替换为包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径。
  • YOUR_JAVASCRIPT_FUNCTION 替换为 JavaScript 函数名称。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Spanner_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "parameters": {
       "spannerProjectId": "YOUR_PROJECT_ID",
       "spannerDatabaseId": "YOUR_DATABASE_ID",
       "spannerInstanceId": "YOUR_INSTANCE_ID",
       "spannerTable": "YOUR_TABLE_ID",
       "textWritePrefix": "gs://YOUR_BUCKET_NAME/output/",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION"
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Storage Avro to Cloud Bigtable

Cloud Storage Avro to Cloud Bigtable 模板是一种流水线,可从 Cloud Storage 存储分区的 Avro 文件中读取数据并将它们写入 Cloud Bigtable 表中。您可以使用此模板将数据从 Cloud Storage 复制到 Cloud Bigtable。

对此流水线的要求

  • Cloud Bigtable 表必须存在,并且列族必须与 Avro 文件中导出的列族相同。
  • 在运行流水线之前,Avro 输入文件必须已存在于 Cloud Storage 存储分区中。
  • Cloud Bigtable 需要 Avro 输入文件具有特定架构

模板参数

参数 说明
bigtableProjectId 您要写入数据的 Cloud Bigtable 实例的 Google Cloud 项目 ID。
bigtableInstanceId 包含表的 Cloud Bigtable 实例的 ID。
bigtableTableId 需要导入的 Cloud Bigtable 表的 ID。
inputFilePattern 数据所在位置的 Cloud Storage 路径模式。例如 gs://mybucket/somefolder/prefix*

运行 Cloud Storage Avro file to Cloud Bigtable 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Cloud Spanner to Cloud Storage Text template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Avro_to_Cloud_Bigtable

请根据使用 REST API 中的说明使用以下示例请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。您必须在此示例中替换以下值:

  • [YOUR_PROJECT_ID] 替换为您的项目 ID。
  • [JOB_NAME] 替换为您选择的作业名称。
  • [PROJECT_ID] 替换为您要从中读取数据的 Cloud Bigtable 实例的 Google Cloud 项目 ID。
  • [INSTANCE_ID] 替换为包含表的 Cloud Bigtable 实例的 ID。
  • [TABLE_ID] 替换为要导出的 Cloud Bigtable 表的 ID。
  • [INPUT_FILE_PATTERN] 替换为数据所在位置的 Cloud Storage 路径模式。例如 gs://mybucket/somefolder/prefix*
gcloud dataflow jobs run [JOB_NAME] \
    --gcs-location gs://dataflow-templates/latest/GCS_Avro_to_Cloud_Bigtable \
    --parameters bigtableProjectId=[PROJECT_ID],bigtableInstanceId=[INSTANCE_ID],bigtableTableId=[TABLE_ID],inputFilePattern=[INPUT_FILE_PATTERN]

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Avro_to_Cloud_Bigtable

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请根据使用 REST API 中的说明使用以下示例请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。您必须在此示例中替换以下值:

  • [YOUR_PROJECT_ID] 替换为您的项目 ID。
  • [JOB_NAME] 替换为您选择的作业名称。
  • [PROJECT_ID] 替换为您要从中读取数据的 Cloud Bigtable 实例的 Google Cloud 项目 ID。
  • [INSTANCE_ID] 替换为包含表的 Cloud Bigtable 实例的 ID。
  • [TABLE_ID] 替换为要导出的 Cloud Bigtable 表的 ID。
  • [INPUT_FILE_PATTERN] 替换为数据所在位置的 Cloud Storage 路径模式。例如 gs://mybucket/somefolder/prefix*
POST https://dataflow.googleapis.com/v1b3/projects/[YOUR_PROJECT_ID]/templates:launch?gcsPath=gs://dataflow-templates/latest/GCS_Avro_to_Cloud_Bigtable
{
   "jobName": "[JOB_NAME]",
   "parameters": {
       "bigtableProjectId": "[PROJECT_ID]",
       "bigtableInstanceId": "[INSTANCE_ID]",
       "bigtableTableId": "[TABLE_ID]",
       "inputFilePattern": "[INPUT_FILE_PATTERN]",
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Storage Avro to Cloud Spanner

Cloud Storage Avro files to Cloud Spanner 模板是一种批处理流水线,会读取从 Cloud Storage 中存储的 Cloud Spanner 导出的 Avro 文件,并将其导入 Cloud Spanner 数据库。

对此流水线的要求

  • 目标 Cloud Spanner 数据库必须已存在且必须为空。
  • 您必须拥有 Cloud Storage 存储分区的读取权限以及目标 Cloud Spanner 数据库的写入权限。
  • Cloud Storage 输入路径必须存在,并且必须包含 spanner-export.json 文件,且该文件包含要导入的文件的 JSON 描述。

模板参数

参数 说明
instanceId Cloud Spanner 数据库的实例 ID。
databaseId Cloud Spanner 数据库的数据库 ID。
inputDir 应导入的 Avro 文件的源位置的 Cloud Storage 路径。

运行 Cloud Storage Avro to Cloud Spanner 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Cloud Storage Avro to Spanner template。
  6. 作业名称字段中输入作业名称。
    • 作业名称必须与 cloud-spanner-import-[YOUR_INSTANCE_ID]-[YOUR_DATABASE_ID] 格式匹配才会在 Cloud Console 的 Cloud Spanner 部分显示。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Avro_to_Cloud_Spanner

请根据使用 REST API 中的说明使用以下示例请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。您必须在此示例中替换以下值:

  • (仅限 API)将 [YOUR_PROJECT_ID] 替换为您的项目 ID。
  • [DATAFLOW_REGION] 替换为您要在其中运行 Dataflow 作业的区域(例如 us-central1)。
  • [JOB_NAME] 替换为您选择的作业名称。
  • [YOUR_INSTANCE_ID] 替换为包含数据库的 Spanner 实例的 ID。
  • [YOUR_DATABASE_ID] 替换为要导入其中的 Spanner 数据库的 ID。
  • (仅限 gcloud)将 [YOUR_GCS_STAGING_LOCATION] 替换为写入临时文件的路径。例如 gs://mybucket/temp
  • [YOUR_GCS_DIRECTORY] 替换为应从中导入 Avro 文件的 Cloud Storage 路径。例如 gs://mybucket/somefolder
gcloud dataflow jobs run [JOB_NAME] \
    --gcs-location='gs://dataflow-templates/[VERSION]/GCS_Avro_to_Cloud_Spanner' \
    --region=[DATAFLOW_REGION] \
    --staging-location=[YOUR_GCS_STAGING_LOCATION] \
    --parameters='instanceId=[YOUR_INSTANCE_ID],databaseId=[YOUR_DATABASE_ID],inputDir=[YOUR_GCS_DIRECTORY]'

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Avro_to_Cloud_Spanner

请根据使用 REST API 中的说明使用以下示例请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。您必须在此示例中替换以下值:

  • (仅限 API)将 [YOUR_PROJECT_ID] 替换为您的项目 ID。
  • [DATAFLOW_REGION] 替换为您要在其中运行 Dataflow 作业的区域(例如 us-central1)。
  • [JOB_NAME] 替换为您选择的作业名称。
  • [YOUR_INSTANCE_ID] 替换为包含数据库的 Spanner 实例的 ID。
  • [YOUR_DATABASE_ID] 替换为要导入其中的 Spanner 数据库的 ID。
  • (仅限 gcloud)将 [YOUR_GCS_STAGING_LOCATION] 替换为写入临时文件的路径。例如 gs://mybucket/temp
  • [YOUR_GCS_DIRECTORY] 替换为应从中导入 Avro 文件的 Cloud Storage 路径。例如 gs://mybucket/somefolder
POST https://dataflow.googleapis.com/v1b3/projects/[YOUR_PROJECT_ID]/locations/[DATAFLOW_REGION]/templates:launch?gcsPath=gs://dataflow-templates/[VERSION]/GCS_Avro_to_Cloud_Spanner
{
   "jobName": "[JOB_NAME]",
   "parameters": {
       "instanceId": "[YOUR_INSTANCE_ID]",
       "databaseId": "[YOUR_DATABASE_ID]",
       "inputDir": "gs://[YOUR_GCS_DIRECTORY]"
   },
   "environment": {
       "machineType": "n1-standard-2"
   }
}

Cloud Storage Parquet to Cloud Bigtable

Cloud Storage Parquet to Cloud Bigtable 模板是一种流水线,可从 Cloud Storage 存储分区的 Parquet 文件中读取数据,然后将其写入 Cloud Bigtable 表中。您可以使用此模板将数据从 Cloud Storage 复制到 Cloud Bigtable。

对此流水线的要求

  • Cloud Bigtable 表必须已存在,并且列族必须与 Parquet 文件中导出的列族相同。
  • 在运行此流水线之前,Cloud Storage 存储分区中必须已存在输入 Parquet 文件。
  • Cloud Bigtable 需要 Parquet 输入文件具有特定架构

模板参数

参数 说明
bigtableProjectId 您要写入数据的 Cloud Bigtable 实例的 Google Cloud 项目 ID。
bigtableInstanceId 包含表的 Cloud Bigtable 实例的 ID。
bigtableTableId 需要导入的 Cloud Bigtable 表的 ID。
inputFilePattern 数据所在位置的 Cloud Storage 路径模式。例如 gs://mybucket/somefolder/prefix*

运行 Cloud Storage Parquet file to Cloud Bigtable 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Cloud Storage Avro to Spanner template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Parquet_to_Cloud_Bigtable

请根据使用 REST API 中的说明使用以下示例请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。您必须在此示例中替换以下值:

  • [YOUR_PROJECT_ID] 替换为您的项目 ID。
  • [JOB_NAME] 替换为您选择的作业名称。
  • [PROJECT_ID] 替换为您要从中读取数据的 Cloud Bigtable 实例的 Google Cloud 项目 ID。
  • [INSTANCE_ID] 替换为包含表的 Cloud Bigtable 实例的 ID。
  • [TABLE_ID] 替换为要导出的 Cloud Bigtable 表的 ID。
  • [INPUT_FILE_PATTERN] 替换为数据所在位置的 Cloud Storage 路径模式。例如 gs://mybucket/somefolder/prefix*
gcloud dataflow jobs run [JOB_NAME] \
    --gcs-location gs://dataflow-templates/latest/GCS_Parquet_to_Cloud_Bigtable \
    --parameters bigtableProjectId=[PROJECT_ID],bigtableInstanceId=[INSTANCE_ID],bigtableTableId=[TABLE_ID],inputFilePattern=[INPUT_FILE_PATTERN]

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Parquet_to_Cloud_Bigtable

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请根据使用 REST API 中的说明使用以下示例请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。您必须在此示例中替换以下值:

  • [YOUR_PROJECT_ID] 替换为您的项目 ID。
  • [JOB_NAME] 替换为您选择的作业名称。
  • [PROJECT_ID] 替换为您要从中读取数据的 Cloud Bigtable 实例的 Google Cloud 项目 ID。
  • [INSTANCE_ID] 替换为包含表的 Cloud Bigtable 实例的 ID。
  • [TABLE_ID] 替换为要导出的 Cloud Bigtable 表的 ID。
  • [INPUT_FILE_PATTERN] 替换为数据所在位置的 Cloud Storage 路径模式。例如 gs://mybucket/somefolder/prefix*
POST https://dataflow.googleapis.com/v1b3/projects/[YOUR_PROJECT_ID]/templates:launch?gcsPath=gs://dataflow-templates/latest/GCS_Parquet_to_Cloud_Bigtable
{
   "jobName": "[JOB_NAME]",
   "parameters": {
       "bigtableProjectId": "[PROJECT_ID]",
       "bigtableInstanceId": "[INSTANCE_ID]",
       "bigtableTableId": "[TABLE_ID]",
       "inputFilePattern": "[INPUT_FILE_PATTERN]",
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Storage SequenceFile to Cloud Bigtable

Cloud Storage SequenceFile to Cloud Bigtable 模板是一种流水线,可从 Cloud Storage 存储分区的 SequenceFiles 文件中读取数据并将它们写入 Cloud Bigtable 表中。您可以使用此模板将数据从 Cloud Storage 复制到 Cloud Bigtable。

对此流水线的要求

  • Cloud Bigtable 表必须存在。
  • 在运行流水线之前,SequenceFile 输入文件必须已存在于 Cloud Storage 存储分区中。
  • SequenceFile 输入文件必须是从 Cloud Bigtable 或 HBase 中导出的文件。

模板参数

参数 说明
bigtableProject 您要写入数据的 Cloud Bigtable 实例的 Google Cloud 项目 ID。
bigtableInstanceId 包含表的 Cloud Bigtable 实例的 ID。
bigtableTableId 需要导入的 Cloud Bigtable 表的 ID。
bigtableAppProfileId 将用于导入的 Cloud Bigtable 应用配置文件的 ID。如果您没有指定应用配置文件,则 Cloud Bigtable 将使用该实例的默认应用配置文件
sourcePattern 数据所在位置的 Cloud Storage 路径模式。例如 gs://mybucket/somefolder/prefix*

运行 Cloud Storage SequenceFile to Cloud Bigtable 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the SequenceFile Files on Cloud Storage to Cloud Bigtable template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_SequenceFile_to_Cloud_Bigtable

请根据使用 REST API 中的说明使用以下示例请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。您必须在此示例中替换以下值:

  • [YOUR_PROJECT_ID] 替换为您的项目 ID。
  • [JOB_NAME] 替换为您选择的作业名称。
  • [PROJECT_ID] 替换为您要从中读取数据的 Cloud Bigtable 实例的 Google Cloud 项目 ID。
  • [INSTANCE_ID] 替换为包含表的 Cloud Bigtable 实例的 ID。
  • [TABLE_ID] 替换为要导出的 Cloud Bigtable 表的 ID。
  • [APPLICATION_PROFILE_ID] 替换为用于导出的 Cloud Bigtable 应用配置文件的 ID。
  • [SOURCE_PATTERN] 替换为数据所在位置的 Cloud Storage 路径模式。例如 gs://mybucket/somefolder/prefix*
gcloud dataflow jobs run [JOB_NAME] \
    --gcs-location gs://dataflow-templates/latest/GCS_SequenceFile_to_Cloud_Bigtable \
    --parameters bigtableProject=[PROJECT_ID],bigtableInstanceId=[INSTANCE_ID],bigtableTableId=[TABLE_ID],bigtableAppProfileId=[APPLICATION_PROFILE_ID],sourcePattern=[SOURCE_PATTERN]

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_SequenceFile_to_Cloud_Bigtable

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请根据使用 REST API 中的说明使用以下示例请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。您必须在此示例中替换以下值:

  • [YOUR_PROJECT_ID] 替换为您的项目 ID。
  • [JOB_NAME] 替换为您选择的作业名称。
  • [PROJECT_ID] 替换为您要从中读取数据的 Cloud Bigtable 实例的 Google Cloud 项目 ID。
  • [INSTANCE_ID] 替换为包含表的 Cloud Bigtable 实例的 ID。
  • [TABLE_ID] 替换为要导出的 Cloud Bigtable 表的 ID。
  • [APPLICATION_PROFILE_ID] 替换为用于导出的 Cloud Bigtable 应用配置文件的 ID。
  • [SOURCE_PATTERN] 替换为数据所在位置的 Cloud Storage 路径模式。例如 gs://mybucket/somefolder/prefix*
POST https://dataflow.googleapis.com/v1b3/projects/[YOUR_PROJECT_ID]/templates:launch?gcsPath=gs://dataflow-templates/latest/GCS_SequenceFile_to_Cloud_Bigtable
{
   "jobName": "[JOB_NAME]",
   "parameters": {
       "bigtableProject": "[PROJECT_ID]",
       "bigtableInstanceId": "[INSTANCE_ID]",
       "bigtableTableId": "[TABLE_ID]",
       "bigtableAppProfileId": "[APPLICATION_PROFILE_ID]",
       "sourcePattern": "[SOURCE_PATTERN]",
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Storage Text to BigQuery

Cloud Storage Text to BigQuery 流水线是一种批处理流水线,可用于读取 Cloud Storage 中存储的文本文件,使用您提供的 JavaScript 用户定义函数 (UDF) 转换这些文件,然后将结果输出到 BigQuery。

重要提示:如果您重复使用现有 BigQuery 表,该表将会被覆盖。

对此流水线的要求

  • 创建一个用于描述 BigQuery 架构的 JSON 文件。

    确保有一个名为 BigQuery Schema 的顶级 JSON 数组,且该数组的内容遵循 {"name": "COLUMN_NAME", "type": "DATA_TYPE"} 格式。例如:

    {
      "BigQuery Schema": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING"
        },
        {
          "name": "coffee",
          "type": "STRING"
        }
      ]
    }
    
  • 使用 UDF 函数(该函数提供转换文本行的逻辑)创建一个 JavaScript (.js) 文件。请注意,此函数必须返回 JSON 字符串。

    例如,以下函数将拆分 CSV 文件的每行文本,并通过转换值返回 JSON 字符串。

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

模板参数

参数 说明
javascriptTextTransformFunctionName 要从 .js 文件中调用的函数的名称。
JSONPath 用于定义 BigQuery 架构的 JSON 文件(存储在 Cloud Storage 中)的 gs:// 路径。例如 gs://path/to/my/schema.json
javascriptTextTransformGcsPath 用于定义 UDF 的 JavaScript 文件的 gs:// 路径。例如 gs://path/to/my/javascript_function.js
inputFilePattern Cloud Storage 中待处理文本的 gs:// 路径。例如 gs://path/to/my/text/data.txt
outputTable 要创建用以存储已处理数据的 BigQuery 表名称。如果您重复使用现有 BigQuery 表,该表将会被覆盖。例如 my-project-name:my-dataset.my-table
bigQueryLoadingTemporaryDirectory BigQuery 加载进程的临时目录。例如 gs://my-bucket/my-files/temp_dir

运行 Cloud Storage Text to BigQuery 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Cloud Storage Text to BigQuery template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Text_to_BigQuery

您必须在此示例中替换以下值:

  • YOUR_PROJECT_ID 替换为您的项目 ID。
  • JOB_NAME 替换为您选择的作业名称。
  • YOUR_JAVASCRIPT_FUNCTION 替换为 UDF 名称。
  • PATH_TO_BIGQUERY_SCHEMA_JSON 替换为包含架构定义的 JSON 文件的 Cloud Storage 路径。
  • PATH_TO_JAVASCRIPT_UDF_FILE 替换为包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径。
  • PATH_TO_YOUR_TEXT_DATA 替换为文本数据集的 Cloud Storage 路径。
  • BIGQUERY_TABLE 替换为 BigQuery 表名。
  • PATH_TO_TEMP_DIR_ON_GCS 替换为临时目录的 Cloud Storage 路径。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/GCS_Text_to_BigQuery \
    --parameters \
javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
inputFilePattern=PATH_TO_YOUR_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Text_to_BigQuery

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要获得授权

您必须在此示例中替换以下值:

  • YOUR_PROJECT_ID 替换为您的项目 ID。
  • JOB_NAME 替换为您选择的作业名称。
  • YOUR_JAVASCRIPT_FUNCTION 替换为 UDF 名称。
  • PATH_TO_BIGQUERY_SCHEMA_JSON 替换为包含架构定义的 JSON 文件的 Cloud Storage 路径。
  • PATH_TO_JAVASCRIPT_UDF_FILE 替换为包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径。
  • PATH_TO_YOUR_TEXT_DATA 替换为文本数据集的 Cloud Storage 路径。
  • BIGQUERY_TABLE 替换为 BigQuery 表名。
  • PATH_TO_TEMP_DIR_ON_GCS 替换为临时目录的 Cloud Storage 路径。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_YOUR_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Storage Text to Datastore

Cloud Storage Text to Datastore 模板是一种批处理流水线,可从存储在 Cloud Storage 中的文本文件读取数据,并将采用 JSON 编码的实体写入 Datastore。输入文本文件中的所有行均应采用 https://cloud.google.com/datastore/docs/reference/rest/v1/Entity 中指定的 JSON 格式。

对此流水线的要求

  • 必须在目标项目中启用 Datastore。

模板参数

参数 说明
textReadPattern 用于指定文本数据文件的位置的 Cloud Storage 文件路径模式。例如 gs://mybucket/somepath/*.json
javascriptTextTransformGcsPath 包含所有 JavaScript 代码的 Cloud Storage 路径模式。例如 gs://mybucket/mytransforms/*.js。如果您不想提供函数,请将此参数留空。
javascriptTextTransformFunctionName 要调用的 JavaScript 函数的名称。例如,如果您的 JavaScript 函数为 function myTransform(inJson) { ...dostuff...},则函数名称为 myTransform。如果您不想提供函数,请将此参数留空。
datastoreWriteProjectId 要向其写入 Datastore 实体的位置的 Google Cloud 项目 ID
errorWritePath 要用于写入在处理期间发生的故障的错误日志输出文件。例如 gs://bucket-name/errors.txt

运行 Cloud Storage Text to Datastore 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Cloud Storage Text to Datastore template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Text_to_Datastore

您必须在此示例中替换以下值:

  • YOUR_PROJECT_ID 替换为您的项目 ID。
  • JOB_NAME 替换为您选择的作业名称。
  • PATH_TO_INPUT_TEXT_FILES 替换为 Cloud Storage 上的输入文件格式。
  • YOUR_JAVASCRIPT_FUNCTION 替换为 JavaScript 函数名称。
  • PATH_TO_JAVASCRIPT_UDF_FILE 替换为包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径。
  • ERROR_FILE_WRITE_PATH 替换为 Cloud Storage 上的所需错误文件路径。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/GCS_Text_to_Datastore \
    --parameters \
textReadPattern=PATH_TO_INPUT_TEXT_FILES,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
datastoreWriteProjectId=YOUR_PROJECT_ID,\
errorWritePath=ERROR_FILE_WRITE_PATH

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Text_to_Datastore

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要获得授权

您必须在此示例中替换以下值:

  • YOUR_PROJECT_ID 替换为您的项目 ID。
  • JOB_NAME 替换为您选择的作业名称。
  • PATH_TO_INPUT_TEXT_FILES 替换为 Cloud Storage 上的输入文件格式。
  • YOUR_JAVASCRIPT_FUNCTION 替换为 JavaScript 函数名称。
  • PATH_TO_JAVASCRIPT_UDF_FILE 替换为包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径。
  • ERROR_FILE_WRITE_PATH 替换为 Cloud Storage 上的所需错误文件路径。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/GCS_Text_to_Datastore
{
   "jobName": "JOB_NAME",
   "parameters": {
       "textReadPattern": "PATH_TO_INPUT_TEXT_FILES",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION",
       "datastoreWriteProjectId": "YOUR_PROJECT_ID",
       "errorWritePath": "ERROR_FILE_WRITE_PATH"
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Storage Text to Pub/Sub (Batch)

此模板会创建一种批处理流水线,该流水线可从存储在 Cloud Storage 中的文本文件读取记录,并将其发布到 Pub/Sub 主题。使用此模板,您可以将采用换行符分隔的文件中的 JSON 记录或 CSV 文件中的记录发布到 Pub/Sub 主题,以实现实时处理。您可以使用此模板将数据重放到 Pub/Sub。

请注意,此模板不会在各个记录上设置任何时间戳,因此事件时间将与执行时的发布时间相同。如果您的流水线依赖准确的事件时间来执行处理,则不应使用此流水线。

对此流水线的要求

  • 要读取的文件必须采用换行符分隔的 JSON 或 CSV 格式。在源文件中占多行的记录可能会导致下游问题,因为文件中的每一行都将以消息形式发布到 Pub/Sub。
  • 在运行此流水线之前,Pub/Sub 主题必须已存在。

模板参数

参数 说明
inputFilePattern 要从中读取数据的输入文件模式。例如 gs://bucket-name/files/*.json
outputTopic 要向其写入数据的 Pub/Sub 输入主题。该名称应采用 projects/<project-id>/topics/<topic-name> 格式。

运行 Cloud Storage Text to Pub/Sub (Batch) 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Cloud Storage Text to Pub/Sub (Batch) template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Text_to_Cloud_PubSub

您必须在此示例中替换以下值:

  • YOUR_PROJECT_ID 替换为您的项目 ID。
  • JOB_NAME 替换为您选择的作业名称。
  • YOUR_TOPIC_NAME 替换为您的 Pub/Sub 主题名称。
  • YOUR_BUCKET_NAME 替换为 Cloud Storage 存储分区的名称。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/GCS_Text_to_Cloud_PubSub \
    --parameters \
inputFilePattern=gs://YOUR_BUCKET_NAME/files/*.json,\
outputTopic=projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Text_to_Cloud_PubSub

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要获得授权

您必须在此示例中替换以下值:

  • YOUR_PROJECT_ID 替换为您的项目 ID。
  • JOB_NAME 替换为您选择的作业名称。
  • YOUR_TOPIC_NAME 替换为您的 Pub/Sub 主题名称。
  • YOUR_BUCKET_NAME 替换为 Cloud Storage 存储分区的名称。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://YOUR_BUCKET_NAME/files/*.json",
       "outputTopic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME"
   },
   "environment": { "zone": "us-central1-f" }
}

Cloud Storage Text to Cloud Spanner

Cloud Storage Text to Cloud Spanner 模板是一种批处理流水线,用于从 Cloud Storage 读取 CSV 文本文件并将其导入到 Cloud Spanner 数据库。

对此流水线的要求

  • 目标 Cloud Spanner 数据库和表必须已存在。
  • 您必须拥有 Cloud Storage 存储分区的读取权限以及目标 Cloud Spanner 数据库的写入权限。
  • 包含 CSV 文件的 Cloud Storage 输入路径必须存在。
  • 您必须创建包含 CSV 文件的 JSON 说明的导入清单文件,并且必须将该清单存储在 Cloud Storage 中。
  • 如果目标 Cloud Spanner 数据库已有架构,则清单文件中指定的任何列都必须与目标数据库架构中的相应列具有相同的数据类型。
  • 采用 ASCII 或 UTF-8 编码的清单文件必须符合以下格式:

  • 要导入的文本文件必须采用 ASCII 或 UTF-8 编码的 CSV 格式。我们建议您不要在 UTF-8 编码文件中使用字节顺序标记 (BOM)。
  • 数据必须与下面的一种类型相匹配:
    • INT64
    • FLOAT64
    • BOOL
    • STRING
    • DATE
    • TIMESTAMP

模板参数

参数 说明
instanceId Cloud Spanner 数据库的实例 ID。
databaseId Cloud Spanner 数据库的 ID。
importManifest Cloud Storage 中导入清单文件的路径。
columnDelimiter 源文件使用的列分隔符。默认值为 ,
fieldQualifier 应放置在包含 columnDelimiter 的源文件中任何值前后的字符。默认值为 "
trailingDelimiter 指定源文件中的行是否带有末尾分隔符(即,在每行末尾的最后一列值之后,是否会出现 columnDelimiter 字符)。默认值为 true
escape 源文件使用的转义字符。默认情况下,此参数未设置,并且模板不使用转义字符。
nullString 表示 NULL 值的字符串。默认情况下,此参数未设置,并且模板不使用 null 字符串。
dateFormat 用于解析日期列的格式。默认情况下,流水线会尝试将日期列解析为 yyyy-M-d[' 00:00:00'],例如,解析为 2019-01-31 或 2019-1-1 00:00:00。如果您的日期格式有所不同,请使用 java.time.format.DateTimeFormatter 模式指定格式。
timestampFormat 用于解析时间戳列的格式。如果时间戳为长整数,则会解析为 Unix 时间。否则,时间戳会解析为 java.time.format.DateTimeFormatter.ISO_INSTANT 格式的字符串。对于其他情况,请指定您自己的模式字符串,例如,您可以对 "Jan 21 1998 01:02:03.456+08:00". 格式的时间戳使用 MMM dd yyyy HH:mm:ss.SSSVV

如果您需要使用自定义日期或时间戳格式,请确保这些格式是有效的 java.time.format.DateTimeFormatter 模式。下表显示了日期和时间戳列的自定义格式的其他示例:

类型 输入值 格式 备注
DATE 2011-3-31 默认情况下,模板可以解析此格式。您无需指定 dateFormat 参数。
DATE 2011-3-31 00:00:00 默认情况下,模板可以解析此格式。您无需指定格式。如果需要,您可以使用 yyyy-M-d' 00:00:00'
DATE 01 Apr, 18 dd MMM, yy
DATE Wednesday, April 3, 2019 AD EEEE, LLLL d, yyyy G
TIMESTAMP 2019-01-02T11:22:33Z
2019-01-02T11:22:33.123Z
2019-01-02T11:22:33.12356789Z
默认格式 ISO_INSTANT 可以解析此类型的时间戳。您无需提供 timestampFormat 参数。
TIMESTAMP 1568402363 默认情况下,模板可以解析此类型的时间戳并将其视为 Unix 时间。
TIMESTAMP Tue, 3 Jun 2008 11:05:30 GMT EEE, d MMM yyyy HH:mm:ss VV
TIMESTAMP 2018/12/31 110530.123PST yyyy/MM/dd HHmmss.SSSz
TIMESTAMP 2019-01-02T11:22:33Z 或 2019-01-02T11:22:33.123Z yyyy-MM-dd'T'HH:mm:ss[.SSS]VV 如果输入列为 2019-01-02T11:22:33Z 和 2019-01-02T11:22:33.123Z 的混合格式,则默认格式可以解析此类型的时间戳。您无需提供自己的格式参数。不过,您可以使用 yyyy-MM-dd'T'HH:mm:ss[.SSS]VV 应对这两种情况。请注意,您不能使用 yyyy-MM-dd'T'HH:mm:ss[.SSS]'Z',因为后缀“Z”必须解析为时区 ID(而不是字符字面量)。在内部,时间戳列会转换为 java.time.Instant。因此,您必须以 UTC 指定时间戳列,或者将时区信息与其相关联。本地日期时间(例如 2019-01-02 11:22:33)无法解析为有效的 java.time.Instant

运行模板

控制台

Google Cloud Console 中运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Cloud Storage Text to Cloud Spanner template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

gcloud

使用 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Text_to_Cloud_Spanner

请根据使用 REST API 中的说明使用以下示例请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。必须如下所示替换此示例中的值:

  • [DATAFLOW_REGION] 替换为您要在其中运行 Dataflow 作业的区域(例如 us-central1)。
  • [YOUR_INSTANCE_ID] 替换为 Cloud Spanner 实例 ID。
  • [YOUR_DATABASE_ID] 替换为 Cloud Spanner 数据库 ID。
  • [GCS_PATH_TO_IMPORT_MANIFEST] 替换为导入清单文件的 Cloud Storage 路径。
  • [JOB_NAME] 替换为您选择的作业名称。
gcloud dataflow jobs run [JOB_NAME] \
    --gcs-location='gs://dataflow-templates/[VERSION]/GCS_Text_to_Cloud_Spanner' \
    --region=[DATAFLOW_REGION] \
    --parameters='instanceId=[YOUR_INSTANCE_ID],databaseId=[YOUR_DATABASE_ID],importManifest=[GCS_PATH_TO_IMPORT_MANIFEST]'

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/GCS_Text_to_Cloud_Spanner

请根据使用 REST API 中的说明使用以下示例请求。此请求需要授权,同时您还必须指定自己具有写入权限的 tempLocation。必须如下所示替换此示例中的值:

  • [YOUR_PROJECT_ID] 替换为您的项目 ID。
  • [DATAFLOW_REGION] 替换为您要在其中运行 Dataflow 作业的区域(例如 us-central1)。
  • [YOUR_INSTANCE_ID] 替换为 Cloud Spanner 实例 ID。
  • [YOUR_DATABASE_ID] 替换为 Cloud Spanner 数据库 ID。
  • [GCS_PATH_TO_IMPORT_MANIFEST] 替换为导入清单文件的 Cloud Storage 路径。
  • [JOB_NAME] 替换为您选择的作业名称。
POST https://dataflow.googleapis.com/v1b3/projects/[YOUR_PROJECT_ID]/locations/[DATAFLOW_REGION]/templates:launch?gcsPath=gs://dataflow-templates/[VERSION]/GCS_Text_to_Cloud_Spanner

{
   "jobName": "[JOB_NAME]",
   "parameters": {
       "instanceId": "[YOUR_INSTANCE_ID]",
       "databaseId": "[YOUR_DATABASE_ID]",
       "importManifest": "[GCS_PATH_TO_IMPORT_MANIFEST]"
   },
   "environment": {
       "machineType": "n1-standard-2"
   }
}

Java Database Connectivity (JDBC) to BigQuery

JDBC to BigQuery 模板是一种批处理流水线,可将数据从关系数据库表中复制到现有的 BigQuery 表中。此流水线使用 JDBC 连接到关系数据库。您可以使用此模板将数据从任何具有可用 JDBC 驱动程序的关系数据库复制到 BigQuery 中。为了增加一项保护措施,您还可以在传入使用 Cloud KMS 密钥加密的 Base64 编码用户名、密码和连接字符串参数的同时,传入该 Cloud KMS 密钥。如需详细了解如何对用户名、密码和连接字符串参数进行加密,请参阅 Cloud KMS API 加密端点

对此流水线的要求

  • 关系数据库的 JDBC 驱动程序必须可用。
  • BigQuery 表必须存在才能执行此执行流水线。
  • BigQuery 表必须具有兼容的架构。
  • 必须能够从 Dataflow 运行的子网访问关系数据库。

模板参数

参数 说明
driverJars 以英文逗号分隔的驱动程序 jar 列表。例如 gs://<my-bucket>/driver_jar1.jar,gs://<my-bucket>/driver_jar2.jar
driverClassName JDBC 驱动程序类名称。例如 com.mysql.jdbc.Driver
connectionURL JDBC 连接网址字符串。例如 jdbc:mysql://some-host:3306/sampledb。该参数可以作为使用 Cloud KMS 密钥加密的 Base64 编码字符串传入。
query 要在提取数据的源上运行的查询。例如 select * from sampledb.sample_table
outputTable BigQuery 输出表位置,采用 <my-project>:<my-dataset>.<my-table> 格式。
bigQueryLoadingTemporaryDirectory BigQuery 加载进程的临时目录。例如 gs://<my-bucket>/my-files/temp_dir
connectionProperties [可选] 用于 JDBC 连接的属性字符串。例如 unicode=true&characterEncoding=UTF-8
username [可选] 用于 JDBC 连接的用户名。该参数可以作为使用 Cloud KMS 密钥加密的 Base64 编码字符串传入。
password [可选] 用于 JDBC 连接的密码。该参数可以作为使用 Cloud KMS 密钥加密的 Base64 编码字符串传入。
KMSEncryptionKey [可选] 用于对用户名、密码和连接字符串进行解密的 Cloud KMS 加密密钥。如果传入了 Cloud KMS 密钥,则用户名、密码和连接字符串都必须以加密方式进行传递。

运行 JDBC to BigQuery 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the JDBC to BigQuery template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Jdbc_to_BigQuery

您必须在此示例中替换以下值:

  • YOUR_PROJECT_ID 替换为您的项目 ID。
  • JOB_NAME 替换为您选择的作业名称。
  • DRIVER_PATHS 替换为 JDBC 驱动程序以英文逗号分隔的 Cloud Storage 路径。
  • DRIVER_CLASS_NAME 替换为驱动器类名称。
  • JDBC_CONNECTION_URL 替换为 JDBC 连接网址。
  • SOURCE_SQL_QUERY 替换为要在源数据库上运行的 SQL 查询。
  • YOUR_DATASET 替换为 BigQuery 数据集,并将 YOUR_TABLE_NAME 替换为 BigQuery 表名。
  • PATH_TO_TEMP_DIR_ON_GCS 替换为临时目录的 Cloud Storage 路径。
  • 根据需要将 CONNECTION_PROPERTIES 替换为 JDBC 连接属性。
  • CONNECTION_USERNAME 替换为 JDBC 连接用户名。
  • CONNECTION_PASSWORD 替换为 JDBC 连接密码。
  • KMS_ENCRYPTION_KEY 替换为 Cloud KMS 加密密钥。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Jdbc_to_BigQuery \
    --parameters \
driverJars=DRIVER_PATHS,\
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
query=SOURCE_SQL_QUERY,\
outputTable=YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME,
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\
connectionProperties=CONNECTION_PROPERTIES,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Jdbc_to_BigQuery

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要获得授权

您必须在此示例中替换以下值:

  • YOUR_PROJECT_ID 替换为您的项目 ID。
  • JOB_NAME 替换为您选择的作业名称。
  • DRIVER_PATHS 替换为 JDBC 驱动程序以英文逗号分隔的 Cloud Storage 路径。
  • DRIVER_CLASS_NAME 替换为驱动器类名称。
  • JDBC_CONNECTION_URL 替换为 JDBC 连接网址。
  • SOURCE_SQL_QUERY 替换为要在源数据库上运行的 SQL 查询。
  • YOUR_DATASET 替换为 BigQuery 数据集,并将 YOUR_TABLE_NAME 替换为 BigQuery 表名。
  • PATH_TO_TEMP_DIR_ON_GCS 替换为临时目录的 Cloud Storage 路径。
  • 根据需要将 CONNECTION_PROPERTIES 替换为 JDBC 连接属性。
  • CONNECTION_USERNAME 替换为 JDBC 连接用户名。
  • CONNECTION_PASSWORD 替换为 JDBC 连接密码。
  • KMS_ENCRYPTION_KEY 替换为 Cloud KMS 加密密钥。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Jdbc_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "driverJars": "DRIVER_PATHS",
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "JDBC_CONNECTION_URL",
       "query": "SOURCE_SQL_QUERY",
       "outputTable": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
       "connectionProperties": "CONNECTION_PROPERTIES",
       "username": "CONNECTION_USERNAME",
       "password": "CONNECTION_PASSWORD",
       "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
   },
   "environment": { "zone": "us-central1-f" }
}

Apache Cassandra to Cloud Bigtable

“从 Apache Cassandra 到 Cloud Bigtable”模板会将一个表从 Apache Cassandra 复制到 Cloud Bigtable。此模板需要最低限度的配置,可在 Cloud Bigtable 中尽可能准确地复制 Cassandra 中的表结构。

“从 Apache Cassandra 到 Cloud Bigtable”模板适用于以下情况:

  • 在可以接受短时间停机的情况下,迁移 Apache Cassandra 数据库。
  • 定期将 Cassandra 表复制到 Cloud Bigtable 以便向全球用户传送数据。

对此流水线的要求

  • 在运行流水线之前,必须已存在目标 Bigtable 表。
  • Dataflow 工作器与 Apache Cassandra 节点之间建立了网络连接。

类型转换

“从 Apache Cassandra 到 Cloud Bigtable”模板会自动将 Apache Cassandra 数据类型转换为 Cloud Bigtable 的数据类型。

大多数原语在 Cloud Bigtable 和 Apache Cassandra 中的表示方式相同;但以下原语的表示方式有所不同。

  • DateTimestamp 会转化为 DateTime 对象
  • UUID 被转换为 String
  • Varint 被转换为 BigDecimal

Apache Cassandra 还原生支持更复杂的类型,例如 TupleListSetMap。此流水线不支持元组,因为 Apache Beam 中没有相应的类型。

例如,在 Apache Cassandra 中,您有一个 List 类型的列,名为“mylist”,还有一些类似下表内容的值。

row mylist
1 (a,b,c)

流水线会直接将此 List 列扩展为三个不同的列(在 Cloud Bigtable 中称为列限定符)。列名称为“mylist”,但流水线还会附加上列表中各项的索引编号,例如“mylist[0]”。

row mylist[0] mylist[1] mylist[2]
1 a b c

流水线按照处理列表的方式来处理集合,但添加了额外的后缀来指明单元是键还是值。

row mymap
1 {"first_key":"first_value","another_key":"different_value"}

转换后,表将如下所示:

row mymap[0].key mymap[0].value mymap[1].key mymap[1].value
1 first_key first_value another_key different_value

主键转换

在 Apache Cassandra 中,主键是使用数据定义语言定义的。主键可以是简单的、复合的,或者是与聚簇列复合的。Cloud Bigtable 支持手动行键构造,按字典顺序对字节数组进行排序。此流水线会收集有关键类型的信息,并遵循基于多个值构建行键的最佳做法构建键。

模板参数

参数 说明
cassandraHosts 以英文逗号分隔的列表中的 Apache Cassandra 节点的主机。
cassandraPort (可选)节点上用于访问 Apache Cassandra 的 TCP 端口(默认为 9042)。
cassandraKeyspace 表格所在的 Apache Cassandra 键空间。
cassandraTable 要复制的 Apache Cassandra 表格。
bigtableProjectId 应该复制 Apache Cassandra 表所在 Cloud Bigtable 实例的 Google Cloud 项目 ID。
bigtableInstanceId 要复制 Apache Cassandra 表的 Cloud Bigtable 实例 ID。
bigtableTableId 要在其中复制 Apache Cassandra 表的 Cloud Bigtable 表的名称。
defaultColumnFamily (可选)Cloud Bigtable 表的列族的名称(默认为 default)。
rowKeySeparator (可选)用于构建行键的分隔符(默认为 #)。

运行 Apache Cassandra to Cloud Bigtable 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Apache Cassandra to Cloud Bigtable template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cassandra_To_Cloud_Bigtable

您必须在此示例中替换以下值:

  • YOUR_PROJECT_ID 替换为 Cloud Bigtable 所在的项目 ID。
  • JOB_NAME 替换为您选择的作业名称。
  • YOUR_BIGTABLE_INSTANCE_ID 替换为 Cloud Bigtable 实例 ID。
  • YOUR_BIGTABLE_TABLE_ID 替换为您的 Cloud Bigtable 表格名称。
  • YOUR_CASSANDRA_HOSTS 替换为 Apache Cassandra 主机列表。如果提供了多个主机,请按照说明了解如何转义逗号。
  • YOUR_CASSANDRA_KEYSPACE 替换为表格所在的 Apache Cassandra 键空间。
  • YOUR_CASSANDRA_TABLE 替换为需要迁移的 Apache Cassandra 表格。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cassandra_To_Cloud_Bigtable \
    --parameters\
bigtableProjectId=YOUR_PROJECT_ID,\
bigtableInstanceId=YOUR_BIGTABLE_INSTANCE_ID,\
bigtableTableId=YOUR_BIGTABLE_TABLE_ID,\
cassandraHosts=YOUR_CASSANDRA_HOSTS,\
cassandraKeyspace=YOUR_CASSANDRA_KEYSPACE,\
cassandraTable=YOUR_CASSANDRA_TABLE

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cassandra_To_Cloud_Bigtable

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要获得授权

您必须在此示例中替换以下值:

  • YOUR_PROJECT_ID 替换为 Cloud Bigtable 所在的项目 ID。
  • JOB_NAME 替换为您选择的作业名称。
  • YOUR_BIGTABLE_INSTANCE_ID 替换为 Cloud Bigtable 实例 ID。
  • YOUR_BIGTABLE_TABLE_ID 替换为您的 Cloud Bigtable 表格名称。
  • YOUR_CASSANDRA_HOSTS 替换为 Apache Cassandra 主机列表。如果提供了多个主机,请按照说明了解如何转义逗号。
  • YOUR_CASSANDRA_KEYSPACE 替换为表格所在的 Apache Cassandra 键空间。
  • YOUR_CASSANDRA_TABLE 替换为需要迁移的 Apache Cassandra 表格。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Cassandra_To_Cloud_Bigtable
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProjectId": "YOUR_PROJECT_ID",
       "bigtableInstanceId": "YOUR_BIGTABLE_INSTANCE_ID",
       "bigtableTableId": "YOUR_BIGTABLE_TABLE_ID",
       "cassandraHosts": "YOUR_CASSANDRA_HOSTS",
       "cassandraKeyspace": "YOUR_CASSANDRA_KEYSPACE",
       "cassandraTable": "YOUR_CASSANDRA_TABLE"
   },
   "environment": { "zone": "us-central1-f" }
}

Apache Hive to BigQuery

Apache Hive to BigQuery 模板是一种批处理流水线,可从 Apache Hive 表中读取该表,然后将其写入 BigQuery 表中。

对此流水线的要求

  • 在运行流水线之前,必须已存在目标 Bigtable 表。
  • Dataflow 工作器与 Apache Hive 节点之间必须存在网络连接。
  • Dataflow 工作器与 Apache Thrift 服务器节点之间必须存在网络连接。
  • BigQuery 数据集必须存在才能执行此流水线。

模板参数

参数 说明
metastoreUri Apache Thrift 服务器 URI,例如 thrift://thrift-server-host:port
hiveDatabaseName 包含要导出的表的 Apache Hive 数据库名称。
hiveTableName 您要导出的 Apache Hive 表名称。
outputTableSpec BigQuery 输出表位置,格式为 <my-project>:<my-dataset>.<my-table>
hivePartitionCols (可选)Apache Hive 分区列的英文逗号分隔列表。
filterString (可选)输入 Apache Hive 表的过滤条件字符串。
partitionType (可选)BigQuery 中的分区类型。目前只支持 Time
partitionCol (可选)输出 BigQuery 表中的分区列名称。

运行 Apache Hive to BigQuery 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Apache Hive to BigQuery template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Hive_To_BigQuery

请替换以下内容:

  • PROJECT_ID:BigQuery 所在的项目 ID。
  • JOB_NAME:您选择的作业名称。
  • DATASET:您的 BigQuery 数据集
  • TABLE_NAME:您的 BigQuery 表名称
  • METASTORE_URI:Apache Thrift 服务器 URI
  • HIVE_DATABASE_NAME:包含您要导出的表的 Apache Hive 数据库名称。
  • HIVE_TABLE_NAME:您要导出的 Apache Hive 表名称。
  • HIVE_PARTITION_COLS:以英文逗号分隔的 Apache Hive 分区列列表。
  • FILTER_STRING:Apache Hive 输入表的过滤条件字符串。
  • PARTITION_TYPE:BigQuery 中的分区类型。
  • PARTITION_COL:BigQuery 分区列的名称。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Hive_To_BigQuery \
    --parameters\
metastoreUri=METASTORE_URI,\
hiveDatabaseName=HIVE_DATABASE_NAME,\
hiveTableName=HIVE_TABLE_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
hivePartitionCols=HIVE_PARTITION_COLS,\
filterString=FILTER_STRING,\
partitionType=PARTITION_TYPE,\
partitionCol=PARTITION_COL

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Hive_To_BigQuery

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • PROJECT_ID:BigQuery 所在的项目 ID。
  • JOB_NAME:您选择的作业名称。
  • DATASET:您的 BigQuery 数据集
  • TABLE_NAME:您的 BigQuery 表名称
  • METASTORE_URI:Apache Thrift 服务器 URI
  • HIVE_DATABASE_NAME:包含您要导出的表的 Apache Hive 数据库名称。
  • HIVE_TABLE_NAME:您要导出的 Apache Hive 表名称。
  • HIVE_PARTITION_COLS:以英文逗号分隔的 Apache Hive 分区列列表。
  • FILTER_STRING:Apache Hive 输入表的过滤条件字符串。
  • PARTITION_TYPE:BigQuery 中的分区类型。
  • PARTITION_COL:BigQuery 分区列的名称。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Hive_To_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "metastoreUri": "METASTORE_URI",
       "hiveDatabaseName": "HIVE_DATABASE_NAME",
       "hiveTableName": "HIVE_TABLE_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME",
       "hivePartitionCols": "HIVE_PARTITION_COLS",
       "filterString": "FILTER_STRING",
       "partitionType": "PARTITION_TYPE",
       "partitionCol": "PARTITION_COL"
   },
   "environment": { "zone": "us-central1-f" }
}

文件格式转换(Avro、Parquet、CSV)

文件格式转换模板是批处理流水线,用于将 Cloud Storage 中存储的文件从一种受支持的格式转换为另一种格式。

支持以下格式转换

  • CSV 到 Avro。
  • CSV 到 Parquet。
  • Avro 到 Parquet。
  • Parquet 到 Avro。

对此流水线的要求

  • 在运行流水线之前,Cloud Storage 输出存储分区必须已存在。

模板参数

参数 说明
inputFileFormat 输入文件格式。必须为 [csv, avro, parquet] 之一。
outputFileFormat 输出文件格式。必须为 [avro, parquet] 之一。
inputFileSpec 输入文件的 Cloud Storage 路径模式。例如 gs://bucket-name/path/*.csv
outputBucket 用于写入输出文件的 Cloud Storage 文件夹。此路径应以斜杠结尾。例如 gs://bucket-name/output/
schema Avro 架构文件的 Cloud Storage 路径。例如 gs://bucket-name/schema/my-schema.avsc
containsHeaders [可选] 输入 CSV 文件包含标题记录 (true/false)。默认值:false。仅在读取 CSV 文件时才需要。
csvFormat [可选] 用于解析记录的 CSV 格式规范。默认值:Default。如需了解详情,请参阅 Apache Commons CSV 格式
delimiter [可选] 输入 CSV 文件使用的字段分隔符。
outputFilePrefix [可选] 输出文件前缀。默认值:output
numShards [可选] 输出文件分片数。

运行文件格式转换模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the File Format Conversion template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行 Flex 模板,必须具有 Cloud SDK 284.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/flex/File_Format_Conversion

您必须在此示例中替换以下值:

  • YOUR_PROJECT_ID 替换为您的项目 ID。
  • JOB_NAME 替换为您选择的作业名称。
  • INPUT_FORMAT 替换为输入文件的文件格式。必须为 [csv, avro, parquet] 之一。
  • OUTPUT_FORMAT 替换为输出文件的文件格式。必须为 [avro, parquet] 之一。
  • INPUT_FILES 替换为输入文件的路径模式。
  • OUTPUT_FOLDER 替换为输出文件的 Cloud Storage 文件夹。
  • SCHEMA 替换为 Avro 架构文件的路径。
  • LOCATION 替换为执行地区。例如 us-central1
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID} \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/File_Format_Conversion \
    --parameters \
inputFileFormat=INPUT_FORMAT,\
outputFileFormat=OUTPUT_FORMAT,\
inputFileSpec=INPUT_FILES,\
schema=SCHEMA,\
outputBucket=OUTPUT_FOLDER

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/flex/File_Format_Conversion

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要获得授权

您必须在此示例中替换以下值:

  • YOUR_PROJECT_ID 替换为您的项目 ID。
  • JOB_NAME 替换为您选择的作业名称。
  • INPUT_FORMAT 替换为输入文件的文件格式。必须为 [csv, avro, parquet] 之一。
  • OUTPUT_FORMAT 替换为输出文件的文件格式。必须为 [avro, parquet] 之一。
  • INPUT_FILES 替换为输入文件的路径模式。
  • OUTPUT_FOLDER 替换为输出文件的 Cloud Storage 文件夹。
  • SCHEMA 替换为 Avro 架构文件的路径。
  • LOCATION 替换为执行地区。例如 us-central1
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileFormat": "INPUT_FORMAT",
          "outputFileFormat": "OUTPUT_FORMAT",
          "inputFileSpec": "INPUT_FILES",
          "schema": "SCHEMA",
          "outputBucket": "OUTPUT_FOLDER"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/File_Format_Conversion",
   }
}