BigQuery to Bigtable テンプレート

BigQuery to Bigtable テンプレートは、BigQuery テーブルから既存の Bigtable テーブルにデータをコピーするバッチ パイプラインです。テンプレートでは、テーブル全体を読み取ることも、クエリを使用して特定のレコードを読み取ることもできます。

パイプラインの要件

  • ソースとなる BigQuery テーブルが存在していること。
  • Bigtable テーブルが存在していること。
  • ワーカー サービス アカウントには roles/bigquery.datasets.create 権限が必要です。詳細については、IAM の概要をご覧ください。

テンプレートのパラメータ

必須パラメータ

  • readIdColumn: 行の固有識別子を格納している BigQuery の列の名前。
  • bigtableWriteInstanceId: テーブルが含まれている Bigtable インスタンスの ID。
  • bigtableWriteTableId: 書き込み先の Bigtable テーブルの ID。
  • bigtableWriteColumnFamily: データを書き込む Bigtable テーブルの列ファミリーの名前。

オプション パラメータ

  • inputTableSpec: 読み取り元の BigQuery テーブル。inputTableSpec を指定すると、テンプレートは BigQuery Storage Read API(https://cloud.google.com/bigquery/docs/reference/storage)を使用して、BigQuery ストレージから直接データを読み取ります。Storage Read API の制限については、https://cloud.google.com/bigquery/docs/reference/storage#limitations をご覧ください。inputTableSpec または query を指定する必要があります。両方のパラメータを設定した場合、テンプレートは query パラメータを使用します。例: <BIGQUERY_PROJECT>:<DATASET_NAME>.<INPUT_TABLE>
  • outputDeadletterTable: 出力テーブルに到達できなかったメッセージの BigQuery テーブル。テーブルが存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は <outputTableSpec>_error_records が使用されます例: <PROJECT_ID>:<DATASET_NAME>.<DEADLETTER_TABLE>
  • query: BigQuery からデータを読み取るために使用する SQL クエリ。BigQuery データセットが Dataflow ジョブとは異なるプロジェクトにある場合は、SQL クエリで完全なデータセット名を指定します(例: <PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>)。デフォルトでは、useLegacySqltrue でない限り、query パラメータは GoogleSQL(https://cloud.google.com/bigquery/docs/introduction-sql)を使用します。inputTableSpec または query を指定する必要があります。両方のパラメータを設定した場合、テンプレートは query パラメータを使用します例: select * from sampledb.sample_table
  • useLegacySql: レガシー SQL を使用するには true に設定します。このパラメータは、query パラメータを使用する場合にのみ適用されます。デフォルトは false です。
  • queryLocation: 基となるテーブルの権限なしで承認済みビューから読み取る場合は必須です。例: US
  • queryTempDataset: このオプションを使用すると、既存のデータセットを設定して、クエリの結果を保存する一時テーブルを作成できます。例: temp_dataset
  • KMSEncryptionKey: クエリソースを使用して BigQuery から読み取る場合は、この Cloud KMS 鍵を使用して、作成された一時テーブルを暗号化します。例: projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key
  • bigtableRpcAttemptTimeoutMs: Bigtable RPC の個々の試行のタイムアウト(ミリ秒)。
  • bigtableRpcTimeoutMs: Bigtable RPC オペレーションの合計タイムアウト(ミリ秒)。
  • bigtableAdditionalRetryCodes: 追加の再試行コード。例: RESOURCE_EXHAUSTED,DEADLINE_EXCEEDED
  • bigtableWriteAppProfile: エクスポートに使用する Bigtable アプリケーション プロファイルの ID。アプリ プロファイルを指定しない場合、Bigtable はインスタンスのデフォルトのアプリ プロファイル(https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile)を使用します。
  • bigtableWriteProjectId: データを書き込む Bigtable インスタンスを含む Google Cloud プロジェクトの ID。
  • bigtableBulkWriteLatencyTargetMs: レイテンシ ベースのスロットリングに対する Bigtable のレイテンシ ターゲット(ミリ秒)。
  • bigtableBulkWriteMaxRowKeyCount: Bigtable バッチ書き込みオペレーションの行キーの最大数。
  • bigtableBulkWriteMaxRequestSizeBytes: 1 回の Bigtable バッチ書き込みオペレーションに含める最大バイト数。

テンプレートを実行する

コンソールgcloudAPI
  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the BigQuery to Bigtable template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_Bigtable \
    --parameters \
readIdColumn=READ_COLUMN_ID,\
inputTableSpec=INPUT_TABLE_SPEC,\
bigtableWriteInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableWriteTableId=BIGTABLE_TABLE_ID,\
bigtableWriteColumnFamily=BIGTABLE_COLUMN_FAMILY

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • READ_COLUMN_ID: BigQuery の一意の ID の列。
  • INPUT_TABLE_SPEC: BigQuery テーブル名。
  • BIGTABLE_INSTANCE_ID: Bigtable インスタンス ID。
  • BIGTABLE_TABLE_ID: Bigtable テーブル ID。
  • BIGTABLE_COLUMN_FAMILY: Bigtable テーブルの列ファミリー。

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "readIdColumn": "READ_COLUMN_ID",
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "bigtableWriteInstanceId": "BIGTABLE_INSTANCE_ID",
          "bigtableWriteTableId": "BIGTABLE_TABLE_ID",
          "bigtableWriteColumnFamily": "BIGTABLE_COLUMN_FAMILY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_Bigtable",
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • READ_COLUMN_ID: BigQuery の一意の ID の列。
  • INPUT_TABLE_SPEC: BigQuery テーブル名。
  • BIGTABLE_INSTANCE_ID: Bigtable インスタンス ID。
  • BIGTABLE_TABLE_ID: Bigtable テーブル ID。
  • BIGTABLE_COLUMN_FAMILY: Bigtable テーブルの列ファミリー。
Java
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.bigtable.utils.BigtableConfig.generateCloudBigtableWriteConfiguration;

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.bigtable.options.BigtableCommonOptions;
import com.google.cloud.teleport.v2.bigtable.transforms.BigtableConverters;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.templates.BigQueryToBigtable.BigQueryToBigtableOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.hadoop.hbase.client.Mutation;

/**
 * Dataflow template which reads BigQuery data and writes it to Bigtable. The source data can be
 * either a BigQuery table or an SQL query.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/bigquery-to-bigtable/README_BigQuery_to_Bigtable.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "BigQuery_to_Bigtable",
    category = TemplateCategory.BATCH,
    displayName = "BigQuery to Bigtable",
    description = "A pipeline to export a BigQuery table into Bigtable.",
    optionsClass = BigQueryToBigtableOptions.class,
    optionsOrder = {
      BigQueryToBigtableOptions.class,
      BigQueryConverters.BigQueryReadOptions.class,
      BigtableCommonOptions.class,
      BigtableCommonOptions.WriteOptions.class
    },
    optionalOptions = {"inputTableSpec"},
    flexContainerName = "bigquery-to-bigtable",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigquery-to-bigtable",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The source BigQuery table must exist.",
      "The Bigtable table must exist.",
      "The <a href=\"https://cloud.google.com/dataflow/docs/concepts/security-and-permissions#worker-service-account\">worker service account</a>"
          + " needs the <code>roles/bigquery.datasets.create</code> permission. For"
          + " more information, see <a href=\"https://cloud.google.com/bigquery/docs/access-control\">Introduction to IAM</a>."
    })
public class BigQueryToBigtable {

  /**
   * The {@link BigQueryToBigtableOptions} class provides the custom execution options passed by the
   * executor at the command-line.
   */
  public interface BigQueryToBigtableOptions
      extends BigQueryConverters.BigQueryReadOptions,
          BigtableCommonOptions.WriteOptions,
          GcpOptions {

    @TemplateParameter.Text(
        order = 1,
        regexes = {"[A-Za-z_][A-Za-z_0-9]*"},
        description = "Unique identifier column",
        helpText = "The name of the BigQuery column storing the unique identifier of the row.")
    @Required
    String getReadIdColumn();

    void setReadIdColumn(String value);
  }

  /**
   * Runs a pipeline which reads data from BigQuery and writes it to Bigtable.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    BigQueryToBigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryToBigtableOptions.class);

    CloudBigtableTableConfiguration bigtableTableConfig =
        generateCloudBigtableWriteConfiguration(options);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply(
            "AvroToMutation",
            BigQueryConverters.ReadBigQuery.<Mutation>newBuilder()
                .setOptions(options.as(BigQueryToBigtableOptions.class))
                .setReadFunction(
                    BigQueryIO.read(
                        BigtableConverters.AvroToMutation.newBuilder()
                            .setColumnFamily(options.getBigtableWriteColumnFamily())
                            .setRowkey(options.getReadIdColumn())
                            .build()))
                .build())
        .apply("WriteToTable", CloudBigtableIO.writeToTable(bigtableTableConfig));

    pipeline.run();
  }
}

次のステップ