Bigtable HBase Beam コネクタ

Dataflow パイプラインで Bigtable を使用する際には、2 つのオープンソースの Bigtable Beam I/O コネクタを利用できます。

HBase から Bigtable に移行する場合や、アプリケーションで HBase API を呼び出す場合は、このページで説明する Bigtable HBase Beam コネクタ(CloudBigtableIO)を使用します。

それ以外の場合は、Bigtable Beam コネクタ(BigtableIO)を、Cloud Bigtable API と連携する Java 用 Cloud Bigtable クライアントと組み合わせて使用する必要があります。このコネクタの使用を開始するには、Bigtable Beam コネクタをご覧ください。

Apache Beam プログラミング モデルの詳細については、Beam のドキュメントをご覧ください。

HBase を使ってみる

Bigtable HBase Beam コネクタは Java で記述されていて、Java 用 Bigtable HBase クライアント上でビルドされます。これは、Apache Beam ベースの Dataflow SDK 2.x for Java と互換性があります。コネクタのソースコードは、GitHub の googleapis/java-bigtable-hbase リポジトリにあります。

このページでは、Read 変換と Write 変換の使用方法の概要について説明します。

認証の設定

このページの Java サンプルをローカル開発環境から使用するには、gcloud CLI をインストールして初期化し、自身のユーザー認証情報を使用してアプリケーションのデフォルト認証情報を設定してください。

  1. Google Cloud CLI をインストールします。
  2. gcloud CLI を初期化するには:

    gcloud init
  3. Google アカウントのローカル認証情報を作成します。

    gcloud auth application-default login

詳細については、 ローカル開発環境の認証の設定 をご覧ください。

本番環境の認証の設定については、 Google Cloud で実行するコードのためのアプリケーションのデフォルト認証情報を設定するをご覧ください。

Maven プロジェクトへのコネクタの追加

Bigtable HBase Beam コネクタを Maven プロジェクトに追加するには、Maven アーティファクトを pom.xml ファイルに依存ファイルとして追加します。

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-beam</artifactId>
  <version>2.12.0</version>
</dependency>

Bigtable 構成の指定

パイプラインを実行するための入力を許可するオプション インターフェースを作成します。

public interface BigtableOptions extends DataflowPipelineOptions {

  @Description("The Bigtable project ID, this can be different than your Dataflow project")
  @Default.String("bigtable-project")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Bigtable instance ID")
  @Default.String("bigtable-instance")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Bigtable table ID in the instance.")
  @Default.String("mobile-time-series")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);
}

Bigtable に対して読み書きを実行する際には、CloudBigtableConfiguration 構成オブジェクトを指定する必要があります。このオブジェクトには、使用するテーブルのプロジェクト ID とインスタンス ID、およびテーブル自体の名前が指定されます。

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

読み取り用に CloudBigtableScanConfiguration 構成オブジェクトを指定します。これにより、読み取りの結果を制限およびフィルタリングする Apache HBase Scan オブジェクトを指定できます。詳細については、Bigtable からの読み取りをご覧ください。

Bigtable から読み取る

Bigtable テーブルから読み取るには、CloudBigtableIO.read オペレーションの結果に Read 変換を適用します。Read 変換は、HBase Result オブジェクトの PCollection を返します。PCollection 内の各要素はテーブル内の 1 行を表します。

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(
        ParDo.of(
            new DoFn<Result, Void>() {
              @ProcessElement
              public void processElement(@Element Result row, OutputReceiver<Void> out) {
                System.out.println(Bytes.toString(row.getRow()));
              }
            }));

デフォルトでは、CloudBigtableIO.read オペレーションはテーブル内のすべての行を返します。HBase Scan オブジェクトを使用すると、読み取りの対象をテーブル内の特定範囲の行キーに制限することや、読み取り結果へのフィルタの適用ができます。Scan オブジェクトを使用するには、そのオブジェクトを CloudBigtableScanConfiguration に含めます。

たとえば、テーブルの各行から最初の Key-Value ペアのみを返す Scan を追加できます。これは、テーブル内の行数をカウントする際に役立ちます。

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldRead {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withScan(scan)
            .build();

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}

Bigtable に書き込む

Bigtable テーブルに書き込むには、CloudBigtableIO.writeToTable オペレーションに apply を行います。このオペレーションは、HBase Mutation オブジェクトの PCollection に行う必要があります。ここには、Put オブジェクト Delete オブジェクトを含めることができます。

Bigtable テーブルがすでに存在し、適切な列ファミリーが定義されている必要があります。Dataflow コネクタによってテーブルや列ファミリーが動的に作成されることはありません。テーブルを作成して列ファミリーを設定するには、cbt CLI を使用できます。また、プログラムによって行うこともできます。

Bigtable に書き込むには、事前に、Dataflow パイプラインを作成して、ネットワーク経由の put オペレーションと delete オペレーションがシリアライズされるようにしておく必要があります。

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

一般に、出力データを HBase Put または HBase Delete オブジェクトのコレクションとしてフォーマットするには、ParDo などの変換を実行する必要があります。以下に簡単な DoFn 変換の例を示します。この例では、現在値を受け取って、それを Put の行キーとして使用しています。これで、Put オブジェクトを Bigtable に書き込むことができます。

p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(rowkey));

                row.addColumn(
                    Bytes.toBytes("stats_summary"),
                    Bytes.toBytes("os_build"),
                    timestamp,
                    Bytes.toBytes("android"));
                out.output(row);
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

バッチ書き込みのフロー制御を有効にするには、BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROLtrue に設定します。この機能により、バッチ書き込みリクエストのトラフィックは自動的にレート制限されます。Bigtable は、Dataflow ジョブを処理する際に自動スケーリングでノードを自動的に追加または削除します。

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
            "true")
        .build();
return bigtableTableConfig;

以下に、完全な書き込みの例を示します。ここには、バッチ書き込みのフロー制御を有効にするバリエーションも含まれています。


import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldWrite {

  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                    long timestamp = System.currentTimeMillis();
                    Put row = new Put(Bytes.toBytes(rowkey));

                    row.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        timestamp,
                        Bytes.toBytes("android"));
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {

    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }

  public static CloudBigtableTableConfiguration batchWriteFlowControlExample(
      BigtableOptions options) {
    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
                "true")
            .build();
    return bigtableTableConfig;
  }
}