Bigtable HBase Beam connector

To help you use Bigtable in a Dataflow pipeline, two open source Bigtable Beam I/O connectors are available.

If you are migrating from HBase to Bigtable, or your application calls the HBase API, use the Bigtable HBase Beam connector (CloudBigtableIO) described on this page.

In all other cases, use the Bigtable Beam connector (BigtableIO) together with the Cloud Bigtable client for Java, which works with the Cloud Bigtable APIs. To get started with that connector, see Bigtable Beam connector.

For more information about the Apache Beam programming model, see the Beam documentation.

Get started with HBase

The Bigtable HBase Beam connector is written in Java and is built on the Bigtable HBase client for Java. It is compatible with the Dataflow SDK 2.x for Java, which is based on Apache Beam. The connector's source code is on GitHub in the repository googleapis/java-bigtable-hbase.

This page provides an overview of how to use Read and Write transforms.

Set up authentication

To use the Java samples on this page in a local development environment, install and initialize the gcloud CLI, and then set up Application Default Credentials with your user credentials.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

For more information, see Set up authentication for a local development environment in the authentication documentation.

To learn how to set up authentication for a production environment, see Set up Application Default Credentials for code running on Google Cloud.

Add the connector to a Maven project

To add the Bigtable HBase Beam connector to a Maven project, add the Maven artifact to your pom.xml file as a dependency:

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-beam</artifactId>
  <version>2.12.0</version>
</dependency>
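
If you build with Gradle instead, the equivalent dependency declaration is sketched below (Groovy DSL; check Maven Central for the latest bigtable-hbase-beam version):

dependencies {
  // Same artifact coordinates as the Maven dependency above.
  implementation 'com.google.cloud.bigtable:bigtable-hbase-beam:2.12.0'
}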

Specify the Bigtable configuration

Create an options interface to allow inputs for running your pipeline:

public interface BigtableOptions extends DataflowPipelineOptions {

  @Description("The Bigtable project ID, this can be different than your Dataflow project")
  @Default.String("bigtable-project")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Bigtable instance ID")
  @Default.String("bigtable-instance")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Bigtable table ID in the instance.")
  @Default.String("mobile-time-series")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);
}
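
Because PipelineOptionsFactory maps each getter on this interface to a command-line flag of the same name (minus the get prefix), you can supply these values when you launch the pipeline. A sketch with placeholder values:

--bigtableProjectId=my-bigtable-project \
--bigtableInstanceId=my-instance \
--bigtableTableId=mobile-time-series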

When you read from or write to Bigtable, you must provide a CloudBigtableTableConfiguration configuration object. This object specifies the project ID and instance ID for your table, as well as the name of the table itself:

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

For reads, provide a CloudBigtableScanConfiguration configuration object, which lets you specify an Apache HBase Scan object that limits and filters the results of the read. For more information, see Reading data from Bigtable.

Read from Bigtable

To read from a Bigtable table, apply a Read transform to the result of a CloudBigtableIO.read operation. The Read transform returns a PCollection of HBase Result objects, where each element in the PCollection represents a single row in the table.

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(
        ParDo.of(
            new DoFn<Result, Void>() {
              @ProcessElement
              public void processElement(@Element Result row, OutputReceiver<Void> out) {
                System.out.println(Bytes.toString(row.getRow()));
              }
            }));

By default, a CloudBigtableIO.read operation returns all of the rows in your table. You can use an HBase Scan object to limit the read to a range of row keys within your table, or to apply filters to the results of the read. To use a Scan object, include it in your CloudBigtableScanConfiguration.
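
For instance, to limit the read to a contiguous range of row keys, set start and stop rows on the Scan before adding it to the configuration. The following is a minimal sketch; the key values are illustrative, and setStartRow and setStopRow are the HBase 1.x-style setters (newer HBase APIs prefer withStartRow and withStopRow):

Scan rangeScan = new Scan();
// Read only rows in the half-open key range [phone#4c410523, phone#4c410524).
rangeScan.setStartRow(Bytes.toBytes("phone#4c410523"));
rangeScan.setStopRow(Bytes.toBytes("phone#4c410524"));

CloudBigtableScanConfiguration rangeConfig =
    new CloudBigtableScanConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .withScan(rangeScan)
        .build();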

For example, you can add a Scan that returns only the first key-value pair from each row in your table, which is useful when counting the rows in the table:

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldRead {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withScan(scan)
            .build();

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}
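
To launch this pipeline on Dataflow, you can pass the custom options together with the standard Dataflow options as command-line flags. The following Maven invocation is a sketch; the project, region, and bucket values are placeholders that you must replace with your own:

mvn compile exec:java -Dexec.mainClass=HelloWorldRead \
    -Dexec.args="--runner=DataflowRunner --project=my-dataflow-project \
    --region=us-central1 --gcpTempLocation=gs://my-bucket/temp \
    --bigtableProjectId=my-bigtable-project --bigtableInstanceId=my-instance \
    --bigtableTableId=mobile-time-series"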

Write to Bigtable

To write to a Bigtable table, apply a CloudBigtableIO.writeToTable operation. You apply this operation to a PCollection of HBase Mutation objects, which can include Put and Delete objects.

The Bigtable table must already exist, and it must have the appropriate column families defined. The Dataflow connector does not create tables or column families on the fly. You can use the cbt CLI to create a table and set up column families, or you can do this programmatically.
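
For example, a cbt CLI invocation like the following creates the table used in this page's samples, along with its stats_summary column family (a sketch; substitute your own project and instance IDs):

cbt -project my-bigtable-project -instance my-instance \
    createtable mobile-time-series families=stats_summary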

Before you write to Bigtable, you must first create your Dataflow pipeline so that puts and deletes can be serialized over the network:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

In general, you need to perform a transform, such as a ParDo, to format your output data as a collection of HBase Put or Delete objects. The following example shows a DoFn transform that takes the current value and uses it as the row key for a Put. You can then write the Put objects to Bigtable.

p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(rowkey));

                row.addColumn(
                    Bytes.toBytes("stats_summary"),
                    Bytes.toBytes("os_build"),
                    timestamp,
                    Bytes.toBytes("android"));
                out.output(row);
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));
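
Delete mutations flow through the same writeToTable sink, because Delete is also an HBase Mutation (Delete is in org.apache.hadoop.hbase.client, alongside Put). The following minimal sketch, with an illustrative row key, deletes entire rows; it assumes the same pipeline and bigtableTableConfig as above:

p.apply(Create.of("phone#4c410523#20190501"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                // Emit a Delete that removes the entire row for this key.
                out.output(new Delete(Bytes.toBytes(rowkey)));
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));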

To enable flow control for batch writes, set BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL to true. This feature automatically rate-limits traffic for batch write requests and lets Bigtable autoscaling automatically add or remove nodes to handle your Dataflow job.

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
            "true")
        .build();

The following is a complete write example, including a variation that enables flow control for batch writes.

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldWrite {

  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                    long timestamp = System.currentTimeMillis();
                    Put row = new Put(Bytes.toBytes(rowkey));

                    row.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        timestamp,
                        Bytes.toBytes("android"));
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {

    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }

  public static CloudBigtableTableConfiguration batchWriteFlowControlExample(
      BigtableOptions options) {
    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
                "true")
            .build();
    return bigtableTableConfig;
  }
}