BigQuery to Bigtable 模板

BigQuery to Bigtable 模板是一种批处理流水线,可将数据从 BigQuery 表复制到现有的 Bigtable 表中。 该模板可以读取整个表,也可以使用提供的查询读取特定记录。

流水线要求

  • 源 BigQuery 表必须存在。
  • Bigtable 表必须已存在。
  • Worker 服务账号需要具有 roles/bigquery.datasets.create 权限。如需了解详情,请参阅预留简介

模板参数

必需参数

  • readIdColumn:存储行的唯一标识符的 BigQuery 列的名称。
  • bigtableWriteInstanceId:表所属的 Bigtable 实例的 ID。
  • bigtableWriteTableId:要写入的 Bigtable 表的 ID。
  • bigtableWriteColumnFamily:要将数据写入的 Bigtable 表的列族名称。

可选参数

  • inputTableSpec:要读取的 BigQuery 表。如果指定 inputTableSpec,模板将使用 BigQuery Storage Read API (https://cloud.google.com/bigquery/docs/reference/storage) 直接从 BigQuery 存储读取数据。如需了解 Storage Read API 中的限制,请参阅 https://cloud.google.com/bigquery/docs/reference/storage#limitations。您必须指定 inputTableSpecquery。如果同时设置了这两个参数,则模板会使用 query 参数。例如 <BIGQUERY_PROJECT>:<DATASET_NAME>.<INPUT_TABLE>
  • outputDeadletterTable:未能到达输出表的消息的 BigQuery 表。如果表不存在,系统会在流水线执行期间创建该表。如果未指定,则系统会使用 <outputTableSpec>_error_records。例如 <PROJECT_ID>:<DATASET_NAME>.<DEADLETTER_TABLE>
  • query:用于从 BigQuery 读取数据的 SQL 查询。如果 BigQuery 数据集与 Dataflow 作业位于不同的项目中,请在 SQL 查询中指定完整的数据集名称,例如:<PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>。默认情况下,除非 useLegacySqltrue,否则 query 参数会使用 GoogleSQL (https://cloud.google.com/bigquery/docs/introduction-sql)。您必须指定 inputTableSpecquery。如果同时设置了这两个参数,则模板会使用 query 参数。例如 select * from sampledb.sample_table
  • useLegacySql:设置为 true 即可使用旧版 SQL。此参数仅在使用 query 参数时适用。默认值为 false
  • queryLocation:在没有底层表权限的情况下从授权视图读取数据时需要使用。例如 US
  • queryTempDataset:借助此选项,您可以设置现有数据集以创建临时表来存储查询结果。例如 temp_dataset
  • KMSEncryptionKey:如果使用查询来源从 BigQuery 读取数据,请使用此 Cloud KMS 密钥加密创建的所有临时表。例如 projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key
  • bigtableRpcAttemptTimeoutMs:每次 Bigtable RPC 尝试的超时(以毫秒为单位)。
  • bigtableRpcTimeoutMs:Bigtable RPC 操作的总超时(以毫秒为单位)。
  • bigtableAdditionalRetryCodes:其他重试代码。例如 RESOURCE_EXHAUSTED,DEADLINE_EXCEEDED
  • bigtableWriteAppProfile:用于导出的 Bigtable 应用配置文件的 ID。如果您未指定应用配置文件,Bigtable 将使用实例的默认应用配置文件 (https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile)。
  • bigtableWriteProjectId:包含要将数据写入到的 Bigtable 实例的 Google Cloud 项目的 ID。
  • bigtableBulkWriteLatencyTargetMs:Bigtable 的延迟时间目标(以毫秒为单位),用于基于延迟时间进行节流。
  • bigtableBulkWriteMaxRowKeyCount:Bigtable 批量写入操作中的行键数上限。
  • bigtableBulkWriteMaxRequestSizeBytes:每项 Bigtable 批量写入操作包含的字节数上限。

运行模板

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the BigQuery to Bigtable template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_Bigtable \
    --parameters \
readIdColumn=READ_COLUMN_ID,\
inputTableSpec=INPUT_TABLE_SPEC,\
bigtableWriteInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableWriteTableId=BIGTABLE_TABLE_ID,\
bigtableWriteColumnFamily=BIGTABLE_COLUMN_FAMILY

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • READ_COLUMN_ID:您的 BigQuery 唯一 ID 列。
  • INPUT_TABLE_SPEC:您的 BigQuery 表名称。
  • BIGTABLE_INSTANCE_ID:您的 Bigtable 实例 ID。
  • BIGTABLE_TABLE_ID:您的 Bigtable 表 ID。
  • BIGTABLE_COLUMN_FAMILY:您的 Bigtable 表列族。

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "readIdColumn": "READ_COLUMN_ID",
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "bigtableWriteInstanceId": "BIGTABLE_INSTANCE_ID",
          "bigtableWriteTableId": "BIGTABLE_TABLE_ID",
          "bigtableWriteColumnFamily": "BIGTABLE_COLUMN_FAMILY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_Bigtable",
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • READ_COLUMN_ID:您的 BigQuery 唯一 ID 列。
  • INPUT_TABLE_SPEC:您的 BigQuery 表名称。
  • BIGTABLE_INSTANCE_ID:您的 Bigtable 实例 ID。
  • BIGTABLE_TABLE_ID:您的 Bigtable 表 ID。
  • BIGTABLE_COLUMN_FAMILY:您的 Bigtable 表列族。
Java
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.bigtable.utils.BigtableConfig.generateCloudBigtableWriteConfiguration;

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.bigtable.options.BigtableCommonOptions;
import com.google.cloud.teleport.v2.bigtable.transforms.BigtableConverters;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.templates.BigQueryToBigtable.BigQueryToBigtableOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.hadoop.hbase.client.Mutation;

/**
 * Dataflow template which reads BigQuery data and writes it to Bigtable. The source data can be
 * either a BigQuery table or an SQL query.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/bigquery-to-bigtable/README_BigQuery_to_Bigtable.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "BigQuery_to_Bigtable",
    category = TemplateCategory.BATCH,
    displayName = "BigQuery to Bigtable",
    description = "A pipeline to export a BigQuery table into Bigtable.",
    optionsClass = BigQueryToBigtableOptions.class,
    optionsOrder = {
      BigQueryToBigtableOptions.class,
      BigQueryConverters.BigQueryReadOptions.class,
      BigtableCommonOptions.class,
      BigtableCommonOptions.WriteOptions.class
    },
    optionalOptions = {"inputTableSpec"},
    flexContainerName = "bigquery-to-bigtable",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigquery-to-bigtable",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The source BigQuery table must exist.",
      "The Bigtable table must exist.",
      "The <a href=\"https://cloud.google.com/dataflow/docs/concepts/security-and-permissions#worker-service-account\">worker service account</a>"
          + " needs the <code>roles/bigquery.datasets.create</code> permission. For"
          + " more information, see <a href=\"https://cloud.google.com/bigquery/docs/access-control\">Introduction to IAM</a>."
    })
public class BigQueryToBigtable {

  /**
   * The {@link BigQueryToBigtableOptions} class provides the custom execution options passed by the
   * executor at the command-line.
   */
  public interface BigQueryToBigtableOptions
      extends BigQueryConverters.BigQueryReadOptions,
          BigtableCommonOptions.WriteOptions,
          GcpOptions {

    @TemplateParameter.Text(
        order = 1,
        regexes = {"[A-Za-z_][A-Za-z_0-9]*"},
        description = "Unique identifier column",
        helpText = "The name of the BigQuery column storing the unique identifier of the row.")
    @Required
    String getReadIdColumn();

    void setReadIdColumn(String value);
  }

  /**
   * Runs a pipeline which reads data from BigQuery and writes it to Bigtable.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    BigQueryToBigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryToBigtableOptions.class);

    CloudBigtableTableConfiguration bigtableTableConfig =
        generateCloudBigtableWriteConfiguration(options);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply(
            "AvroToMutation",
            BigQueryConverters.ReadBigQuery.<Mutation>newBuilder()
                .setOptions(options.as(BigQueryToBigtableOptions.class))
                .setReadFunction(
                    BigQueryIO.read(
                        BigtableConverters.AvroToMutation.newBuilder()
                            .setColumnFamily(options.getBigtableWriteColumnFamily())
                            .setRowkey(options.getReadIdColumn())
                            .build()))
                .build())
        .apply("WriteToTable", CloudBigtableIO.writeToTable(bigtableTableConfig));

    pipeline.run();
  }
}

后续步骤