Konektor HBase Beam Bigtable

Untuk membantu Anda menggunakan Bigtable dalam pipeline Dataflow, tersedia dua konektor I/O Bigtable Beam open source.

Jika Anda bermigrasi dari HBase ke Bigtable atau jika aplikasi Anda memanggil HBase API, gunakan konektor Bigtable HBase Beam (CloudBigtableIO) yang dibahas di halaman ini.

Pada kasus lain, Anda harus menggunakan konektor Bigtable Beam (BigtableIO) bersama dengan klien Cloud Bigtable untuk Java yang berfungsi dengan Cloud Bigtable API. Untuk mulai menggunakan konektor tersebut, lihat konektor Bigtable Beam.

Untuk informasi model pemrograman Apache Beam selengkapnya, lihat dokumentasi Beam.

Mulai menggunakan HBase

Konektor Bigtable HBase Beam ditulis dalam Java dan dibuat di klien HBase Bigtable untuk Java. Versi ini kompatibel dengan Dataflow SDK 2.x untuk Java, yang didasarkan pada Apache Beam. Kode sumber konektor ada di GitHub di repositori googleapis/java-bigtable-hbase.

Halaman ini memberikan ringkasan cara menggunakan transformasi Read dan Write.

Menyiapkan autentikasi

Untuk menggunakan contoh Java di halaman ini dari lingkungan pengembangan lokal, instal dan lakukan inisialisasi gcloud CLI, lalu siapkan Kredensial Default Aplikasi dengan kredensial pengguna Anda.

  1. Menginstal Google Cloud CLI.
  2. Untuk initialize gcloud CLI, jalankan perintah berikut:

    gcloud init
  3. Buat kredensial autentikasi lokal untuk Akun Google Anda:

    gcloud auth application-default login

Untuk informasi selengkapnya, lihat Siapkan autentikasi untuk lingkungan pengembangan lokal.

Untuk informasi tentang cara menyiapkan autentikasi bagi lingkungan produksi, lihat Siapkan Kredensial Default Aplikasi untuk kode yang berjalan di Google Cloud.

Menambahkan konektor ke project Maven

Untuk menambahkan konektor Bigtable HBase Beam ke project Maven, tambahkan artefak Maven ke file pom.xml Anda sebagai dependensi:

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-beam</artifactId>
  <version>2.12.0</version>
</dependency>

Menentukan konfigurasi Bigtable

Buat antarmuka opsi agar input dapat menjalankan pipeline Anda:

public interface BigtableOptions extends DataflowPipelineOptions {

  @Description("The Bigtable project ID, this can be different than your Dataflow project")
  @Default.String("bigtable-project")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Bigtable instance ID")
  @Default.String("bigtable-instance")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Bigtable table ID in the instance.")
  @Default.String("mobile-time-series")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);
}

Saat membaca dari atau menulis ke Bigtable, Anda harus menyediakan objek konfigurasi CloudBigtableConfiguration. Objek ini menentukan project ID dan ID instance untuk tabel Anda, serta nama tabel itu sendiri:

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

Untuk membaca, sediakan objek konfigurasi CloudBigtableScanConfiguration, yang memungkinkan Anda menentukan objek Scan Apache HBase yang membatasi dan memfilter hasil pembacaan. Lihat Membaca dari Bigtable untuk mengetahui detailnya.

Membaca dari Bigtable

Untuk membaca dari tabel Bigtable, Anda akan menerapkan transformasi Read ke hasil operasi CloudBigtableIO.read. Transformasi Read menampilkan PCollection objek Result HBase, dengan setiap elemen dalam PCollection mewakili satu baris dalam tabel.

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(
        ParDo.of(
            new DoFn<Result, Void>() {
              @ProcessElement
              public void processElement(@Element Result row, OutputReceiver<Void> out) {
                System.out.println(Bytes.toString(row.getRow()));
              }
            }));

Secara default, operasi CloudBigtableIO.read menampilkan semua baris dalam tabel Anda. Anda dapat menggunakan objek Scan HBase untuk membatasi pembacaan ke rentang kunci baris dalam tabel Anda, atau untuk menerapkan filter ke hasil pembacaan. Untuk menggunakan objek Scan, sertakan objek tersebut dalam CloudBigtableScanConfiguration Anda.

Misalnya, Anda dapat menambahkan Scan yang hanya menampilkan pasangan nilai kunci pertama dari setiap baris dalam tabel, yang berguna saat menghitung jumlah baris dalam tabel:

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldRead {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withScan(scan)
            .build();

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}

Menulis ke Bigtable

Untuk menulis ke tabel Bigtable, Anda harus melakukan apply operasi CloudBigtableIO.writeToTable. Anda harus melakukan operasi ini pada PCollection objek Mutation HBase, yang dapat menyertakan objek Put dan Delete.

Tabel Bigtable harus sudah ada dan harus memiliki kelompok kolom yang sesuai yang telah ditentukan. Konektor Dataflow tidak membuat tabel dan grup kolom dengan cepat. Anda dapat menggunakan CLI cbt untuk membuat tabel dan menyiapkan grup kolom, atau Anda dapat melakukannya secara terprogram.

Sebelum menulis ke Bigtable, Anda harus membuat pipeline Dataflow agar proses pemindahan dan penghapusan dapat diserialisasi melalui jaringan:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

Secara umum, Anda harus melakukan transformasi, seperti ParDo, untuk memformat data output ke dalam kumpulan objek Put atau Delete HBase. Contoh berikut menunjukkan transformasi DoFn yang mengambil nilai saat ini dan menggunakannya sebagai kunci baris untuk Put. Selanjutnya, Anda dapat menulis objek Put ke Bigtable.

p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(rowkey));

                row.addColumn(
                    Bytes.toBytes("stats_summary"),
                    Bytes.toBytes("os_build"),
                    timestamp,
                    Bytes.toBytes("android"));
                out.output(row);
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

Untuk mengaktifkan kontrol alur penulisan batch, tetapkan BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL ke true. Fitur ini secara otomatis membatasi kapasitas traffic untuk permintaan penulisan batch dan memungkinkan penskalaan otomatis Bigtable menambahkan atau menghapus node secara otomatis untuk menangani tugas Dataflow Anda.

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
            "true")
        .build();
return bigtableTableConfig;

Berikut adalah contoh penulisan lengkap, termasuk variasi yang memungkinkan kontrol alur penulisan batch.


import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldWrite {

  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                    long timestamp = System.currentTimeMillis();
                    Put row = new Put(Bytes.toBytes(rowkey));

                    row.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        timestamp,
                        Bytes.toBytes("android"));
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {

    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }

  public static CloudBigtableTableConfiguration batchWriteFlowControlExample(
      BigtableOptions options) {
    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
                "true")
            .build();
    return bigtableTableConfig;
  }
}