Bigtable HBase Beam connector

To help you use Bigtable in a Dataflow pipeline, two open source Bigtable Beam I/O connectors are available.

If you are migrating from HBase to Bigtable, or if your application calls the HBase API, use the Bigtable HBase Beam connector (CloudBigtableIO) described on this page.

In all other cases, use the Bigtable Beam connector (BigtableIO) with the Cloud Bigtable client for Java, which works with the Cloud Bigtable API. To get started with that connector, see Bigtable Beam connector.

For more information about the Apache Beam programming model, see the Beam documentation.

Get started with HBase

The Bigtable HBase Beam connector is written in Java and is built on the Bigtable HBase client for Java. It's compatible with the Dataflow SDK 2.x for Java, which is based on Apache Beam. The connector's source code is on GitHub in the googleapis/java-bigtable-hbase repository.

This page provides an overview of how to use Read and Write transforms.

Set up authentication

To use the Java samples on this page in a local development environment, install and initialize the gcloud CLI, and then set up Application Default Credentials with your user credentials.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

For more information, see Set up authentication for a local development environment.

For information about setting up authentication for a production environment, see Set up Application Default Credentials for code running on Google Cloud.

Add the connector to a Maven project

To add the Bigtable HBase Beam connector to a Maven project, add the Maven artifact to your pom.xml file as a dependency:

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-beam</artifactId>
  <version>2.12.0</version>
</dependency>

Specify the Bigtable configuration

Create an options interface that allows inputs for running your pipeline:

public interface BigtableOptions extends DataflowPipelineOptions {

  @Description("The Bigtable project ID, this can be different than your Dataflow project")
  @Default.String("bigtable-project")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Bigtable instance ID")
  @Default.String("bigtable-instance")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Bigtable table ID in the instance.")
  @Default.String("mobile-time-series")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);
}
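
Beam's PipelineOptionsFactory derives a command-line flag from each getter in this interface (for example, getBigtableProjectId becomes --bigtableProjectId), so the values can be supplied when you launch the pipeline. The following is a hypothetical invocation sketch; pipeline.jar and the flag values are placeholders, --runner comes from DataflowPipelineOptions, and other Dataflow flags such as --project and --region are omitted for brevity:

java -jar pipeline.jar \
    --runner=DataflowRunner \
    --bigtableProjectId=my-project \
    --bigtableInstanceId=my-instance \
    --bigtableTableId=mobile-time-series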

When you read from or write to Bigtable, you must provide a CloudBigtableConfiguration configuration object. This object specifies the project ID and instance ID for your table, as well as the name of the table itself:

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

For reads, provide a CloudBigtableScanConfiguration configuration object, which lets you specify an Apache HBase Scan object that limits and filters the results of a read. For details, see Read from Bigtable.

Read from Bigtable

To read from a Bigtable table, apply a Read transform to the result of a CloudBigtableIO.read operation. The Read transform returns a PCollection of HBase Result objects, where each element in the PCollection represents a single row in the table.

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(
        ParDo.of(
            new DoFn<Result, Void>() {
              @ProcessElement
              public void processElement(@Element Result row, OutputReceiver<Void> out) {
                System.out.println(Bytes.toString(row.getRow()));
              }
            }));

By default, a CloudBigtableIO.read operation returns all of the rows in your table. You can use an HBase Scan object to limit the read to a range of row keys within your table, or to apply filters to the results of the read. To use a Scan object, include it in your CloudBigtableScanConfiguration.
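
For instance, here is a minimal sketch that restricts a read to a contiguous row key range, assuming the HBase 1.x-style Scan API that this connector is built on; the phone# row keys are the ones used in the write examples later on this page:

// Limit the read to row keys in [phone#4c410523, phone#4c410524).
// The stop row is exclusive.
Scan rangeScan = new Scan();
rangeScan.setStartRow(Bytes.toBytes("phone#4c410523"));
rangeScan.setStopRow(Bytes.toBytes("phone#4c410524"));

CloudBigtableScanConfiguration rangeConfig =
    new CloudBigtableScanConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .withScan(rangeScan)
        .build();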

As another example, you can add a Scan that returns only the first key-value pair from each row in your table, which is useful when counting the rows in the table:

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldRead {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withScan(scan)
            .build();

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}

Write to Bigtable

To write to a Bigtable table, apply a CloudBigtableIO.writeToTable operation. Perform this operation on a PCollection of HBase Mutation objects, which can include Put and Delete objects.

The Bigtable table must already exist and must have the appropriate column families defined. The Dataflow connector doesn't create tables and column families on the fly. You can use the cbt CLI to create a table and set up its column families, or you can do this programmatically.
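
For example, the following cbt CLI sketch creates the table and column family used in the samples on this page; substitute your own project and instance IDs for the placeholder values:

cbt -project bigtable-project -instance bigtable-instance createtable mobile-time-series
cbt -project bigtable-project -instance bigtable-instance createfamily mobile-time-series stats_summary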

Before you write to Bigtable, you must create your Dataflow pipeline so that puts and deletes can be serialized over the network:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

In general, you need to perform a transform, such as a ParDo, to format your output data into a collection of HBase Put or Delete objects. The following example shows a DoFn transform that takes the current value and uses it as the row key for a Put. You can then write the Put objects to Bigtable.

p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(rowkey));

                row.addColumn(
                    Bytes.toBytes("stats_summary"),
                    Bytes.toBytes("os_build"),
                    timestamp,
                    Bytes.toBytes("android"));
                out.output(row);
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

To enable batch write flow control, set BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL to true. This feature automatically rate-limits traffic for batch write requests and lets Bigtable autoscaling automatically add or remove nodes to handle your Dataflow job.

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
            "true")
        .build();
return bigtableTableConfig;

The following is a complete write example, including a variation that enables batch write flow control.

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldWrite {

  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                    long timestamp = System.currentTimeMillis();
                    Put row = new Put(Bytes.toBytes(rowkey));

                    row.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        timestamp,
                        Bytes.toBytes("android"));
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {

    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }

  public static CloudBigtableTableConfiguration batchWriteFlowControlExample(
      BigtableOptions options) {
    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
                "true")
            .build();
    return bigtableTableConfig;
  }
}