Dataflow Connector for Cloud Bigtable

The Cloud Dataflow connector for Cloud Bigtable makes it possible to use Cloud Bigtable in a Cloud Dataflow pipeline. You can use the connector for both batch and streaming operations.

The connector is written in Java and is built on the Cloud Bigtable HBase client for Java. You can find the connector on GitHub in the repository GoogleCloudPlatform/cloud-bigtable-client.

This page provides an overview of how to use Read and Write transforms with the Cloud Dataflow connector. Full API documentation for the connector is also available.

Adding the connector to a Maven project

To add the Cloud Dataflow connector to a Maven project, add the following Maven artifact to your pom.xml file as a dependency:

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-dataflow</artifactId>
  <version>0.9.4</version>
</dependency>

Specifying the Cloud Bigtable configuration

When you read from or write to Cloud Bigtable, you must provide a CloudBigtableScanConfiguration object. This object specifies the project ID and instance ID for your table, as well as the name of the table itself:

CloudBigtableScanConfiguration config = new CloudBigtableScanConfiguration.Builder()
    .withProjectId("project-id")
    .withInstanceId("instance-id")
    .withTableId("table")
    .build();

Optionally, you can provide an Apache HBase Scan object that allows you to limit and filter the results of a read. See "Reading from Cloud Bigtable" for details.

Reading from Cloud Bigtable

To read from a Cloud Bigtable table, you apply a Read transform to the result of a CloudBigtableIO.read operation. The Read transform returns a PCollection of HBase Result objects, where each element in the PCollection represents a single row in the table:

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

p.apply(Read.from(CloudBigtableIO.read(config)));
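
Each element of the resulting PCollection is a standard HBase Result, so you can process it with ordinary Cloud Dataflow transforms. As a minimal sketch, the following DoFn, which is illustrative and not part of the connector, extracts the row key from each result:

static final DoFn<Result, String> EXTRACT_ROW_KEY = new DoFn<Result, String>() {
  private static final long serialVersionUID = 1L;

  @Override
  public void processElement(DoFn<Result, String>.ProcessContext c) throws Exception {
    // Result.getRow() returns the row key as a byte array.
    c.output(Bytes.toString(c.element().getRow()));
  }
};

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(ParDo.of(EXTRACT_ROW_KEY));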

By default, a CloudBigtableIO.read operation returns all of the rows in your table. You can use an HBase Scan object to limit the read to a range of row keys within your table, or to apply filters to the results of the read. To use a Scan object, include it in your CloudBigtableScanConfiguration.

For example, the following code adds a Scan that returns only the first key-value pair from each row in your table, which is useful when you are counting the number of rows in the table:

Scan scan = new Scan();
scan.setFilter(new FirstKeyOnlyFilter());

CloudBigtableScanConfiguration config = new CloudBigtableScanConfiguration.Builder()
    .withProjectId("project-id")
    .withInstanceId("instance-id")
    .withTableId("table")
    .withScan(scan)
    .build();

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

p.apply(Read.from(CloudBigtableIO.read(config)));
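
Similarly, to limit the read to a range of row keys, set a start row and a stop row on the Scan. The row-key values below are placeholders for illustration; the stop row is exclusive:

Scan rangeScan = new Scan();
rangeScan.setStartRow(Bytes.toBytes("key-0001"));
rangeScan.setStopRow(Bytes.toBytes("key-0100"));

CloudBigtableScanConfiguration rangeConfig = new CloudBigtableScanConfiguration.Builder()
    .withProjectId("project-id")
    .withInstanceId("instance-id")
    .withTableId("table")
    .withScan(rangeScan)
    .build();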

Writing to Cloud Bigtable

To write to a Cloud Bigtable table, you apply a CloudBigtableIO.writeToTable operation. You'll need to perform this operation on a PCollection of HBase Mutation objects, which can include Put and Delete objects. Append and Increment operations are not supported, because they are not idempotent and are therefore unsuitable for retriable batch programming models such as Hadoop and Cloud Dataflow.

Before you write to Cloud Bigtable, you must create and initialize your Cloud Dataflow pipeline so that puts and deletes can be serialized over the network:

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
CloudBigtableIO.initializeForWrite(p);

In general, you'll need to perform a transform, such as a ParDo, to format your output data into a collection of HBase Put or Delete objects. The following example shows a simple DoFn that takes the current string element and uses it as the row key for a Put. (The FAMILY, QUALIFIER, and VALUE constants are byte arrays that are assumed to be defined elsewhere in your pipeline code.)

static final DoFn<String, Mutation> MUTATION_TRANSFORM = new DoFn<String, Mutation>() {
  private static final long serialVersionUID = 1L;

  @Override
  public void processElement(DoFn<String, Mutation>.ProcessContext c) throws Exception {
    c.output(new Put(c.element().getBytes()).addColumn(FAMILY, QUALIFIER, VALUE));
  }
};

You can then write the Puts to Cloud Bigtable:

p
   .apply(Create.of("Hello", "World"))
   .apply(ParDo.of(MUTATION_TRANSFORM))
   .apply(CloudBigtableIO.writeToTable(config));
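
As with any Cloud Dataflow pipeline, no work is performed until you run it. Call run() on the pipeline to execute the reads, transforms, and writes you have applied:

p.run();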
