Cloud Bigtable 用の Dataflow コネクタ

Cloud Bigtable 用の Cloud Dataflow コネクタにより、Cloud Dataflow パイプライン内で Cloud Bigtable を使用できるようになります。このコネクタは、一括オペレーションとストリーミング オペレーションの両方に使用できます。

このコネクタは Java で書かれており、Java 用 Cloud Bigtable HBase クライアント上でビルドされます。これは、Apache Beam ベースの Dataflow SDK 2.x for Java と互換性があります。コネクタのソースコードは、GitHub の GoogleCloudPlatform/cloud-bigtable-client リポジトリにあります。

このページでは、Cloud Dataflow コネクタで Read および Write 変換を行う方法を概説します。Cloud Dataflow コネクタの完全な API ドキュメントもご覧ください。

Maven プロジェクトへのコネクタの追加

Cloud Dataflow コネクタを Maven プロジェクトに追加するには、Maven アーティファクトを pom.xml ファイルに依存ファイルとして追加します。

<dependency>
    <groupId>com.google.cloud.bigtable</groupId>
    <artifactId>bigtable-hbase-beam</artifactId>
    <version>1.11.0</version>
</dependency>

Cloud Bigtable 構成の指定

Cloud Bigtable に対して読み書きを実行する際には、CloudBigtableScanConfiguration 構成オブジェクトを指定する必要があります。このオブジェクトには、使用するテーブルのプロジェクト ID とインスタンス ID、およびテーブル自体の名前が指定されます。

CloudBigtableTableConfiguration config =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(PROJECT_ID)
        .withInstanceId(INSTANCE_ID)
        .withTableId(TABLE_ID)
        .build();

オプションで、Apache HBase Scan オブジェクトを指定することにより、読み取り結果の制限やフィルタの適用もできます。詳細は、Cloud Bigtable からの読み取りをご覧ください。

Cloud Bigtable からの読み取り

Cloud Bigtable テーブルからの読み取りを行うには、CloudBigtableIO.read オペレーションの結果に Read 変換を適用します。Read 変換は HBase Result オブジェクトからなる PCollection を返します。PCollection の各要素はテーブル内の 1 つの行を表します。

デフォルトでは、CloudBigtableIO.read オペレーションはテーブルのすべての行を返します。HBase Scan オブジェクトを使用すると、読み取りの対象をテーブル内の特定範囲の行キーに制限することや、読み取り結果へのフィルタの適用ができます。Scan オブジェクトを使用するには、このオブジェクトを CloudBigtableScanConfiguration に含めます。

たとえば、テーブルの各行から最初の Key-Value ペアのみを返す Scan を追加できます。これは、テーブル内の行数をカウントする際に役立ちます。

Scan scan = new Scan();
scan.setCacheBlocks(false);
scan.setFilter(new FirstKeyOnlyFilter());

// CloudBigtableTableConfiguration contains the project, zone, cluster and table to connect to.
// You can supply an optional Scan() to filter the rows that will be read.
CloudBigtableScanConfiguration config =
    new CloudBigtableScanConfiguration.Builder()
        .withProjectId(PROJECT_ID)
        .withInstanceId(INSTANCE_ID)
        .withTableId(TABLE_ID)
        .withScan(scan)
        .build();

Pipeline p = Pipeline.create(options);

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(Count.<Result>globally())
    .apply(ParDo.of(stringifier))
    .apply(TextIO.write().to(options.getResultLocation()));

Cloud Bigtable への書き込み

Cloud Bigtable テーブルに書き込むには、CloudBigtableIO.writeToTable オペレーションを apply します。このオペレーションを HBase Mutation オブジェクトの PCollection に対して実行する必要があります。このコレクションには、Put オブジェクトと Delete オブジェクトを含めることができます。

Cloud Bigtable テーブルがすでに存在し、適切な列ファミリーが定義されている必要があります。Dataflow コネクタによってテーブルや列ファミリーが動的に作成されることはありません。テーブルを作成して列ファミリーを設定するには、cbt コマンドライン ツールを使用できます。また、プログラムによって行うこともできます。

Cloud Bigtable に書き込むには、事前に、Cloud Dataflow パイプラインを作成して、ネットワーク経由の put オペレーションと delete オペレーションがシリアライズされるようにしておく必要があります。

CloudBigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(CloudBigtableOptions.class);
Pipeline p = Pipeline.create(options);

一般に、出力データを HBase Put または HBase Delete オブジェクトのコレクションとしてフォーマットするには、ParDo などの変換を実行する必要があります。以下に簡単な DoFn 変換の例を示します。この例では、現在値を受け取って、それを Put の行キーとして使用しています。

static final DoFn<String, Mutation> MUTATION_TRANSFORM = new DoFn<String, Mutation>() {
  private static final long serialVersionUID = 1L;

  @ProcessElement
  public void processElement(DoFn<String, Mutation>.ProcessContext c) throws Exception {
    c.output(new Put(c.element().getBytes()).addColumn(FAMILY, QUALIFIER, VALUE));
  }
};

これで、Put オブジェクトを Cloud Bigtable に書き込むことができます。

p
    .apply(Create.of("Hello", "World"))
    .apply(ParDo.of(MUTATION_TRANSFORM))
    .apply(CloudBigtableIO.writeToTable(config));
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

Cloud Bigtable ドキュメント