Bigtable I/O

Dataflow SDK では、Google Cloud Bigtable との間でデータの読み取りと書き込みを行うための API が提供されています。BigtableIO ソースとシンクを使用すると、指定された Bigtable の Bigtable Row オブジェクトの PCollection を読み取りや書き込みを行えます。

Bigtable オプションの設定

Bigtable を対象に読み取りや書き込みを行うには、テーブル ID と一連の Bigtable オプションを指定する必要があります。これらのオプションには、ターゲットの Bigtable クラスタを特定するために必要な、以下の情報が含まれます。

  • プロジェクト ID
  • クラスタ ID
  • ゾーン ID

これらのオプションを指定する最も簡単な方法は、com.google.cloud.bigtable.config.BigtableOptions パッケージの BigtableOptions.Builder を使用して作成することです。

  BigtableOptions.Builder optionsBuilder =
     new BigtableOptions.Builder()
         .setProjectId("project")
         .setClusterId("cluster")
         .setZoneId("zone");

Bigtable からの読み取り

Bigtable から読み取るには、Pipeline オブジェクトに BigtableIO.read() 変換を適用します。.withTableId.withBigtableOptions を使用してテーブル ID と BigtableOptions を指定する必要があります。デフォルトでは、BigtableIO.read() は指定された Bigtable 全体をスキャンし、Bigtable Row オブジェクトの PCollection を返します。

  // Scan the entire table.
  PCollection <Row> btRows = p.apply("read",
      BigtableIO.read()
          .withBigtableOptions(optionsBuilder)
          .withTableId("table"));

指定された Bigtable 内にある行のサブセットをスキャンしたい場合は、Bigtable RowFilter オブジェクトを指定します。RowFilter を指定した場合、BigtableIO.read() はフィルタに一致する Row だけを返します。

  // Read only rows that match the specified filter.

  RowFilter filter = ...;

  PCollection <Row> filteredBtRows = p.apply("filtered read",
      BigtableIO.read()
          .withBigtableOptions(optionsBuilder)
          .withTableId("table")
          .withRowFilter(filter));

Bigtable への書き込み

Bigtable への書き込みを行うには、出力データを含んだ PCollectionBigtableIO.write() 変換を適用します。.withTableId.withBigtableOptions を使用してテーブル ID と BigtableOptions を指定する必要があります。

Bigtable の出力データのフォーマット

BigtableIO データシンクは、各書き込みオペレーションを、ターゲット Bigtable に対する一連の行のミューテーションとして実行します。そのため、ユーザーは出力データを PCollection<KV<ByteString, Iterable<Mutation>>> としてフォーマットする必要があります。PCollection の各要素には、次のものを含める必要があります。

  • ByteString として書き込まれる行のキー
  • 一連のべき等行ミューテーション オペレーションを表す Mutation オブジェクトの Iterable