Streaming perubahan Bigtable ke template BigQuery

Aliran perubahan Bigtable ke template BigQuery adalah pipeline streaming yang melakukan streaming data perubahan Bigtable dan menulisnya ke tabel BigQuery menggunakan Dataflow.

Aliran perubahan data Bigtable memungkinkan Anda berlangganan mutasi data berdasarkan per tabel. Saat Anda berlangganan aliran perubahan tabel, batasan berikut akan berlaku:

  • Hanya sel yang diubah dan deskripsi operasi penghapusan yang ditampilkan.
  • Hanya nilai baru dari sel yang diubah yang ditampilkan.

Saat data perubahan dicatat ke BigQuery, baris mungkin disisipkan secara tidak berurutan dibandingkan dengan urutan stempel waktu commit Bigtable asli.

Baris tabel log perubahan yang tidak dapat ditulis ke BigQuery karena error persisten ditempatkan secara permanen ke dalam direktori antrean pesan yang dihentikan pengirimannya (antrean pesan yang tidak diproses) di Cloud Storage untuk ditinjau oleh manusia atau diproses lebih lanjut oleh pengguna.

Jika tabel BigQuery yang diperlukan tidak ada, pipeline akan membuatnya. Jika tidak, tabel BigQuery yang ada akan digunakan. Skema tabel BigQuery yang ada harus berisi kolom dalam tabel berikut.

Setiap baris BigQuery baru menyertakan satu data perubahan data yang ditampilkan oleh aliran perubahan dari baris yang sesuai di tabel Bigtable Anda.

Skema tabel output BigQuery

Nama kolom Jenis Nullable Deskripsi
row_key STRING atau BYTES Tidak Kunci baris dari baris yang diubah. Jika opsi pipeline writeRowkeyAsBytes ditetapkan ke true, jenis kolom harus BYTES. Jika tidak, gunakan jenis STRING.
mod_type STRING Tidak Jenis mutasi baris. Gunakan salah satu nilai berikut: SET_CELL, DELETE_CELLS, atau DELETE_FAMILY.
column_family STRING Tidak Grup kolom yang terpengaruh oleh mutasi baris.
column STRING Ya Penentu kolom yang terpengaruh oleh mutasi baris. Untuk jenis mutasi DELETE_FAMILY, tetapkan ke NULL.
commit_timestamp TIMESTAMP Tidak Waktu saat Bigtable menerapkan mutasi.
big_query_commit_timestamp TIMESTAMP Ya Opsional: Menentukan waktu saat BigQuery menulis baris ke tabel output. Kolom tidak diisi jika nama kolom ada dalam nilai opsi pipeline bigQueryChangelogTableFieldsToIgnore.
timestamp TIMESTAMP atau INT64 Ya Nilai stempel waktu sel yang terpengaruh oleh mutasi. Jika opsi pipeline writeNumericTimestamps ditetapkan ke true, jenis kolom harus INT64. Jika tidak, gunakan jenis TIMESTAMP. Untuk jenis mutasi DELETE_CELLS dan DELETE_FAMILY, tetapkan ke NULL.
timestamp_from TIMESTAMP atau INT64 Ya Menjelaskan awal inklusif interval stempel waktu untuk semua sel yang dihapus oleh mutasi DELETE_CELLS. Untuk jenis mutasi lainnya, tetapkan ke NULL.
timestamp_to TIMESTAMP atau INT64 Ya Menjelaskan akhir eksklusif interval stempel waktu untuk semua sel yang dihapus oleh mutasi DELETE_CELLS. Untuk jenis mutasi lainnya, tetapkan ke NULL.
is_gc BOOL Tidak Opsional: Jika mutasi dipicu oleh kebijakan pembersihan sampah, tetapkan ke true. Dalam kasus lain, tetapkan ke false. Kolom tidak diisi jika nama kolom ada dalam nilai opsi pipeline bigQueryChangelogTableFieldsToIgnore.
source_instance STRING Tidak Opsional: Menjelaskan nama instance Bigtable tempat mutasi berasal. Kolom tidak diisi jika nama kolom ada dalam nilai opsi pipeline bigQueryChangelogTableFieldsToIgnore.
source_cluster STRING Tidak Opsional: Menjelaskan nama cluster Bigtable tempat mutasi berasal. Kolom tidak diisi jika nama kolom ada dalam nilai opsi pipeline bigQueryChangelogTableFieldsToIgnore.
source_table STRING Tidak Opsional: Menjelaskan nama tabel Bigtable yang menjadi sasaran mutasi. Nilai dalam kolom ini mungkin berguna jika beberapa tabel Bigtable melakukan streaming perubahan ke tabel BigQuery yang sama. Kolom tidak diisi jika nama kolom ada dalam nilai opsi pipeline bigQueryChangelogTableFieldsToIgnore.
tiebreaker INT64 Tidak Opsional: Jika dua mutasi didaftarkan secara bersamaan oleh cluster Bigtable yang berbeda, mutasi dengan nilai tiebreaker tertinggi akan diterapkan ke tabel sumber. Mutasi dengan nilai tiebreaker yang lebih rendah akan dihapus. Kolom tidak diisi jika nama kolom ada dalam nilai opsi pipeline bigQueryChangelogTableFieldsToIgnore.
value STRING atau BYTES Ya Nilai baru yang ditetapkan oleh mutasi. Jika opsi pipeline writeValuesAsBytes ditetapkan ke true, jenis kolom harus BYTES. Jika tidak, gunakan jenis STRING. Nilai ditetapkan untuk mutasi SET_CELL. Untuk jenis mutasi lainnya, nilai ditetapkan ke NULL.

Persyaratan pipeline

  • Instance sumber Bigtable yang ditentukan.
  • Tabel sumber Bigtable yang ditentukan. Tabel harus mengaktifkan aliran perubahan.
  • Profil aplikasi Bigtable yang ditentukan.
  • Set data tujuan BigQuery yang ditentukan.

Parameter template

Parameter yang diperlukan

  • bigQueryDataset: Nama set data tabel BigQuery tujuan.
  • bigtableChangeStreamAppProfile: ID profil aplikasi Bigtable. Profil aplikasi harus menggunakan perutean cluster tunggal dan mengizinkan transaksi baris tunggal.
  • bigtableReadInstanceId: ID instance Bigtable sumber.
  • bigtableReadTableId: ID tabel Bigtable sumber.

Parameter opsional

  • writeRowkeyAsBytes: Apakah akan menulis kunci baris sebagai BYTES BigQuery. Jika ditetapkan ke true, kunci baris akan ditulis ke kolom BYTES. Jika tidak, kunci baris akan ditulis ke kolom STRING. Nilai default-nya adalah false.
  • writeValuesAsBytes: Jika ditetapkan ke true, nilai akan ditulis ke kolom berjenis BYTES, jika tidak, ke kolom berjenis STRING . Default-nya adalah: false.
  • writeNumericTimestamps: Apakah akan menulis stempel waktu Bigtable sebagai BigQuery INT64. Jika ditetapkan ke true, nilai akan ditulis ke kolom INT64. Jika tidak, nilai akan ditulis ke kolom TIMESTAMP. Kolom yang terpengaruh: timestamp, timestamp_from, dan timestamp_to. Default-nya adalah false. Jika ditetapkan ke true, waktu diukur dalam mikrodetik sejak epoch Unix (1 Januari 1970 pada UTC).
  • bigQueryProjectId: ID project set data BigQuery. Defaultnya adalah project untuk tugas Dataflow.
  • bigQueryChangelogTableName: Nama tabel BigQuery tujuan. Jika tidak ditentukan, nilai bigtableReadTableId + "_changelog" akan digunakan. Default-nya adalah kosong.
  • bigQueryChangelogTablePartitionGranularity: Menentukan tingkat perincian untuk mempartisi tabel log perubahan. Jika ditetapkan, tabel akan dipartisi. Gunakan salah satu nilai yang didukung berikut: HOUR, DAY, MONTH, atau YEAR. Secara default, tabel tidak dipartisi.
  • bigQueryChangelogTablePartitionExpirationMs: Menetapkan waktu habis masa berlaku partisi tabel log perubahan, dalam milidetik. Jika ditetapkan ke true, partisi yang lebih lama dari jumlah milidetik yang ditentukan akan dihapus. Secara default, tidak ada masa berlaku yang ditetapkan.
  • bigQueryChangelogTableFieldsToIgnore: Daftar kolom log perubahan yang dipisahkan koma yang, jika ditentukan, tidak dibuat dan diisi. Gunakan salah satu nilai yang didukung berikut: is_gc, source_instance, source_cluster, source_table, tiebreaker, atau big_query_commit_timestamp. Secara default, semua kolom diisi.
  • dlqDirectory: Direktori yang akan digunakan untuk antrean surat tidak terkirim. Data yang gagal diproses disimpan di direktori ini. Secara default, direktori ini berada di lokasi sementara tugas Dataflow. Dalam sebagian besar kasus, Anda dapat menggunakan jalur default.
  • bigtableChangeStreamMetadataInstanceId: ID instance metadata aliran perubahan data Bigtable. Default-nya adalah kosong.
  • bigtableChangeStreamMetadataTableTableId: ID tabel metadata konektor aliran perubahan data Bigtable. Jika tidak diberikan, tabel metadata konektor aliran perubahan Bigtable akan otomatis dibuat selama eksekusi pipeline. Default-nya adalah kosong.
  • bigtableChangeStreamCharset: Nama set karakter aliran perubahan data Bigtable. Setelan defaultnya adalah: UTF-8.
  • bigtableChangeStreamStartTimestamp: Stempel waktu awal (https://tools.ietf.org/html/rfc3339), inklusif, untuk digunakan membaca aliran perubahan. Misalnya, 2022-05-05T07:59:59Z. Default-nya adalah stempel waktu waktu mulai pipeline.
  • bigtableChangeStreamIgnoreColumnFamilies: Daftar perubahan nama grup kolom yang dipisahkan koma yang akan diabaikan. Default-nya adalah kosong.
  • bigtableChangeStreamIgnoreColumns: Daftar perubahan nama kolom yang dipisahkan koma yang akan diabaikan. Default-nya adalah kosong.
  • bigtableChangeStreamName: Nama unik untuk pipeline klien. Memungkinkan Anda melanjutkan pemrosesan dari titik saat pipeline yang sebelumnya berjalan dihentikan. Secara default, nama yang dibuat secara otomatis. Lihat log tugas Dataflow untuk mengetahui nilai yang digunakan.
  • bigtableChangeStreamResume: Jika ditetapkan ke true, pipeline baru akan melanjutkan pemrosesan dari titik saat pipeline yang sebelumnya berjalan dengan nilai bigtableChangeStreamName yang sama dihentikan. Jika pipeline dengan nilai bigtableChangeStreamName yang diberikan belum pernah berjalan, pipeline baru tidak akan dimulai. Jika ditetapkan ke false, pipeline baru akan dimulai. Jika pipeline dengan nilai bigtableChangeStreamName yang sama telah berjalan untuk sumber tertentu, pipeline baru tidak akan dimulai. Default-nya adalah false.
  • bigtableReadProjectId: Project ID Bigtable. Defaultnya adalah project untuk tugas Dataflow.

Menjalankan template

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region defaultnya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the Bigtable change streams to BigQuery template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Klik Run job.

Di shell atau terminal, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
bigQueryDataset=BIGQUERY_DESTINATION_DATASET

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • BIGTABLE_INSTANCE_ID: ID instance Bigtable Anda.
  • BIGTABLE_TABLE_ID: ID tabel Bigtable Anda.
  • BIGTABLE_APPLICATION_PROFILE_ID: ID profil aplikasi Bigtable Anda.
  • BIGQUERY_DESTINATION_DATASET: nama set data tujuan BigQuery

Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "bigQueryDataset": "BIGQUERY_DESTINATION_DATASET"
    }
  }
}

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • BIGTABLE_INSTANCE_ID: ID instance Bigtable Anda.
  • BIGTABLE_TABLE_ID: ID tabel Bigtable Anda.
  • BIGTABLE_APPLICATION_PROFILE_ID: ID profil aplikasi Bigtable Anda.
  • BIGQUERY_DESTINATION_DATASET: nama set data tujuan BigQuery
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.Timestamp;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.cloud.bigtable.data.v2.models.DeleteCells;
import com.google.cloud.bigtable.data.v2.models.DeleteFamily;
import com.google.cloud.bigtable.data.v2.models.Entry;
import com.google.cloud.bigtable.data.v2.models.SetCell;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.bigtable.options.BigtableCommonOptions.ReadChangeStreamOptions;
import com.google.cloud.teleport.v2.bigtable.options.BigtableCommonOptions.ReadOptions;
import com.google.cloud.teleport.v2.bigtable.utils.UnsupportedEntryException;
import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager;
import com.google.cloud.teleport.v2.options.BigtableChangeStreamToBigQueryOptions;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.model.BigQueryDestination;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.model.BigtableSource;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.model.Mod;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.model.ModType;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.BigQueryUtils;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.ExistingPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This pipeline ingests {@link ChangeStreamMutation} from Bigtable change stream. The {@link
 * ChangeStreamMutation} is then broken into {@link Mod}, which converted into {@link TableRow} and
 * inserted into BigQuery table.
 */
@Template(
    name = "Bigtable_Change_Streams_to_BigQuery",
    category = TemplateCategory.STREAMING,
    displayName = "Cloud Bigtable Change Streams to BigQuery",
    description =
        "Streaming pipeline. Streams Bigtable data change records and writes them into BigQuery using Dataflow Runner V2.",
    optionsClass = BigtableChangeStreamToBigQueryOptions.class,
    optionsOrder = {
      BigtableChangeStreamToBigQueryOptions.class,
      ReadChangeStreamOptions.class,
      ReadOptions.class
    },
    skipOptions = {
      "bigtableReadAppProfile",
      "bigtableAdditionalRetryCodes",
      "bigtableRpcAttemptTimeoutMs",
      "bigtableRpcTimeoutMs"
    },
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-bigtable-change-streams-to-bigquery",
    flexContainerName = "bigtable-changestreams-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    streaming = true)
public final class BigtableChangeStreamsToBigQuery {
  private static final Logger LOG = LoggerFactory.getLogger(BigtableChangeStreamsToBigQuery.class);

  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    LOG.info("Starting to replicate change records from Cloud Bigtable change streams to BigQuery");

    BigtableChangeStreamToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(BigtableChangeStreamToBigQueryOptions.class);

    run(options);
  }

  private static void setOptions(BigtableChangeStreamToBigQueryOptions options) {
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    // Add use_runner_v2 to the experiments option, since change streams connector is only supported
    // on Dataflow runner v2.
    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    boolean hasUseRunnerV2 = false;
    for (String experiment : experiments) {
      if (experiment.equalsIgnoreCase(USE_RUNNER_V2_EXPERIMENT)) {
        hasUseRunnerV2 = true;
        break;
      }
    }
    if (!hasUseRunnerV2) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(BigtableChangeStreamToBigQueryOptions options) {
    setOptions(options);

    String changelogTableName = getBigQueryChangelogTableName(options);
    String bigtableProject = getBigtableProjectId(options);
    String bigQueryProject = getBigQueryProjectId(options);
    String bigQueryDataset = options.getBigQueryDataset();

    // If dataset doesn't exist and not checked, pipeline will start failing only after it sees the
    // first change from Cloud Bigtable. BigQueryIO can create table if it doesn't exist, but it
    // cannot create a dataset
    validateBigQueryDatasetExists(bigQueryProject, bigQueryDataset);

    // Retrieve and parse the startTimestamp
    Instant startTimestamp =
        options.getBigtableChangeStreamStartTimestamp().isEmpty()
            ? Instant.now()
            : toInstant(Timestamp.parseTimestamp(options.getBigtableChangeStreamStartTimestamp()));

    BigtableSource sourceInfo =
        new BigtableSource(
            options.getBigtableReadInstanceId(),
            options.getBigtableReadTableId(),
            getBigtableCharset(options),
            options.getBigtableChangeStreamIgnoreColumnFamilies(),
            options.getBigtableChangeStreamIgnoreColumns(),
            startTimestamp);

    BigQueryDestination destinationInfo =
        new BigQueryDestination(
            bigQueryProject,
            bigQueryDataset,
            changelogTableName,
            options.getWriteRowkeyAsBytes(),
            options.getWriteValuesAsBytes(),
            options.getWriteNumericTimestamps(),
            options.getBigQueryChangelogTablePartitionGranularity(),
            options.getBigQueryChangelogTablePartitionExpirationMs(),
            options.getBigQueryChangelogTableFieldsToIgnore());

    BigQueryUtils bigQuery = new BigQueryUtils(sourceInfo, destinationInfo);

    Pipeline pipeline = Pipeline.create(options);
    DeadLetterQueueManager dlqManager = buildDlqManager(options);

    BigtableIO.ReadChangeStream readChangeStream =
        BigtableIO.readChangeStream()
            .withChangeStreamName(options.getBigtableChangeStreamName())
            .withExistingPipelineOptions(
                options.getBigtableChangeStreamResume()
                    ? ExistingPipelineOptions.RESUME_OR_FAIL
                    : ExistingPipelineOptions.FAIL_IF_EXISTS)
            .withProjectId(bigtableProject)
            .withMetadataTableInstanceId(options.getBigtableChangeStreamMetadataInstanceId())
            .withInstanceId(options.getBigtableReadInstanceId())
            .withTableId(options.getBigtableReadTableId())
            .withAppProfileId(options.getBigtableChangeStreamAppProfile())
            .withStartTime(startTimestamp);

    if (!StringUtils.isBlank(options.getBigtableChangeStreamMetadataTableTableId())) {
      readChangeStream =
          readChangeStream.withMetadataTableTableId(
              options.getBigtableChangeStreamMetadataTableTableId());
    }

    PCollection<ChangeStreamMutation> dataChangeRecord =
        pipeline
            .apply("Read from Cloud Bigtable Change Streams", readChangeStream)
            .apply(Values.create());

    PCollection<TableRow> changeStreamMutationToTableRow =
        dataChangeRecord.apply(
            "ChangeStreamMutation To TableRow",
            ParDo.of(new ChangeStreamMutationToTableRowFn(sourceInfo, bigQuery)));

    Write<TableRow> bigQueryWrite =
        BigQueryIO.<TableRow>write()
            .to(destinationInfo.getBigQueryTableReference())
            .withSchema(bigQuery.getDestinationTableSchema())
            .withFormatFunction(element -> element)
            .withFormatRecordOnFailureFunction(element -> element)
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withExtendedErrorInfo()
            .withMethod(Write.Method.STORAGE_API_AT_LEAST_ONCE)
            .withNumStorageWriteApiStreams(0)
            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors());

    if (destinationInfo.isPartitioned()) {
      bigQueryWrite = bigQueryWrite.withTimePartitioning(bigQuery.getTimePartitioning());
    }

    // Unfortunately, due to https://github.com/apache/beam/issues/24090, it is no longer possible
    // to pass metadata via fake columns when writing to BigQuery. Previously we'd pass something
    // like retry count and then format it out before writing, but BQ would return original object
    // which would allow us to increment retry count and store it to DLQ with incremented number.
    // Because WRITE API doesn't allow access to original object, all metadata values are stripped
    // and we can only rely on retry policy and put all other persistently failing rows to DLQ as
    // a non-retriable severe failure.
    //
    // Since we're not going to be retrying such failures, we'll not use any reading from DLQ
    // capability.

    WriteResult writeResult =
        changeStreamMutationToTableRow.apply("Write To BigQuery", bigQueryWrite);

    writeResult
        .getFailedStorageApiInserts()
        .apply(
            "Failed Mod JSON During BigQuery Writes",
            MapElements.via(new BigQueryDeadLetterQueueSanitizer()))
        .apply(
            "Write rejected TableRow JSON To DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectory() + "YYYY/MM/dd/HH/mm/")
                .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
                .setIncludePaneInfo(true)
                .build());

    return pipeline.run();
  }

  private static void validateBigQueryDatasetExists(
      String bigQueryProject, String bigQueryDataset) {
    BigQueryOptions options = BigQueryOptions.newBuilder().build();
    options.setThrowNotFound(true);

    BigQuery bigQuery = options.getService();
    bigQuery.getDataset(DatasetId.of(bigQueryProject, bigQueryDataset));
  }

  private static Instant toInstant(Timestamp timestamp) {
    if (timestamp == null) {
      return null;
    } else {
      return Instant.ofEpochMilli(timestamp.getSeconds() * 1000 + timestamp.getNanos() / 1000000);
    }
  }

  private static DeadLetterQueueManager buildDlqManager(
      BigtableChangeStreamToBigQueryOptions options) {
    String tempLocation =
        options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
            ? options.as(DataflowPipelineOptions.class).getTempLocation()
            : options.as(DataflowPipelineOptions.class).getTempLocation() + "/";
    String dlqDirectory =
        options.getDlqDirectory().isEmpty() ? tempLocation + "dlq/" : options.getDlqDirectory();

    LOG.info("Dead letter queue directory: {}", dlqDirectory);
    return DeadLetterQueueManager.create(dlqDirectory, 1);
  }

  private static String getBigtableCharset(BigtableChangeStreamToBigQueryOptions options) {
    return StringUtils.isEmpty(options.getBigtableChangeStreamCharset())
        ? "UTF-8"
        : options.getBigtableChangeStreamCharset();
  }

  private static String getBigtableProjectId(BigtableChangeStreamToBigQueryOptions options) {
    return StringUtils.isEmpty(options.getBigtableReadProjectId())
        ? options.getProject()
        : options.getBigtableReadProjectId();
  }

  private static String getBigQueryChangelogTableName(
      BigtableChangeStreamToBigQueryOptions options) {
    return StringUtils.isEmpty(options.getBigQueryChangelogTableName())
        ? options.getBigtableReadTableId() + "_changelog"
        : options.getBigQueryChangelogTableName();
  }

  private static String getBigQueryProjectId(BigtableChangeStreamToBigQueryOptions options) {
    return StringUtils.isEmpty(options.getBigQueryProjectId())
        ? options.getProject()
        : options.getBigQueryProjectId();
  }

  /**
   * DoFn that converts a {@link ChangeStreamMutation} to multiple {@link Mod} in serialized JSON
   * format.
   */
  static class ChangeStreamMutationToTableRowFn extends DoFn<ChangeStreamMutation, TableRow> {
    private final BigtableSource sourceInfo;
    private final BigQueryUtils bigQuery;

    ChangeStreamMutationToTableRowFn(BigtableSource source, BigQueryUtils bigQuery) {
      this.sourceInfo = source;
      this.bigQuery = bigQuery;
    }

    @ProcessElement
    public void process(@Element ChangeStreamMutation input, OutputReceiver<TableRow> receiver)
        throws Exception {
      for (Entry entry : input.getEntries()) {
        ModType modType = getModType(entry);

        Mod mod = null;
        switch (modType) {
          case SET_CELL:
            mod = new Mod(sourceInfo, input, (SetCell) entry);
            break;
          case DELETE_CELLS:
            mod = new Mod(sourceInfo, input, (DeleteCells) entry);
            break;
          case DELETE_FAMILY:
            mod = new Mod(sourceInfo, input, (DeleteFamily) entry);
            break;
          default:
          case UNKNOWN:
            throw new UnsupportedEntryException(
                "Cloud Bigtable change stream entry of type "
                    + entry.getClass().getName()
                    + " is not supported. The entry was put into a dead letter queue directory. "
                    + "Please update your Dataflow template with the latest template version");
        }

        TableRow tableRow = new TableRow();
        if (bigQuery.setTableRowFields(mod, tableRow)) {
          receiver.output(tableRow);
        }
      }
    }

    private ModType getModType(Entry entry) {
      if (entry instanceof SetCell) {
        return ModType.SET_CELL;
      } else if (entry instanceof DeleteCells) {
        return ModType.DELETE_CELLS;
      } else if (entry instanceof DeleteFamily) {
        return ModType.DELETE_FAMILY;
      }
      return ModType.UNKNOWN;
    }
  }
}

Langkah berikutnya