Conector do Dataflow para o Cloud Bigtable

Com o conector do Cloud Dataflow para Cloud Bigtable, é possível usar o Cloud Bigtable em um pipeline do Cloud Dataflow. Use o conector nas operações em lote e de streaming.

Além de ter a linguagem Java, ele foi criado no cliente HBase no Cloud Bigtable para Java. Ele é compatível com o Dataflow SDK 2.x para Java, baseado no Apache Beam. O código-fonte do conector está no repositório googleapis/java-bigtable-hbase do GitHub.

Nesta página, você encontra uma visão geral de como usar as transformações Read e Write com o conector do Cloud Dataflow. Leia também a documentação completa sobre a API do conector do Cloud Dataflow.

Como adicionar o conector a um projeto do Maven

Para adicionar o conector do Cloud Dataflow a um projeto do Maven, adicione o artefato do Maven ao arquivo pom.xml como uma dependência:

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-beam</artifactId>
  <version>1.16.0</version>
</dependency>

Como especificar a configuração do Cloud Bigtable

Crie uma interface de opções para permitir entradas para executar o pipeline:

public interface BigtableOptions extends DataflowPipelineOptions {
  @Description("The Bigtable project ID, this can be different than your Dataflow project")
  @Default.String("bigtable-project")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Bigtable instance ID")
  @Default.String("bigtable-instance")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Bigtable table ID in the instance.")
  @Default.String("bigtable-table")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);
}

Ao ler ou gravar no Cloud Bigtable, é preciso fornecer um objeto de configuração CloudBigtableConfiguration. Esse objeto especifica os códigos do projeto e da instância para a tabela, bem como o próprio nome da tabela:

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

Para ler, forneça um objeto de configuração CloudBigtableScanConfiguration, que permite especificar um objeto Scan do Apache HBase que limita e filtra os resultados de uma leitura. Saiba mais em Como ler do Cloud Bigtable.

Como ler do Cloud Bigtable

Para ler uma tabela do Cloud Bigtable, aplique uma transformação Read ao resultado de uma operação CloudBigtableIO.read. A transformação Read retorna um PCollection de objetos Result do HBase, em que cada elemento no PCollection representa uma única linha na tabela.

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(
        ParDo.of(
            new DoFn<Result, Void>() {
              @ProcessElement
              public void processElement(@Element Result row, OutputReceiver<Void> out) {
                System.out.println(Bytes.toString(row.getRow()));
              }
            }));

Por padrão, uma operação CloudBigtableIO.read retorna todas as linhas na tabela. É possível usar um objeto HBase Scan para limitar a leitura a um intervalo de chaves de linha na tabela ou para aplicar filtros aos resultados da leitura. Para usar um objeto Scan, inclua-o no CloudBigtableScanConfiguration.

Por exemplo, é possível adicionar um Scan que retorna apenas o primeiro par de chave-valor de cada linha na tabela, o que é útil ao contar o número de linhas na tabela:

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldRead {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withScan(scan)
            .build();

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("bigtable-table")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}

Como gravar no Cloud Bigtable

Para gravar em uma tabela do Cloud Bigtable, faça o apply de uma operação CloudBigtableIO.writeToTable. Será preciso realizar essa operação em um PCollection de objetos Mutation do HBase, que podem incluir objetos Put e Delete.

É necessário que exista uma tabela do Cloud Bigtable com grupos de colunas apropriados definidos. O conector do Dataflow não cria tabelas e grupos de colunas rapidamente. É possível usar a ferramenta de linha de comando cbt para criar uma tabela e configurar grupos de colunas ou fazer isso de maneira programática.

Antes de realizar uma gravação no Cloud Bigtable, crie o canal do Cloud Dataflow. Assim, é possível serializar os objetos put e delete pela rede:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

Em geral, você precisa executar uma transformação, como um ParDo, para formatar os dados de saída em uma coleção de objetos Put ou Delete do HBase. O exemplo a seguir mostra uma transformação DoFn simples, que usa o valor atual como chave de linha para um Put: Em seguida, será possível gravar os objetos Put no Cloud Bigtable.

p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(rowkey));

                row.addColumn(
                    Bytes.toBytes("stats_summary"),
                    Bytes.toBytes("os_build"),
                    timestamp,
                    Bytes.toBytes("android"));
                out.output(row);
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

Veja o exemplo completo de escrita.

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldWrite {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                    long timestamp = System.currentTimeMillis();
                    Put row = new Put(Bytes.toBytes(rowkey));

                    row.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        timestamp,
                        Bytes.toBytes("android"));
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("bigtable-table")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}