Bigtable change streams to BigQuery テンプレート

Bigtable change streams to BigQuery テンプレートは、Dataflow を使用して Bigtable データ変更レコードをストリーミングし、BigQuery テーブルに書き込むストリーミング パイプラインです。

Bigtable 変更ストリームを使用すると、テーブルごとにデータ ミューテーションをサブスクライブできます。テーブル変更ストリームをサブスクライブすると、次の制約が適用されます。

  • 変更されたセルと削除オペレーションの記述子のみが返されます。
  • 変更されたセルの新しい値のみが返されます。

データ変更レコードが BigQuery に書き込まれたときに、元の Bigtable commit タイムスタンプの順序と比べると行が順不同で挿入される可能性があります。

永続的なエラーのために BigQuery に書き込めない変更履歴テーブルの行は、人間による確認またはユーザーによる追加の処理のため、Cloud Storage のデッドレター キュー(未処理メッセージ キュー)ディレクトリに永続的に配置されます。

必要な BigQuery テーブルが存在しない場合は、パイプラインによって作成されます。それ以外の場合は、既存の BigQuery テーブルが使用されます。既存の BigQuery テーブルのスキーマには、次の表の列が含まれている必要があります。

新しい BigQuery の各行には、変更ストリームによって Bigtable テーブルの対応行から返された 1 つのデータ変更レコードが含まれます。

BigQuery 出力テーブルのスキーマ

列名 タイプ null 可能性 説明
row_key STRING または BYTES いいえ 変更された行の行キー。writeRowkeyAsBytes パイプライン オプションが true に設定されている場合、列のタイプは BYTES にする必要があります。それ以外の場合は、STRING タイプを使用します。
mod_type STRING いいえ 行ミューテーションのタイプ。SET_CELLDELETE_CELLSDELETE_FAMILY のいずれかの値を使用します。
column_family STRING いいえ 行ミューテーションの影響を受ける列ファミリー。
column STRING はい 行ミューテーションの影響を受ける列修飾子。DELETE_FAMILY ミューテーション タイプの場合は、NULL に設定します。
commit_timestamp TIMESTAMP いいえ Bigtable がミューテーションを適用する時間。
big_query_commit_timestamp TIMESTAMP はい 省略可: BigQuery が行を出力テーブルに書き込む時間を指定します。bigQueryChangelogTableFieldsToIgnore パイプライン オプション値に列名が存在する場合、このフィールドには値が入力されません。
timestamp TIMESTAMP または INT64 はい ミューテーションの影響を受けるセルのタイムスタンプ値。writeNumericTimestamps パイプライン オプションが true に設定されている場合、列のタイプは INT64 にする必要があります。それ以外の場合は、TIMESTAMP タイプを使用します。ミューテーション タイプが DELETE_CELLS および DELETE_FAMILY の場合は、NULL に設定します。
timestamp_from TIMESTAMP または INT64 はい DELETE_CELLS ミューテーションによって削除されたすべてのセルのタイムスタンプ間隔の包括的な開始点を示します。他のミューテーション タイプの場合は、NULL に設定します。
timestamp_to TIMESTAMP または INT64 はい DELETE_CELLS ミューテーションによって削除されたすべてのセルのタイムスタンプ間隔の排他的終了点を示します。他のミューテーション タイプの場合は、NULL に設定します。
is_gc BOOL いいえ 省略可: ミューテーションがガベージ コレクション ポリシーによってトリガーされた場合は、true に設定します。それ以外の場合は false に設定します。bigQueryChangelogTableFieldsToIgnore パイプライン オプション値に列名が存在する場合、このフィールドには値が入力されません。
source_instance STRING いいえ (省略可)ミューテーションの取得元である Bigtable インスタンスの名前を記述します。bigQueryChangelogTableFieldsToIgnore パイプライン オプション値に列名が存在する場合、このフィールドには値が入力されません。
source_cluster STRING いいえ (省略可)ミューテーションの取得元である Bigtable クラスタの名前を記述します。bigQueryChangelogTableFieldsToIgnore パイプライン オプション値に列名が存在する場合、このフィールドには値が入力されません。
source_table STRING いいえ 省略可: ミューテーションが適用される Bigtable テーブルの名前を記述します。複数の Bigtable テーブルが同じ BigQuery テーブルに変更をストリーミングする場合、この列の値が役に立つことがあります。bigQueryChangelogTableFieldsToIgnore パイプライン オプション値に列名が存在する場合、このフィールドには値が入力されません。
tiebreaker INT64 いいえ (省略可)異なる Bigtable クラスタによって 2 つのミューテーションが同時に登録された場合、tiebreaker 値が最も高いミューテーションがソーステーブルに適用されます。tiebreaker 値が小さいミューテーションは破棄されます。bigQueryChangelogTableFieldsToIgnore パイプライン オプション値に列名が存在する場合、このフィールドには値が入力されません。
value STRING または BYTES はい ミューテーションによって設定された新しい値。writeValuesAsBytes パイプライン オプションが true に設定されている場合、列のタイプは BYTES にする必要があります。それ以外の場合は、STRING タイプを使用します。SET_CELL ミューテーションには値が設定されます。他のミューテーション タイプの場合、値は NULL に設定されます。

パイプラインの要件

  • 指定された Bigtable ソース インスタンス。
  • 指定された Bigtable ソーステーブル。テーブルで変更ストリームが有効になっている必要があります。
  • 指定された Bigtable アプリケーション プロファイル。
  • 指定された BigQuery の宛先データセット。

テンプレートのパラメータ

必須パラメータ

  • bigQueryDataset: 宛先 BigQuery テーブルのデータセット名
  • bigtableChangeStreamAppProfile: Bigtable アプリケーション プロファイル ID。アプリケーション プロファイルでは、単一クラスタ ルーティングを使用し、単一行のトランザクションを許可する必要があります。
  • bigtableReadInstanceId: ソース Bigtable インスタンス ID。
  • bigtableReadTableId: ソース Bigtable テーブル ID。

オプション パラメータ

  • writeRowkeyAsBytes: 行キーを BigQuery BYTES として書き込むかどうか。true に設定すると、行キーは BYTES 列に書き込まれます。それ以外の場合、行キーは STRING 列に書き込まれます。デフォルトは false です。
  • writeValuesAsBytes: true に設定すると、値が BYTES 列に書き込まれます。それ以外の場合は、STRING 列に書き込まれます。デフォルトは false です。
  • writeNumericTimestamps: Bigtable のタイムスタンプを BigQuery の INT64 として書き込むかどうか。true に設定すると、値が INT64 列に書き込まれます。それ以外の場合、値は TIMESTAMP 列に書き込まれます。影響を受ける列: timestamptimestamp_fromtimestamp_to。デフォルトは false です。true に設定すると、時間は Unix エポック(1970 年 1 月 1 日 UTC)を起点とするマイクロ秒単位で測定されます。
  • bigQueryProjectId: BigQuery データセット プロジェクト ID。デフォルトは Dataflow ジョブのプロジェクトです。
  • bigQueryChangelogTableName: 宛先 BigQuery テーブルの名前。指定しない場合、値 bigtableReadTableId + "_changelog" が使用されます。デフォルトは空です。
  • bigQueryChangelogTablePartitionGranularity: 変更履歴テーブルのパーティショニングの粒度を指定します。設定すると、テーブルはパーティショニングされます。サポートされている値(HOURDAYMONTHYEAR)のいずれかを使用します。デフォルトでは、テーブルはパーティショニングされません。
  • bigQueryChangelogTablePartitionExpirationMs: 変更履歴テーブルのパーティションの有効期限をミリ秒単位で設定します。true に設定すると、指定したミリ秒数より古いパーティションが削除されます。デフォルトでは、有効期限は設定されていません。
  • bigQueryChangelogTableFieldsToIgnore: 指定した場合、作成されず入力されない変更履歴列のカンマ区切りのリスト。サポートされている値(is_gcsource_instancesource_clustersource_tabletiebreakerbig_query_commit_timestamp)のいずれかを使用します。デフォルトでは、すべての列にデータが入力されます。
  • dlqDirectory: デッドレター キューに使用するディレクトリ。処理に失敗したレコードは、このディレクトリに保存されます。デフォルトは、Dataflow ジョブの一時保存場所の下のディレクトリです。ほとんどの場合、デフォルトのパスを使用できます。
  • bigtableChangeStreamMetadataInstanceId: Bigtable 変更ストリーム メタデータのインスタンス ID。デフォルトは空です。
  • bigtableChangeStreamMetadataTableTableId: Bigtable 変更ストリーム コネクタのメタデータ テーブル ID。指定しない場合、パイプライン実行中に Bigtable 変更ストリーム コネクタのメタデータ テーブルが自動的に作成されます。デフォルトは空です。
  • bigtableChangeStreamCharset: Bigtable 変更ストリームの文字セット名。デフォルトは UTF-8 に設定されます。
  • bigtableChangeStreamStartTimestamp: 変更ストリームの読み取りに使用される開始タイムスタンプ(https://tools.ietf.org/html/rfc3339)。たとえば、2022-05-05T07:59:59Z のようにします。デフォルトは、パイプラインの開始時間のタイムスタンプです。
  • bigtableChangeStreamIgnoreColumnFamilies: 無視する列ファミリー名の変更のカンマ区切りのリスト。デフォルトは空です。
  • bigtableChangeStreamIgnoreColumns: 無視する列名の変更のカンマ区切りのリスト。デフォルトは空です。
  • bigtableChangeStreamName: クライアント パイプラインの一意の名前。実行中のパイプラインが停止した時点から処理を再開できます。デフォルトは自動生成された名前です。使用される値については、Dataflow ジョブのログをご覧ください。
  • bigtableChangeStreamResume: true に設定すると、同じ bigtableChangeStreamName 値で実行中のパイプラインが停止した時点から、新しいパイプラインが処理を再開します。指定された bigtableChangeStreamName 値を持つパイプラインが一度も実行されていない場合、新しいパイプラインは開始されません。false に設定すると、新しいパイプラインが開始されます。特定のソースに対して同じ bigtableChangeStreamName 値を持つパイプラインがすでに実行されている場合、新しいパイプラインは開始されません。デフォルトは false です。
  • bigtableReadProjectId: Bigtable プロジェクト ID。デフォルトは Dataflow ジョブのプロジェクトです。

テンプレートを実行する

コンソールgcloudAPI
  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Bigtable change streams to BigQuery template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
bigQueryDataset=BIGQUERY_DESTINATION_DATASET

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • BIGTABLE_INSTANCE_ID: Bigtable インスタンス ID。
  • BIGTABLE_TABLE_ID: Bigtable テーブル ID。
  • BIGTABLE_APPLICATION_PROFILE_ID: Bigtable アプリケーション プロファイル ID。
  • BIGQUERY_DESTINATION_DATASET: BigQuery の宛先データセット名

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "bigQueryDataset": "BIGQUERY_DESTINATION_DATASET"
    }
  }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • BIGTABLE_INSTANCE_ID: Bigtable インスタンス ID。
  • BIGTABLE_TABLE_ID: Bigtable テーブル ID。
  • BIGTABLE_APPLICATION_PROFILE_ID: Bigtable アプリケーション プロファイル ID。
  • BIGQUERY_DESTINATION_DATASET: BigQuery の宛先データセット名
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.Timestamp;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.cloud.bigtable.data.v2.models.DeleteCells;
import com.google.cloud.bigtable.data.v2.models.DeleteFamily;
import com.google.cloud.bigtable.data.v2.models.Entry;
import com.google.cloud.bigtable.data.v2.models.SetCell;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.bigtable.options.BigtableCommonOptions.ReadChangeStreamOptions;
import com.google.cloud.teleport.v2.bigtable.options.BigtableCommonOptions.ReadOptions;
import com.google.cloud.teleport.v2.bigtable.utils.UnsupportedEntryException;
import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager;
import com.google.cloud.teleport.v2.options.BigtableChangeStreamToBigQueryOptions;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.model.BigQueryDestination;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.model.BigtableSource;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.model.Mod;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.model.ModType;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.BigQueryUtils;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.ExistingPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This pipeline ingests {@link ChangeStreamMutation} from Bigtable change stream. The {@link
 * ChangeStreamMutation} is then broken into {@link Mod}, which converted into {@link TableRow} and
 * inserted into BigQuery table.
 */
@Template(
    name = "Bigtable_Change_Streams_to_BigQuery",
    category = TemplateCategory.STREAMING,
    displayName = "Cloud Bigtable Change Streams to BigQuery",
    description =
        "Streaming pipeline. Streams Bigtable data change records and writes them into BigQuery using Dataflow Runner V2.",
    optionsClass = BigtableChangeStreamToBigQueryOptions.class,
    optionsOrder = {
      BigtableChangeStreamToBigQueryOptions.class,
      ReadChangeStreamOptions.class,
      ReadOptions.class
    },
    skipOptions = {
      "bigtableReadAppProfile",
      "bigtableAdditionalRetryCodes",
      "bigtableRpcAttemptTimeoutMs",
      "bigtableRpcTimeoutMs"
    },
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-bigtable-change-streams-to-bigquery",
    flexContainerName = "bigtable-changestreams-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    streaming = true)
public final class BigtableChangeStreamsToBigQuery {
  private static final Logger LOG = LoggerFactory.getLogger(BigtableChangeStreamsToBigQuery.class);

  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    LOG.info("Starting to replicate change records from Cloud Bigtable change streams to BigQuery");

    BigtableChangeStreamToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(BigtableChangeStreamToBigQueryOptions.class);

    run(options);
  }

  private static void setOptions(BigtableChangeStreamToBigQueryOptions options) {
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    // Add use_runner_v2 to the experiments option, since change streams connector is only supported
    // on Dataflow runner v2.
    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    boolean hasUseRunnerV2 = false;
    for (String experiment : experiments) {
      if (experiment.equalsIgnoreCase(USE_RUNNER_V2_EXPERIMENT)) {
        hasUseRunnerV2 = true;
        break;
      }
    }
    if (!hasUseRunnerV2) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(BigtableChangeStreamToBigQueryOptions options) {
    setOptions(options);

    String changelogTableName = getBigQueryChangelogTableName(options);
    String bigtableProject = getBigtableProjectId(options);
    String bigQueryProject = getBigQueryProjectId(options);
    String bigQueryDataset = options.getBigQueryDataset();

    // If dataset doesn't exist and not checked, pipeline will start failing only after it sees the
    // first change from Cloud Bigtable. BigQueryIO can create table if it doesn't exist, but it
    // cannot create a dataset
    validateBigQueryDatasetExists(bigQueryProject, bigQueryDataset);

    // Retrieve and parse the startTimestamp
    Instant startTimestamp =
        options.getBigtableChangeStreamStartTimestamp().isEmpty()
            ? Instant.now()
            : toInstant(Timestamp.parseTimestamp(options.getBigtableChangeStreamStartTimestamp()));

    BigtableSource sourceInfo =
        new BigtableSource(
            options.getBigtableReadInstanceId(),
            options.getBigtableReadTableId(),
            getBigtableCharset(options),
            options.getBigtableChangeStreamIgnoreColumnFamilies(),
            options.getBigtableChangeStreamIgnoreColumns(),
            startTimestamp);

    BigQueryDestination destinationInfo =
        new BigQueryDestination(
            bigQueryProject,
            bigQueryDataset,
            changelogTableName,
            options.getWriteRowkeyAsBytes(),
            options.getWriteValuesAsBytes(),
            options.getWriteNumericTimestamps(),
            options.getBigQueryChangelogTablePartitionGranularity(),
            options.getBigQueryChangelogTablePartitionExpirationMs(),
            options.getBigQueryChangelogTableFieldsToIgnore());

    BigQueryUtils bigQuery = new BigQueryUtils(sourceInfo, destinationInfo);

    Pipeline pipeline = Pipeline.create(options);
    DeadLetterQueueManager dlqManager = buildDlqManager(options);

    BigtableIO.ReadChangeStream readChangeStream =
        BigtableIO.readChangeStream()
            .withChangeStreamName(options.getBigtableChangeStreamName())
            .withExistingPipelineOptions(
                options.getBigtableChangeStreamResume()
                    ? ExistingPipelineOptions.RESUME_OR_FAIL
                    : ExistingPipelineOptions.FAIL_IF_EXISTS)
            .withProjectId(bigtableProject)
            .withMetadataTableInstanceId(options.getBigtableChangeStreamMetadataInstanceId())
            .withInstanceId(options.getBigtableReadInstanceId())
            .withTableId(options.getBigtableReadTableId())
            .withAppProfileId(options.getBigtableChangeStreamAppProfile())
            .withStartTime(startTimestamp);

    if (!StringUtils.isBlank(options.getBigtableChangeStreamMetadataTableTableId())) {
      readChangeStream =
          readChangeStream.withMetadataTableTableId(
              options.getBigtableChangeStreamMetadataTableTableId());
    }

    PCollection<ChangeStreamMutation> dataChangeRecord =
        pipeline
            .apply("Read from Cloud Bigtable Change Streams", readChangeStream)
            .apply(Values.create());

    PCollection<TableRow> changeStreamMutationToTableRow =
        dataChangeRecord.apply(
            "ChangeStreamMutation To TableRow",
            ParDo.of(new ChangeStreamMutationToTableRowFn(sourceInfo, bigQuery)));

    Write<TableRow> bigQueryWrite =
        BigQueryIO.<TableRow>write()
            .to(destinationInfo.getBigQueryTableReference())
            .withSchema(bigQuery.getDestinationTableSchema())
            .withFormatFunction(element -> element)
            .withFormatRecordOnFailureFunction(element -> element)
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withExtendedErrorInfo()
            .withMethod(Write.Method.STORAGE_API_AT_LEAST_ONCE)
            .withNumStorageWriteApiStreams(0)
            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors());

    if (destinationInfo.isPartitioned()) {
      bigQueryWrite = bigQueryWrite.withTimePartitioning(bigQuery.getTimePartitioning());
    }

    // Unfortunately, due to https://github.com/apache/beam/issues/24090, it is no longer possible
    // to pass metadata via fake columns when writing to BigQuery. Previously we'd pass something
    // like retry count and then format it out before writing, but BQ would return original object
    // which would allow us to increment retry count and store it to DLQ with incremented number.
    // Because WRITE API doesn't allow access to original object, all metadata values are stripped
    // and we can only rely on retry policy and put all other persistently failing rows to DLQ as
    // a non-retriable severe failure.
    //
    // Since we're not going to be retrying such failures, we'll not use any reading from DLQ
    // capability.

    WriteResult writeResult =
        changeStreamMutationToTableRow.apply("Write To BigQuery", bigQueryWrite);

    writeResult
        .getFailedStorageApiInserts()
        .apply(
            "Failed Mod JSON During BigQuery Writes",
            MapElements.via(new BigQueryDeadLetterQueueSanitizer()))
        .apply(
            "Write rejected TableRow JSON To DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectory() + "YYYY/MM/dd/HH/mm/")
                .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
                .setIncludePaneInfo(true)
                .build());

    return pipeline.run();
  }

  private static void validateBigQueryDatasetExists(
      String bigQueryProject, String bigQueryDataset) {
    BigQueryOptions options = BigQueryOptions.newBuilder().build();
    options.setThrowNotFound(true);

    BigQuery bigQuery = options.getService();
    bigQuery.getDataset(DatasetId.of(bigQueryProject, bigQueryDataset));
  }

  private static Instant toInstant(Timestamp timestamp) {
    if (timestamp == null) {
      return null;
    } else {
      return Instant.ofEpochMilli(timestamp.getSeconds() * 1000 + timestamp.getNanos() / 1000000);
    }
  }

  private static DeadLetterQueueManager buildDlqManager(
      BigtableChangeStreamToBigQueryOptions options) {
    String tempLocation =
        options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
            ? options.as(DataflowPipelineOptions.class).getTempLocation()
            : options.as(DataflowPipelineOptions.class).getTempLocation() + "/";
    String dlqDirectory =
        options.getDlqDirectory().isEmpty() ? tempLocation + "dlq/" : options.getDlqDirectory();

    LOG.info("Dead letter queue directory: {}", dlqDirectory);
    return DeadLetterQueueManager.create(dlqDirectory, 1);
  }

  private static String getBigtableCharset(BigtableChangeStreamToBigQueryOptions options) {
    return StringUtils.isEmpty(options.getBigtableChangeStreamCharset())
        ? "UTF-8"
        : options.getBigtableChangeStreamCharset();
  }

  private static String getBigtableProjectId(BigtableChangeStreamToBigQueryOptions options) {
    return StringUtils.isEmpty(options.getBigtableReadProjectId())
        ? options.getProject()
        : options.getBigtableReadProjectId();
  }

  private static String getBigQueryChangelogTableName(
      BigtableChangeStreamToBigQueryOptions options) {
    return StringUtils.isEmpty(options.getBigQueryChangelogTableName())
        ? options.getBigtableReadTableId() + "_changelog"
        : options.getBigQueryChangelogTableName();
  }

  private static String getBigQueryProjectId(BigtableChangeStreamToBigQueryOptions options) {
    return StringUtils.isEmpty(options.getBigQueryProjectId())
        ? options.getProject()
        : options.getBigQueryProjectId();
  }

  /**
   * DoFn that converts a {@link ChangeStreamMutation} to multiple {@link Mod} in serialized JSON
   * format.
   */
  static class ChangeStreamMutationToTableRowFn extends DoFn<ChangeStreamMutation, TableRow> {
    private final BigtableSource sourceInfo;
    private final BigQueryUtils bigQuery;

    ChangeStreamMutationToTableRowFn(BigtableSource source, BigQueryUtils bigQuery) {
      this.sourceInfo = source;
      this.bigQuery = bigQuery;
    }

    @ProcessElement
    public void process(@Element ChangeStreamMutation input, OutputReceiver<TableRow> receiver)
        throws Exception {
      for (Entry entry : input.getEntries()) {
        ModType modType = getModType(entry);

        Mod mod = null;
        switch (modType) {
          case SET_CELL:
            mod = new Mod(sourceInfo, input, (SetCell) entry);
            break;
          case DELETE_CELLS:
            mod = new Mod(sourceInfo, input, (DeleteCells) entry);
            break;
          case DELETE_FAMILY:
            mod = new Mod(sourceInfo, input, (DeleteFamily) entry);
            break;
          default:
          case UNKNOWN:
            throw new UnsupportedEntryException(
                "Cloud Bigtable change stream entry of type "
                    + entry.getClass().getName()
                    + " is not supported. The entry was put into a dead letter queue directory. "
                    + "Please update your Dataflow template with the latest template version");
        }

        TableRow tableRow = new TableRow();
        if (bigQuery.setTableRowFields(mod, tableRow)) {
          receiver.output(tableRow);
        }
      }
    }

    private ModType getModType(Entry entry) {
      if (entry instanceof SetCell) {
        return ModType.SET_CELL;
      } else if (entry instanceof DeleteCells) {
        return ModType.DELETE_CELLS;
      } else if (entry instanceof DeleteFamily) {
        return ModType.DELETE_FAMILY;
      }
      return ModType.UNKNOWN;
    }
  }
}

次のステップ