Konektor Beam Bigtable HBase

Untuk membantu Anda menggunakan Bigtable dalam pipeline Dataflow, ada dua konektor I/O Bigtable Beam open source.

Jika Anda bermigrasi dari HBase ke Bigtable atau aplikasi Anda memanggil HBase API, gunakan konektor Bigtable HBase Beam (CloudBigtableIO) yang dibahas di halaman ini.

Dalam semua kasus lainnya, Anda harus menggunakan konektor Bigtable Beam (BigtableIO) bersama dengan klien Cloud Bigtable untuk Java, yang berfungsi dengan Cloud Bigtable API. Untuk mulai menggunakan konektor tersebut, lihat Konektor Bigtable Beam.

Untuk informasi selengkapnya tentang model pemrograman Apache Beam, lihat dokumentasi Beam.

Mulai menggunakan HBase

Konektor Bigtable HBase Beam ditulis dalam Java dan dibuat di klien Bigtable HBase untuk Java. Alat ini kompatibel dengan Dataflow SDK 2.x untuk Java, yang didasarkan pada Apache Beam. Kode sumber konektor ada di GitHub di repositori googleapis/java-bigtable-hbase.

Halaman ini memberikan ringkasan tentang cara menggunakan transformasi Read dan Write.

Menyiapkan autentikasi

Untuk menggunakan contoh Java di halaman ini dalam lingkungan pengembangan lokal, instal dan lakukan inisialisasi gcloud CLI, lalu siapkan Kredensial Default Aplikasi dengan kredensial pengguna Anda.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

Untuk informasi selengkapnya, lihat Set up authentication for a local development environment.

Untuk informasi tentang cara menyiapkan autentikasi bagi lingkungan produksi, lihat Set up Application Default Credentials for code running on Google Cloud.

Menambahkan konektor ke project Maven

Untuk menambahkan konektor Bigtable HBase Beam ke project Maven, tambahkan artefak Maven ke file pom.xml sebagai dependensi:

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-beam</artifactId>
  <version>2.12.0</version>
</dependency>

Menentukan konfigurasi Bigtable

Buat antarmuka opsi untuk mengizinkan input untuk menjalankan pipeline Anda:

public interface BigtableOptions extends DataflowPipelineOptions {

  @Description("The Bigtable project ID, this can be different than your Dataflow project")
  @Default.String("bigtable-project")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Bigtable instance ID")
  @Default.String("bigtable-instance")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Bigtable table ID in the instance.")
  @Default.String("mobile-time-series")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);
}

Saat membaca dari atau menulis ke Bigtable, Anda harus menyediakan objek konfigurasi CloudBigtableConfiguration. Objek ini menentukan project ID dan instance ID untuk tabel Anda, serta nama tabel itu sendiri:

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

Untuk membaca, berikan objek konfigurasi CloudBigtableScanConfiguration, yang memungkinkan Anda menentukan objek Scan Apache HBase yang membatasi dan memfilter hasil pembacaan. Lihat Membaca dari Bigtable untuk mengetahui detailnya.

Membaca dari Bigtable

Untuk membaca dari tabel Bigtable, Anda menerapkan transformasi Read ke hasil operasi CloudBigtableIO.read. Transformasi Read menampilkan PCollection objek Result HBase, dengan setiap elemen dalam PCollection mewakili satu baris dalam tabel.

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(
        ParDo.of(
            new DoFn<Result, Void>() {
              @ProcessElement
              public void processElement(@Element Result row, OutputReceiver<Void> out) {
                System.out.println(Bytes.toString(row.getRow()));
              }
            }));

Secara default, operasi CloudBigtableIO.read menampilkan semua baris dalam tabel Anda. Anda dapat menggunakan objek Scan HBase untuk membatasi pembacaan ke rentang kunci baris dalam tabel, atau menerapkan filter ke hasil pembacaan. Untuk menggunakan objek Scan, sertakan dalam CloudBigtableScanConfiguration.

Misalnya, Anda dapat menambahkan Scan yang hanya menampilkan pasangan nilai kunci pertama dari setiap baris dalam tabel, yang berguna saat menghitung jumlah baris dalam tabel:

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldRead {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withScan(scan)
            .build();

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}

Menulis ke Bigtable

Untuk menulis ke tabel Bigtable, Anda apply operasi CloudBigtableIO.writeToTable. Anda harus melakukan operasi ini pada PCollection objek Mutation HBase, yang dapat menyertakan objek Put dan Delete.

Tabel Bigtable harus sudah ada dan harus memiliki grup kolom yang sesuai. Konektor Dataflow tidak membuat tabel dan keluarga kolom secara langsung. Anda dapat menggunakan cbt CLI untuk membuat tabel dan menyiapkan keluarga kolom, atau Anda dapat melakukannya secara terprogram.

Sebelum menulis ke Bigtable, Anda harus membuat pipeline Dataflow sehingga operasi penyisipan dan penghapusan dapat diserialisasi melalui jaringan:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

Secara umum, Anda harus melakukan transformasi, seperti ParDo, untuk memformat data output menjadi kumpulan objek Put atau Delete HBase. Contoh berikut menunjukkan transformasi DoFn yang mengambil nilai saat ini dan menggunakannya sebagai kunci baris untuk Put. Kemudian, Anda dapat menulis objek Put ke Bigtable.

p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(rowkey));

                row.addColumn(
                    Bytes.toBytes("stats_summary"),
                    Bytes.toBytes("os_build"),
                    timestamp,
                    Bytes.toBytes("android"));
                out.output(row);
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

Untuk mengaktifkan kontrol alur tulis batch, tetapkan BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL ke true. Fitur ini secara otomatis membatasi kapasitas traffic untuk permintaan tulis batch dan memungkinkan penskalaan otomatis Bigtable menambahkan atau menghapus node secara otomatis untuk menangani tugas Dataflow Anda.

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
            "true")
        .build();
return bigtableTableConfig;

Berikut adalah contoh penulisan lengkap, termasuk variasi yang memungkinkan kontrol alur penulisan batch.


import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldWrite {

  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                    long timestamp = System.currentTimeMillis();
                    Put row = new Put(Bytes.toBytes(rowkey));

                    row.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        timestamp,
                        Bytes.toBytes("android"));
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {

    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }

  public static CloudBigtableTableConfiguration batchWriteFlowControlExample(
      BigtableOptions options) {
    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
                "true")
            .build();
    return bigtableTableConfig;
  }
}