Conector HBase Beam do Bigtable

Para ajudar você a usar o Bigtable em um pipeline do Dataflow, estão disponíveis dois conectores de E/S do Beam de código aberto do Bigtable.

Se você estiver migrando do HBase para o Bigtable ou se o aplicativo chamar a API HBase, use o conector do HBase Beam do Bigtable (CloudBigtableIO) discutido nesta página.

Em todos os outros casos, use o conector do Bigtable Beam (BigtableIO) em conjunto com o cliente do Cloud Bigtable para Java, que funciona com as APIs do Cloud Bigtable. Para começar a usar esse conector, consulte Conector Beam do Bigtable.

Para mais informações sobre o modelo de programação do Apache Beam, consulte a documentação do Beam.

Introdução ao HBase

O conector HBase Beam do Bigtable é gravado em Java e criado no cliente HBase do Bigtable para Java. Ele é compatível com o SDK 2.x do Dataflow para Java, baseado no Apache Beam. O código-fonte do conector está no repositório googleapis/java-bigtable-hbase do GitHub.

Nesta página, você terá uma visão geral de como usar as transformações Read e Write.

Configurar a autenticação

Para usar as amostras de Java nesta página de um ambiente de desenvolvimento local, instale e inicialize a CLI gcloud e, em seguida, configure o Application Default Credentials com as credenciais de usuário.

  1. Instale a CLI do Google Cloud.
  2. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  3. Crie as credenciais de autenticação para sua Conta do Google:

    gcloud auth application-default login

Veja mais informações em: Configurar a autenticação para um ambiente de desenvolvimento local.

Para informações sobre como configurar a autenticação para um ambiente de produção, consulte Configure o Application Default Credentials para o código em execução no Google Cloud.

Adicionar o conector a um projeto do Maven

Para adicionar o conector do HBase Beam para o Bigtable a um projeto do Maven, adicione o artefato do Maven ao arquivo pom.xml como uma dependência:

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-beam</artifactId>
  <version>2.12.0</version>
</dependency>

Especificar a configuração do Bigtable

Crie uma interface de opções para permitir entradas para executar o pipeline:

public interface BigtableOptions extends DataflowPipelineOptions {

  @Description("The Bigtable project ID, this can be different than your Dataflow project")
  @Default.String("bigtable-project")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Bigtable instance ID")
  @Default.String("bigtable-instance")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Bigtable table ID in the instance.")
  @Default.String("mobile-time-series")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);
}

Ao ler ou gravar no Cloud Bigtable, é preciso fornecer um objeto de configuração CloudBigtableConfiguration. Esse objeto especifica os códigos do projeto e da instância para a tabela, bem como o próprio nome da tabela:

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

Para ler, forneça um objeto de configuração CloudBigtableScanConfiguration, que permite especificar um objeto Scan do Apache HBase que limita e filtra os resultados de uma leitura. Consulte Como ler do Bigtable para detalhes.

Ler do Bigtable

Para ler uma tabela do Cloud Bigtable, aplique uma transformação Read ao resultado de uma operação CloudBigtableIO.read. A transformação Read retorna um PCollection de objetos Result do HBase, em que cada elemento no PCollection representa uma única linha na tabela.

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(
        ParDo.of(
            new DoFn<Result, Void>() {
              @ProcessElement
              public void processElement(@Element Result row, OutputReceiver<Void> out) {
                System.out.println(Bytes.toString(row.getRow()));
              }
            }));

Por padrão, uma operação CloudBigtableIO.read retorna todas as linhas na tabela. É possível usar um objeto HBase Scan para limitar a leitura a um intervalo de chaves de linha na tabela ou para aplicar filtros aos resultados da leitura. Para usar um objeto Scan, inclua-o no CloudBigtableScanConfiguration.

Por exemplo, é possível adicionar um Scan que retorna apenas o primeiro par de chave-valor de cada linha na tabela, o que é útil ao contar o número de linhas na tabela:

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldRead {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withScan(scan)
            .build();

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}

Gravar no Bigtable

Para gravar em uma tabela do Bigtable, faça o apply de uma operação CloudBigtableIO.writeToTable. Será preciso realizar essa operação em um PCollection de objetos Mutation do HBase, que podem incluir objetos Put e Delete.

É necessário que exista uma tabela do Bigtable com grupos de colunas apropriados definidos. O conector do Dataflow não cria tabelas e grupos de colunas rapidamente. É possível usar a CLI cbt para criar uma tabela e configurar grupos de colunas ou fazer isso de modo programático.

Antes de realizar uma gravação no Bigtable, crie o canal do Cloud Dataflow. Assim, é possível serializar os objetos put e delete pela rede:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

Em geral, você precisa executar uma transformação, como um ParDo, para formatar os dados de saída em uma coleção de objetos Put ou Delete do HBase. O exemplo a seguir mostra uma transformação DoFn, que usa o valor atual como chave de linha para um Put. Em seguida, será possível gravar os objetos Put no Bigtable.

p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(rowkey));

                row.addColumn(
                    Bytes.toBytes("stats_summary"),
                    Bytes.toBytes("os_build"),
                    timestamp,
                    Bytes.toBytes("android"));
                out.output(row);
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

Para ativar o controle de fluxo de gravação em lote, defina BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL como true. Esse recurso limita automaticamente a taxa de tráfego para solicitações de gravação em lote e permite que o escalonamento automático do Bigtable adicione ou remova nós automaticamente para lidar com o job do Dataflow.

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
            "true")
        .build();
return bigtableTableConfig;

Veja o exemplo completo de gravação, incluindo a variação que permite o controle de fluxo de gravação em lote.


import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldWrite {

  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                    long timestamp = System.currentTimeMillis();
                    Put row = new Put(Bytes.toBytes(rowkey));

                    row.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        timestamp,
                        Bytes.toBytes("android"));
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {

    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }

  public static CloudBigtableTableConfiguration batchWriteFlowControlExample(
      BigtableOptions options) {
    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
                "true")
            .build();
    return bigtableTableConfig;
  }
}