Template BigQuery ke Bigtable

Template BigQuery to Bigtable adalah pipeline batch yang menyalin data dari tabel BigQuery ke tabel Bigtable yang ada. Template dapat membaca seluruh tabel atau membaca data tertentu menggunakan kueri yang disediakan.

Persyaratan pipeline

  • Tabel BigQuery sumber harus ada.
  • Tabel Bigtable harus ada.
  • Akun layanan pekerja memerlukan izin roles/bigquery.datasets.create. Untuk mengetahui informasi selengkapnya, lihat Pengantar IAM.

Parameter template

Parameter yang diperlukan

  • readIdColumn: Nama kolom BigQuery yang menyimpan ID unik baris.
  • bigtableWriteInstanceId: ID instance Bigtable yang berisi tabel.
  • bigtableWriteTableId: ID tabel Bigtable yang akan ditulis.
  • bigtableWriteColumnFamily: Nama grup kolom tabel Bigtable tempat data akan ditulis.

Parameter opsional

  • inputTableSpec: Tabel BigQuery yang akan dibaca. Jika Anda menentukan inputTableSpec, template akan membaca data langsung dari penyimpanan BigQuery menggunakan BigQuery Storage Read API (https://cloud.google.com/bigquery/docs/reference/storage). Untuk mengetahui informasi tentang batasan di Storage Read API, lihat https://cloud.google.com/bigquery/docs/reference/storage#limitations. Anda harus menentukan inputTableSpec atau query. Jika Anda menetapkan kedua parameter, template akan menggunakan parameter query. Misalnya, <BIGQUERY_PROJECT>:<DATASET_NAME>.<INPUT_TABLE>.
  • outputDeadletterTable: Tabel BigQuery untuk pesan yang gagal mencapai tabel output. Jika tidak ada, tabel akan dibuat selama eksekusi pipeline. Jika tidak ditentukan, <outputTableSpec>_error_records akan digunakan. Misalnya, <PROJECT_ID>:<DATASET_NAME>.<DEADLETTER_TABLE>.
  • kueri: Kueri SQL yang akan digunakan untuk membaca data dari BigQuery. Jika set data BigQuery berada dalam project yang berbeda dengan tugas Dataflow, tentukan nama set data lengkap dalam kueri SQL, misalnya: <PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>. Secara default, parameter query menggunakan GoogleSQL (https://cloud.google.com/bigquery/docs/introduction-sql), kecuali jika useLegacySql adalah true. Anda harus menentukan inputTableSpec atau query. Jika Anda menetapkan kedua parameter, template akan menggunakan parameter query. Misalnya, select * from sampledb.sample_table.
  • useLegacySql: Tetapkan ke true untuk menggunakan SQL lama. Parameter ini hanya berlaku saat menggunakan parameter query. Setelan defaultnya adalah false.
  • queryLocation: Diperlukan saat membaca dari tampilan yang diotorisasi tanpa izin tabel yang mendasarinya. Contoh, US.
  • queryTempDataset: Dengan opsi ini, Anda dapat menetapkan set data yang ada untuk membuat tabel sementara guna menyimpan hasil kueri. Contoh, temp_dataset.
  • KMSEncryptionKey: Jika membaca dari BigQuery menggunakan sumber kueri, gunakan kunci Cloud KMS ini untuk mengenkripsi tabel sementara yang dibuat. Contoh, projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key.
  • bigtableRpcAttemptTimeoutMs: Waktu tunggu untuk setiap upaya RPC Bigtable dalam milidetik.
  • bigtableRpcTimeoutMs: Total waktu tunggu untuk operasi RPC Bigtable dalam milidetik.
  • bigtableAdditionalRetryCodes: Kode percobaan ulang tambahan. Contoh, RESOURCE_EXHAUSTED,DEADLINE_EXCEEDED.
  • bigtableWriteAppProfile: ID profil aplikasi Bigtable yang akan digunakan untuk ekspor. Jika Anda tidak menentukan profil aplikasi, Bigtable akan menggunakan profil aplikasi default (https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile) dari instance.
  • bigtableWriteProjectId: ID project Google Cloud yang berisi instance Bigtable tempat data akan ditulis.
  • bigtableBulkWriteLatencyTargetMs: Target latensi Bigtable dalam milidetik untuk throttling berbasis latensi.
  • bigtableBulkWriteMaxRowKeyCount: Jumlah maksimum kunci baris dalam operasi tulis batch Bigtable.
  • bigtableBulkWriteMaxRequestSizeBytes: Byte maksimum yang akan disertakan per operasi tulis batch Bigtable.

Menjalankan template

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region defaultnya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the BigQuery to Bigtable template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Klik Run job.

Di shell atau terminal, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_Bigtable \
    --parameters \
readIdColumn=READ_COLUMN_ID,\
inputTableSpec=INPUT_TABLE_SPEC,\
bigtableWriteInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableWriteTableId=BIGTABLE_TABLE_ID,\
bigtableWriteColumnFamily=BIGTABLE_COLUMN_FAMILY

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • READ_COLUMN_ID: kolom ID unik BigQuery Anda.
  • INPUT_TABLE_SPEC: nama tabel BigQuery Anda.
  • BIGTABLE_INSTANCE_ID: ID instance Bigtable Anda.
  • BIGTABLE_TABLE_ID: ID tabel Bigtable Anda.
  • BIGTABLE_COLUMN_FAMILY: keluarga kolom tabel Bigtable Anda.

Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "readIdColumn": "READ_COLUMN_ID",
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "bigtableWriteInstanceId": "BIGTABLE_INSTANCE_ID",
          "bigtableWriteTableId": "BIGTABLE_TABLE_ID",
          "bigtableWriteColumnFamily": "BIGTABLE_COLUMN_FAMILY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_Bigtable",
   }
}

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • READ_COLUMN_ID: kolom ID unik BigQuery Anda.
  • INPUT_TABLE_SPEC: nama tabel BigQuery Anda.
  • BIGTABLE_INSTANCE_ID: ID instance Bigtable Anda.
  • BIGTABLE_TABLE_ID: ID tabel Bigtable Anda.
  • BIGTABLE_COLUMN_FAMILY: keluarga kolom tabel Bigtable Anda.
Java
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.bigtable.utils.BigtableConfig.generateCloudBigtableWriteConfiguration;

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.bigtable.options.BigtableCommonOptions;
import com.google.cloud.teleport.v2.bigtable.transforms.BigtableConverters;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.templates.BigQueryToBigtable.BigQueryToBigtableOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.hadoop.hbase.client.Mutation;

/**
 * Dataflow template which reads BigQuery data and writes it to Bigtable. The source data can be
 * either a BigQuery table or an SQL query.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/bigquery-to-bigtable/README_BigQuery_to_Bigtable.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "BigQuery_to_Bigtable",
    category = TemplateCategory.BATCH,
    displayName = "BigQuery to Bigtable",
    description = "A pipeline to export a BigQuery table into Bigtable.",
    optionsClass = BigQueryToBigtableOptions.class,
    optionsOrder = {
      BigQueryToBigtableOptions.class,
      BigQueryConverters.BigQueryReadOptions.class,
      BigtableCommonOptions.class,
      BigtableCommonOptions.WriteOptions.class
    },
    optionalOptions = {"inputTableSpec"},
    flexContainerName = "bigquery-to-bigtable",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigquery-to-bigtable",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The source BigQuery table must exist.",
      "The Bigtable table must exist.",
      "The <a href=\"https://cloud.google.com/dataflow/docs/concepts/security-and-permissions#worker-service-account\">worker service account</a>"
          + " needs the <code>roles/bigquery.datasets.create</code> permission. For"
          + " more information, see <a href=\"https://cloud.google.com/bigquery/docs/access-control\">Introduction to IAM</a>."
    })
public class BigQueryToBigtable {

  /**
   * The {@link BigQueryToBigtableOptions} class provides the custom execution options passed by the
   * executor at the command-line.
   */
  public interface BigQueryToBigtableOptions
      extends BigQueryConverters.BigQueryReadOptions,
          BigtableCommonOptions.WriteOptions,
          GcpOptions {

    @TemplateParameter.Text(
        order = 1,
        regexes = {"[A-Za-z_][A-Za-z_0-9]*"},
        description = "Unique identifier column",
        helpText = "The name of the BigQuery column storing the unique identifier of the row.")
    @Required
    String getReadIdColumn();

    void setReadIdColumn(String value);
  }

  /**
   * Runs a pipeline which reads data from BigQuery and writes it to Bigtable.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    BigQueryToBigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryToBigtableOptions.class);

    CloudBigtableTableConfiguration bigtableTableConfig =
        generateCloudBigtableWriteConfiguration(options);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply(
            "AvroToMutation",
            BigQueryConverters.ReadBigQuery.<Mutation>newBuilder()
                .setOptions(options.as(BigQueryToBigtableOptions.class))
                .setReadFunction(
                    BigQueryIO.read(
                        BigtableConverters.AvroToMutation.newBuilder()
                            .setColumnFamily(options.getBigtableWriteColumnFamily())
                            .setRowkey(options.getReadIdColumn())
                            .build()))
                .build())
        .apply("WriteToTable", CloudBigtableIO.writeToTable(bigtableTableConfig));

    pipeline.run();
  }
}

Langkah berikutnya