Bigtable HBase Beam コネクタ

Dataflow パイプラインで Bigtable を使用する際には、2 つのオープンソースの Bigtable Beam I/O コネクタを利用できます。

HBase から Bigtable に移行する場合や、アプリケーションが HBase API を呼び出す場合は、このページで説明する Bigtable HBase Beam コネクタ(CloudBigtableIO)を使用してください。

それ以外の場合は、Bigtable Beam コネクタ(BigtableIO)を、Cloud Bigtable API と連携する Java 用 Cloud Bigtable クライアントと組み合わせて使用する必要があります。このコネクタの使用を開始するには、Bigtable Beam コネクタをご覧ください。

Apache Beam プログラミング モデルの詳細については、Beam のドキュメントをご覧ください。

HBase を使ってみる

Bigtable HBase Beam コネクタは Java で記述されており、Java 用 Bigtable HBase クライアント上にビルドされています。これは、Apache Beam ベースの Dataflow SDK 2.x for Java と互換性があります。コネクタのソースコードは、GitHub の googleapis/java-bigtable-hbase リポジトリにあります。

このページでは、Read 変換と Write 変換の使用方法の概要について説明します。

認証の設定

このページの Java サンプルをローカル開発環境から使用するには、gcloud CLI をインストールして初期化し、自身のユーザー認証情報を使用してアプリケーションのデフォルト認証情報を設定してください。

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

詳細については、 ローカル開発環境の認証の設定 をご覧ください。

本番環境の認証の設定については、 Google Cloud で実行するコードのためのアプリケーションのデフォルト認証情報を設定するをご覧ください。

Maven プロジェクトへのコネクタの追加

Bigtable HBase Beam コネクタを Maven プロジェクトに追加するには、Maven アーティファクトを pom.xml ファイルに依存関係として追加します。

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-beam</artifactId>
  <version>2.12.0</version>
</dependency>

Bigtable の構成を指定する

パイプラインを実行するための入力を許可するオプション インターフェースを作成します。

public interface BigtableOptions extends DataflowPipelineOptions {

  @Description("The Bigtable project ID, this can be different than your Dataflow project")
  @Default.String("bigtable-project")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Bigtable instance ID")
  @Default.String("bigtable-instance")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Bigtable table ID in the instance.")
  @Default.String("mobile-time-series")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);
}

Bigtable に対して読み書きを実行する際には、CloudBigtableConfiguration 構成オブジェクトを指定する必要があります。このオブジェクトには、使用するテーブルのプロジェクト ID とインスタンス ID、およびテーブル自体の名前が指定されます。

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

読み取り用に CloudBigtableScanConfiguration 構成オブジェクトを指定します。これにより、読み取りの結果を制限およびフィルタリングする Apache HBase Scan オブジェクトを指定できます。詳細については、Bigtable からの読み取りをご覧ください。

Bigtable から読み取る

Bigtable テーブルから読み取るには、CloudBigtableIO.read オペレーションの結果に Read 変換を適用します。Read 変換は、HBase Result オブジェクトの PCollection を返します。PCollection 内の各要素はテーブル内の 1 行を表します。

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(
        ParDo.of(
            new DoFn<Result, Void>() {
              @ProcessElement
              public void processElement(@Element Result row, OutputReceiver<Void> out) {
                System.out.println(Bytes.toString(row.getRow()));
              }
            }));

デフォルトでは、CloudBigtableIO.read オペレーションはテーブル内のすべての行を返します。HBase Scan オブジェクトを使用すると、読み取りの対象をテーブル内の特定範囲の行キーに制限することや、読み取り結果へのフィルタの適用ができます。Scan オブジェクトを使用するには、そのオブジェクトを CloudBigtableScanConfiguration に含めます。

たとえば、テーブルの各行から最初の Key-Value ペアのみを返す Scan を追加できます。これは、テーブル内の行数をカウントする際に役立ちます。

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldRead {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withScan(scan)
            .build();

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}

Bigtable に書き込む

Bigtable テーブルに書き込むには、CloudBigtableIO.writeToTable オペレーションに apply を行います。このオペレーションは、HBase Mutation オブジェクトの PCollection に行う必要があります。ここには、Put オブジェクト Delete オブジェクトを含めることができます。

Bigtable テーブルがすでに存在し、適切な列ファミリーが定義されている必要があります。Dataflow コネクタによって、テーブルや列ファミリーが動的に作成されることはありません。テーブルを作成して列ファミリーを設定するには、cbt CLI を使用できます。また、プログラムによって行うこともできます。

Bigtable に書き込むには、事前に、Dataflow パイプラインを作成して、ネットワーク経由の put オペレーションと delete オペレーションがシリアライズされるようにしておく必要があります。

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

一般に、出力データを HBase Put または HBase Delete オブジェクトのコレクションとしてフォーマットするには、ParDo などの変換を実行する必要があります。以下に簡単な DoFn 変換の例を示します。この例では、現在値を受け取って、それを Put の行キーとして使用しています。これで、Put オブジェクトを Bigtable に書き込むことができます。

p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(rowkey));

                row.addColumn(
                    Bytes.toBytes("stats_summary"),
                    Bytes.toBytes("os_build"),
                    timestamp,
                    Bytes.toBytes("android"));
                out.output(row);
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

バッチ書き込みのフロー制御を有効にするには、BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROLtrue に設定します。この機能により、バッチ書き込みリクエストのトラフィックは自動的にレート制限されます。Bigtable は、Dataflow ジョブを処理する際に自動スケーリングでノードを自動的に追加または削除します。

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
            "true")
        .build();
return bigtableTableConfig;

以下に、完全な書き込みの例を示します。ここには、バッチ書き込みのフロー制御を有効にするバリエーションも含まれています。


import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldWrite {

  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                    long timestamp = System.currentTimeMillis();
                    Put row = new Put(Bytes.toBytes(rowkey));

                    row.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        timestamp,
                        Bytes.toBytes("android"));
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {

    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }

  public static CloudBigtableTableConfiguration batchWriteFlowControlExample(
      BigtableOptions options) {
    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
                "true")
            .build();
    return bigtableTableConfig;
  }
}