Cloud BigTable 的 Dataflow 連接器

Cloud BigTable 的 Cloud Dataflow 連接器可讓您在 Cloud Dataflow 管道中使用 Cloud BigTable。您可以在批次及串流作業中使用該連接器。

連接器是由 Java 語言編寫,並構建在適用 Java 的 Cloud Bigtable HBase 用戶端上。此與基於 Apache Beam 的 Java Dataflow SDK 2.x 相容。連接器的原始碼可於 GoogleCloudPlatform/cloud-bigtable-client 存放區內 GitHub 找到。

此頁面概述了如何使用 ReadWrite 來與 Cloud Dataflow 連接器轉換。您也可以參閱 Cloud Dataflow 連接器的完整 API 說明文件

新增連接器至 Maven 專案

若要新增 Cloud Dataflow 連接器至 Maven 專案,請新增 Mavan 成果至您的 pom.xml 檔案作為相依項目:

<dependency>
    <groupId>com.google.cloud.bigtable</groupId>
    <artifactId>bigtable-hbase-beam</artifactId>
    <version>1.5.0</version>
</dependency>

指定 Cloud Bigtable 的設置

當您讀取或寫入於 Cloud Bigtable 時,您必須提供 CloudBigtableScanConfiguration 配置物件。此物件指定您資料表的專案 ID 與執行個體 ID,以及資料表名稱本身:

CloudBigtableTableConfiguration config =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(PROJECT_ID)
        .withInstanceId(INSTANCE_ID)
        .withTableId(TABLE_ID)
        .build();

或者,您也可以提供會限制及篩選讀取結果的 Apache HBase Scan 物件。詳細資訊請參閱從 Cloud Bigtable 讀取

從 Cloud Bigtable 讀取

如要讀取 Cloud BigTable 資料表,請將 Read 轉換套用至 CloudBigtableIO.read 作業的結果。Read 轉換會傳回 HBase Result 物件的 PCollection,其中 PCollection 的每個元素均代表資料表中的一個資料列。

根據預設,CloudBigtableIO.read 作業會傳回資料表中所有的資料列。您可以使用 HBase Scan 物件,將讀取作業限制在資料表中特定的資料列索引鍵範圍,或是將篩選器套用至讀取作業的結果。若要使用 Scan 物件,請將其納入您的 CloudBigtableScanConfiguration

例如,若您要新增一個 Scan,只傳回資料表中每一行第一個鍵值組合,以便在資料表中計算行數:

Scan scan = new Scan();
scan.setCacheBlocks(false);
scan.setFilter(new FirstKeyOnlyFilter());

// CloudBigtableTableConfiguration contains the project, zone, cluster and table to connect to.
// You can supply an optional Scan() to filter the rows that will be read.
CloudBigtableScanConfiguration config =
    new CloudBigtableScanConfiguration.Builder()
        .withProjectId(PROJECT_ID)
        .withInstanceId(INSTANCE_ID)
        .withTableId(TABLE_ID)
        .withScan(scan)
        .build();

Pipeline p = Pipeline.create(options);

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(Count.<Result>globally())
    .apply(ParDo.of(stringifier))
    .apply(TextIO.write().to(options.getResultLocation()));

寫入 Cloud Bigtable

如要寫入 Cloud Bigtable 資料表,請 apply 一個 CloudBigtableIO.writeToTable 作業。您必須在 HBase Mutation 物件 (可包括 PutDelete 物件) 的 PCollection 上執行這項作業。

Cloud Bigtable 必須已存在且已定義合適的資料欄系列。Dataflow 連接器不能即時建立資料表與資料欄系列。您可以使用 cbt 指令列工具來建立資料表及設定資料欄系列,也可以透過程式來執行此作業。

在寫入 Cloud BigTable 之前,您必須先建立 Cloud Dataflow 管道,以便能透過網路將放置和刪除作業序列化:

CloudBigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(CloudBigtableOptions.class);
Pipeline p = Pipeline.create(options);

在一般情況下,您需要執行轉換,例如 ParDo,用來格式化您的輸出資料至 HBase 集合的 PutDelete 物件中。以下的範例說明一個簡單的 DoFn 轉換,可獲取當前值並將其用作 Put 的資料列索引鍵:

static final DoFn<String, Mutation> MUTATION_TRANSFORM = new DoFn<String, Mutation>() {
  private static final long serialVersionUID = 1L;

  @ProcessElement
  public void processElement(DoFn<String, Mutation>.ProcessContext c) throws Exception {
    c.output(new Put(c.element().getBytes()).addColumn(FAMILY, QUALIFIER, VALUE));
  }
};

您可以寫入 Put 物件至 Cloud Bigtable:

p
    .apply(Create.of("Hello", "World"))
    .apply(ParDo.of(MUTATION_TRANSFORM))
    .apply(CloudBigtableIO.writeToTable(config));
本頁內容對您是否有任何幫助?請提供意見:

傳送您對下列選項的寶貴意見...

這個網頁
Cloud Bigtable 說明文件