Bigtable to Cloud Storage Parquet template

The Bigtable to Cloud Storage Parquet template is a pipeline that reads data from a Bigtable table and writes it to a Cloud Storage bucket in Parquet format. You can use the template to move data from Bigtable to Cloud Storage.

Pipeline requirements

  • The Bigtable table must exist.
  • The output Cloud Storage bucket must exist before running the pipeline.

Template parameters

Required parameters

  • bigtableProjectId: The ID of the Google Cloud project that contains the Cloud Bigtable instance that you want to read data from.
  • bigtableInstanceId: The ID of the Cloud Bigtable instance that contains the table.
  • bigtableTableId: The ID of the Cloud Bigtable table to export.
  • outputDirectory: The path and filename prefix for writing output files. Must end with a slash. DateTime formatting is used to parse the directory path for date and time formatters. For example: gs://your-bucket/your-path.
  • filenamePrefix: The prefix of the Parquet file name. For example, table1-. Defaults to: part.

Optional parameters

  • numShards: The maximum number of output shards produced when writing. A higher number of shards means higher throughput for writing to Cloud Storage, but potentially higher data aggregation cost across shards when processing output Cloud Storage files. The default value is decided by Dataflow.
  • bigtableAppProfileId: The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile.

Run the template

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Cloud Bigtable to Parquet Files on Cloud Storage template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_Bigtable_to_GCS_Parquet \
    --region REGION_NAME \
    --parameters \
bigtableProjectId=BIGTABLE_PROJECT_ID,\
bigtableInstanceId=INSTANCE_ID,\
bigtableTableId=TABLE_ID,\
outputDirectory=OUTPUT_DIRECTORY,\
filenamePrefix=FILENAME_PREFIX,\
numShards=NUM_SHARDS

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

      • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
      • the version name, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/

  • REGION_NAME: the region where you want to deploy your Dataflow job, for example us-central1
  • BIGTABLE_PROJECT_ID: the ID of the Google Cloud project for the Bigtable instance that you want to read data from
  • INSTANCE_ID: the ID of the Bigtable instance that contains the table
  • TABLE_ID: the ID of the Bigtable table to export
  • OUTPUT_DIRECTORY: the Cloud Storage path where data is written, for example, gs://mybucket/somefolder
  • FILENAME_PREFIX: the prefix of the Parquet file name, for example, output-
  • NUM_SHARDS: the number of Parquet files to output, for example, 1
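
For example, a complete command with hypothetical values (a project named my-project, an instance my-instance, a table my-table, and a bucket my-bucket) might look like the following. Note that outputDirectory ends with a slash, as the parameter requires:

gcloud dataflow jobs run bigtable-to-parquet-export \
    --gcs-location gs://dataflow-templates-us-central1/latest/Cloud_Bigtable_to_GCS_Parquet \
    --region us-central1 \
    --parameters \
bigtableProjectId=my-project,\
bigtableInstanceId=my-instance,\
bigtableTableId=my-table,\
outputDirectory=gs://my-bucket/bigtable-export/,\
filenamePrefix=output-,\
numShards=1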

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_Bigtable_to_GCS_Parquet
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProjectId": "BIGTABLE_PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "outputDirectory": "OUTPUT_DIRECTORY",
       "filenamePrefix": "FILENAME_PREFIX",
       "numShards": "NUM_SHARDS"
   },
   "environment": { "zone": "us-central1-f" }
}

Replace the following:

  • PROJECT_ID: the ID of the Google Cloud project where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

      • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-LOCATION/latest/
      • the version name, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-LOCATION/

  • LOCATION: the region where you want to deploy your Dataflow job, for example us-central1
  • BIGTABLE_PROJECT_ID: the ID of the Google Cloud project for the Bigtable instance that you want to read data from
  • INSTANCE_ID: the ID of the Bigtable instance that contains the table
  • TABLE_ID: the ID of the Bigtable table to export
  • OUTPUT_DIRECTORY: the Cloud Storage path where data is written, for example, gs://mybucket/somefolder
  • FILENAME_PREFIX: the prefix of the Parquet file name, for example, output-
  • NUM_SHARDS: the number of Parquet files to output, for example, 1
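
For example, one way to send this request from a shell is with curl, using an access token from the gcloud CLI. This is a sketch; the endpoint, placeholders, and body are the same as the request above:

curl -X POST \
    "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_Bigtable_to_GCS_Parquet" \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d '{
          "jobName": "JOB_NAME",
          "parameters": {
            "bigtableProjectId": "BIGTABLE_PROJECT_ID",
            "bigtableInstanceId": "INSTANCE_ID",
            "bigtableTableId": "TABLE_ID",
            "outputDirectory": "OUTPUT_DIRECTORY",
            "filenamePrefix": "FILENAME_PREFIX",
            "numShards": "NUM_SHARDS"
          },
          "environment": { "zone": "us-central1-f" }
        }'
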
Template source code

Java
/*
 * Copyright (C) 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.bigtable;

import static com.google.cloud.teleport.bigtable.BigtableToAvro.toByteArray;

import com.google.bigtable.v2.Cell;
import com.google.bigtable.v2.Column;
import com.google.bigtable.v2.Family;
import com.google.bigtable.v2.Row;
import com.google.cloud.teleport.bigtable.BigtableToParquet.Options;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;

/**
 * Dataflow pipeline that exports data from a Cloud Bigtable table to Parquet files in GCS.
 * Currently, filtering on Cloud Bigtable table is not supported.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_Bigtable_to_GCS_Parquet.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_Bigtable_to_GCS_Parquet",
    category = TemplateCategory.BATCH,
    displayName = "Cloud Bigtable to Parquet Files on Cloud Storage",
    description =
        "The Bigtable to Cloud Storage Parquet template is a pipeline that reads data from a Bigtable table and writes it to a Cloud Storage bucket in Parquet format. "
            + "You can use the template to move data from Bigtable to Cloud Storage.",
    optionsClass = Options.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigtable-to-parquet",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Bigtable table must exist.",
      "The output Cloud Storage bucket must exist before running the pipeline."
    })
public class BigtableToParquet {

  /** Options for the export pipeline. */
  public interface Options extends PipelineOptions {

    @TemplateParameter.ProjectId(
        order = 1,
        groupName = "Source",
        description = "Project ID",
        helpText =
            "The ID of the Google Cloud project that contains the Cloud Bigtable instance that you want to read data from.")
    ValueProvider<String> getBigtableProjectId();

    @SuppressWarnings("unused")
    void setBigtableProjectId(ValueProvider<String> projectId);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Source",
        regexes = {"[a-z][a-z0-9\\-]+[a-z0-9]"},
        description = "Instance ID",
        helpText = "The ID of the Cloud Bigtable instance that contains the table.")
    ValueProvider<String> getBigtableInstanceId();

    @SuppressWarnings("unused")
    void setBigtableInstanceId(ValueProvider<String> instanceId);

    @TemplateParameter.Text(
        order = 3,
        groupName = "Source",
        regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
        description = "Table ID",
        helpText = "The ID of the Cloud Bigtable table to export.")
    ValueProvider<String> getBigtableTableId();

    @SuppressWarnings("unused")
    void setBigtableTableId(ValueProvider<String> tableId);

    @TemplateParameter.GcsWriteFolder(
        order = 4,
        groupName = "Target",
        description = "Output file directory in Cloud Storage",
        helpText =
            "The path and filename prefix for writing output files. Must end with a slash. DateTime formatting is used to parse the directory path for date and time formatters. For example: `gs://your-bucket/your-path`.")
    ValueProvider<String> getOutputDirectory();

    @SuppressWarnings("unused")
    void setOutputDirectory(ValueProvider<String> outputDirectory);

    @TemplateParameter.Text(
        order = 5,
        groupName = "Target",
        description = "Parquet file prefix",
        helpText =
            "The prefix of the Parquet file name. For example, `table1-`. Defaults to: `part`.")
    @Default.String("part")
    ValueProvider<String> getFilenamePrefix();

    @SuppressWarnings("unused")
    void setFilenamePrefix(ValueProvider<String> filenamePrefix);

    @TemplateParameter.Integer(
        order = 6,
        groupName = "Target",
        optional = true,
        description = "Maximum output shards",
        helpText =
            "The maximum number of output shards produced when writing. A higher number of shards means higher throughput for writing to Cloud Storage, but potentially higher data aggregation cost across shards when processing output Cloud Storage files. The default value is decided by Dataflow.")
    @Default.Integer(0)
    ValueProvider<Integer> getNumShards();

    @SuppressWarnings("unused")
    void setNumShards(ValueProvider<Integer> numShards);

    @TemplateParameter.Text(
        order = 7,
        groupName = "Source",
        optional = true,
        regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
        description = "Application profile ID",
        helpText =
            "The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile.")
    @Default.String("default")
    ValueProvider<String> getBigtableAppProfileId();

    @SuppressWarnings("unused")
    void setBigtableAppProfileId(ValueProvider<String> appProfileId);
  }

  /**
   * Main entry point for pipeline execution.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    PipelineResult result = run(options);

    // Wait for pipeline to finish only if it is not constructing a template.
    if (options.as(DataflowPipelineOptions.class).getTemplateLocation() == null) {
      result.waitUntilFinish();
    }
  }

  /**
   * Runs a pipeline to export data from a Cloud Bigtable table to Parquet file(s) in GCS.
   *
   * @param options arguments to the pipeline
   */
  public static PipelineResult run(Options options) {
    Pipeline pipeline = Pipeline.create(PipelineUtils.tweakPipelineOptions(options));
    BigtableIO.Read read =
        BigtableIO.read()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withAppProfileId(options.getBigtableAppProfileId())
            .withTableId(options.getBigtableTableId());

    // Do not validate input fields if it is running as a template.
    if (options.as(DataflowPipelineOptions.class).getTemplateLocation() != null) {
      read = read.withoutValidation();
    }

    /**
     * Steps: 1) Read records from Bigtable. 2) Convert a Bigtable Row to a GenericRecord. 3) Write
     * GenericRecord(s) to GCS in parquet format.
     */
    FileIO.Write<Void, GenericRecord> write =
        FileIO.<GenericRecord>write()
            .via(ParquetIO.sink(BigtableRow.getClassSchema()))
            .to(options.getOutputDirectory())
            .withPrefix(options.getFilenamePrefix())
            .withSuffix(".parquet");
    ValueProvider<Integer> numShardsOpt = options.getNumShards();
    if (numShardsOpt.isAccessible()) {
      Integer numShards = numShardsOpt.get();
      if (numShards != null && numShards > 0) {
        write = write.withNumShards(options.getNumShards());
      }
    }
    pipeline
        .apply("Read from Bigtable", read)
        .apply("Transform to Parquet", MapElements.via(new BigtableToParquetFn()))
        .setCoder(AvroCoder.of(GenericRecord.class, BigtableRow.getClassSchema()))
        .apply("Write to Parquet in GCS", write);

    return pipeline.run();
  }

  /**
   * Translates a {@link PCollection} of Bigtable {@link Row} to a {@link PCollection} of {@link
   * GenericRecord}.
   */
  static class BigtableToParquetFn extends SimpleFunction<Row, GenericRecord> {
    @Override
    public GenericRecord apply(Row row) {
      ByteBuffer key = ByteBuffer.wrap(toByteArray(row.getKey()));
      List<BigtableCell> cells = new ArrayList<>();
      for (Family family : row.getFamiliesList()) {
        String familyName = family.getName();
        for (Column column : family.getColumnsList()) {
          ByteBuffer qualifier = ByteBuffer.wrap(toByteArray(column.getQualifier()));
          for (Cell cell : column.getCellsList()) {
            long timestamp = cell.getTimestampMicros();
            ByteBuffer value = ByteBuffer.wrap(toByteArray(cell.getValue()));
            cells.add(new BigtableCell(familyName, qualifier, timestamp, value));
          }
        }
      }
      return new GenericRecordBuilder(BigtableRow.getClassSchema())
          .set("key", key)
          .set("cells", cells)
          .build();
    }
  }
}
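
Each record in the output files follows the Avro-generated BigtableRow schema used by the pipeline above: a row key plus a list of cells, each with family, qualifier, timestamp, and value fields. As a minimal sketch, assuming the template's generated classes (such as BigtableRow) are available on the classpath and using the hypothetical export path from the earlier gcloud example, the files can be read back with Beam's ParquetIO:

import com.google.cloud.teleport.bigtable.BigtableRow;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class ReadExportedParquet {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read the exported files with the same schema the template wrote them with.
    // The bucket path and "output-" prefix are hypothetical example values.
    PCollection<GenericRecord> rows =
        p.apply(
                "Read Parquet",
                ParquetIO.read(BigtableRow.getClassSchema())
                    .from("gs://my-bucket/bigtable-export/output-*.parquet"))
            .setCoder(AvroCoder.of(GenericRecord.class, BigtableRow.getClassSchema()));

    p.run().waitUntilFinish();
  }
}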

What's next