适用于 Cloud Bigtable 的 Dataflow 连接器

借助适用于 Cloud Bigtable 的 Cloud Dataflow 连接器,您可以在 Cloud Dataflow 流水线中使用 Cloud Bigtable。此连接器既可用于批量操作,也可用于流式操作。

此连接器是用 Java 编写的,并以适用于 Java 的 Cloud Bigtable HBase 客户端为基础而构建。它与 Java 版 Dataflow SDK 2.x(基于 Apache Beam 而构建)兼容。此连接器的源代码位于 GitHub 的代码库 googleapis/java-bigtable-hbase 中。

本页面简要介绍了如何在 Cloud Dataflow 连接器中使用 ReadWrite 转换。您还可以阅读有关 Cloud Dataflow 连接器的完整 API 文档

将连接器添加到 Maven 项目

要将 Cloud Dataflow 连接器添加到 Maven 项目,请将 Maven 软件工件作为依赖项添加到 pom.xml 文件中:

<dependency>
    <groupId>com.google.cloud.bigtable</groupId>
    <artifactId>bigtable-hbase-beam</artifactId>
    <version>1.11.0</version>
</dependency>

指定 Cloud Bigtable 配置

对 Cloud Bigtable 执行读取或写入操作时,您必须提供一个 CloudBigtableScanConfiguration 配置对象。该对象指定了表所对应的项目 ID 和实例 ID,以及表本身的名称:

CloudBigtableTableConfiguration config =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(PROJECT_ID)
        .withInstanceId(INSTANCE_ID)
        .withTableId(TABLE_ID)
        .build();

您也可以视情况选择提供一个 Apache HBase Scan 对象,以对读取结果进行限制和过滤。如需了解详情,请参阅从 Cloud Bigtable 读取内容

从 Cloud Bigtable 读取内容

要从 Cloud Bigtable 表中读取内容,请对 CloudBigtableIO.read 操作的结果应用 Read 转换。Read 转换会返回 HBase Result 对象的 PCollection,其中 PCollection 中的每个元素代表表中的一行。

默认情况下,CloudBigtableIO.read 操作会返回表中的所有行。您可以使用 HBase Scan 对象仅读取表中一定范围的行键,或者对读取结果应用过滤条件。要使用 Scan 对象,请将该对象加入 CloudBigtableScanConfiguration 中。

例如,您可以添加一个 Scan,用于仅返回表中每行的第一个键值对,这在计算表中的行数时很有用:

Scan scan = new Scan();
scan.setCacheBlocks(false);
scan.setFilter(new FirstKeyOnlyFilter());

// CloudBigtableTableConfiguration contains the project, zone, cluster and table to connect to.
// You can supply an optional Scan() to filter the rows that will be read.
CloudBigtableScanConfiguration config =
    new CloudBigtableScanConfiguration.Builder()
        .withProjectId(PROJECT_ID)
        .withInstanceId(INSTANCE_ID)
        .withTableId(TABLE_ID)
        .withScan(scan)
        .build();

Pipeline p = Pipeline.create(options);

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(Count.<Result>globally())
    .apply(ParDo.of(stringifier))
    .apply(TextIO.write().to(options.getResultLocation()));

向 Cloud Bigtable 写入内容

要向 Cloud Bigtable 表中写入数据,请执行 apply CloudBigtableIO.writeToTable 操作。您需要对 HBase Mutation 对象(可以包括 PutDelete 对象)的 PCollection 执行此操作。

Cloud Bigtable 表必须是已经存在的现有表,并且必须定义了适当的列族。Dataflow 连接器不会即时创建表和列族。您可以使用 cbt 命令行工具创建表格并设置列族,也可以通过编程方式执行此操作。

在向 Cloud Bigtable 写入内容之前,您必须先创建 Cloud Dataflow 流水线,以便通过网络将 put 和 delete 操作序列化:

CloudBigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(CloudBigtableOptions.class);
Pipeline p = Pipeline.create(options);

通常,您需要执行 ParDo 等转换,以将输出数据的格式设置为 HBase PutDelete 对象的集合。以下示例显示了一个简单 DoFn 转换,该转换获取当前值并使用此值作为 Put 的行键:

static final DoFn<String, Mutation> MUTATION_TRANSFORM = new DoFn<String, Mutation>() {
  private static final long serialVersionUID = 1L;

  @ProcessElement
  public void processElement(DoFn<String, Mutation>.ProcessContext c) throws Exception {
    c.output(new Put(c.element().getBytes()).addColumn(FAMILY, QUALIFIER, VALUE));
  }
};

然后,您可以将 Put 对象写入 Cloud Bigtable 中:

p
    .apply(Create.of("Hello", "World"))
    .apply(ParDo.of(MUTATION_TRANSFORM))
    .apply(CloudBigtableIO.writeToTable(config));