Conector HBase Beam do Bigtable

Para ajudar você a usar o Bigtable em um pipeline do Dataflow, estão disponíveis dois conectores de E/S de código aberto do Bigtable Beam.

Se você estiver migrando do HBase para o Bigtable ou se o aplicativo chamar a API HBase, use o conector do HBase Beam do Bigtable (CloudBigtableIO) discutido nesta página.

Em todos os outros casos, use o conector de Beam do Bigtable (BigtableIO) em conjunto com o cliente do Cloud Bigtable para Java, que funciona com as APIs do Cloud Bigtable. Para começar a usar esse conector, consulte Conector Beam do Bigtable.

Para mais informações sobre o modelo de programação do Apache Beam, consulte a documentação do Beam.

Introdução ao HBase

O conector HBase Beam do Bigtable é gravado em Java e criado no cliente HBase do Bigtable para Java. Ele é compatível com o SDK 2.x do Dataflow para Java, baseado no Apache Beam. O código-fonte do conector está no repositório googleapis/java-bigtable-hbase do GitHub.

Nesta página, você terá uma visão geral de como usar as transformações Read e Write.

Configurar a autenticação

Para usar os exemplos Java desta página em um ambiente de desenvolvimento local, instale e inicialize o gcloud CLI e e configure o Application Default Credentials com suas credenciais de usuário.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

Confira mais informações em Set up authentication for a local development environment.

Para informações sobre como configurar a autenticação para um ambiente de produção, consulte Set up Application Default Credentials for code running on Google Cloud.

Adicionar o conector a um projeto do Maven

Para adicionar o conector do HBase Beam para o Bigtable a um projeto do Maven, adicione o artefato do Maven ao arquivo pom.xml como uma dependência:

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-beam</artifactId>
  <version>2.12.0</version>
</dependency>

Especificar a configuração do Bigtable

Crie uma interface de opções para permitir entradas para executar o pipeline:

public interface BigtableOptions extends DataflowPipelineOptions {

  @Description("The Bigtable project ID, this can be different than your Dataflow project")
  @Default.String("bigtable-project")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Bigtable instance ID")
  @Default.String("bigtable-instance")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Bigtable table ID in the instance.")
  @Default.String("mobile-time-series")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);
}

Ao ler ou gravar no Cloud Bigtable, é preciso fornecer um objeto de configuração CloudBigtableConfiguration. Esse objeto especifica os códigos do projeto e da instância para a tabela, bem como o próprio nome da tabela:

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

Para ler, forneça um objeto de configuração CloudBigtableScanConfiguration, que permite especificar um objeto Scan do Apache HBase que limita e filtra os resultados de uma leitura. Consulte Como ler do Bigtable para detalhes.

Ler do Bigtable

Para ler uma tabela do Cloud Bigtable, aplique uma transformação Read ao resultado de uma operação CloudBigtableIO.read. A transformação Read retorna um PCollection de objetos Result do HBase, em que cada elemento no PCollection representa uma única linha na tabela.

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(
        ParDo.of(
            new DoFn<Result, Void>() {
              @ProcessElement
              public void processElement(@Element Result row, OutputReceiver<Void> out) {
                System.out.println(Bytes.toString(row.getRow()));
              }
            }));

Por padrão, uma operação CloudBigtableIO.read retorna todas as linhas na tabela. É possível usar um objeto HBase Scan para limitar a leitura a um intervalo de chaves de linha na tabela ou para aplicar filtros aos resultados da leitura. Para usar um objeto Scan, inclua-o no CloudBigtableScanConfiguration.

Por exemplo, é possível adicionar um Scan que retorna apenas o primeiro par de chave-valor de cada linha na tabela, o que é útil ao contar o número de linhas na tabela:

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldRead {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withScan(scan)
            .build();

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}

Gravar no Bigtable

Para gravar em uma tabela do Bigtable, faça o apply de uma operação CloudBigtableIO.writeToTable. Será preciso realizar essa operação em um PCollection de objetos Mutation do HBase, que podem incluir objetos Put e Delete.

É necessário que exista uma tabela do Bigtable com grupos de colunas apropriados definidos. O conector do Dataflow não cria tabelas e grupos de colunas rapidamente. É possível usar a CLI cbt para criar uma tabela e configurar grupos de colunas ou fazer isso de modo programático.

Antes de realizar uma gravação no Bigtable, crie o canal do Cloud Dataflow. Assim, é possível serializar os objetos put e delete pela rede:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

Em geral, você precisa executar uma transformação, como um ParDo, para formatar os dados de saída em uma coleção de objetos Put ou Delete do HBase. O exemplo a seguir mostra uma transformação DoFn, que usa o valor atual como chave de linha para um Put. Em seguida, será possível gravar os objetos Put no Bigtable.

p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(rowkey));

                row.addColumn(
                    Bytes.toBytes("stats_summary"),
                    Bytes.toBytes("os_build"),
                    timestamp,
                    Bytes.toBytes("android"));
                out.output(row);
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

Para ativar o controle de fluxo de gravação em lote, defina BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL como true. Esse recurso limita automaticamente a taxa de tráfego para solicitações de gravação em lote e permite que o escalonamento automático do Bigtable adicione ou remova nós automaticamente para lidar com o job do Dataflow.

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
            "true")
        .build();
return bigtableTableConfig;

Veja o exemplo completo de gravação, incluindo a variação que permite o controle de fluxo de gravação em lote.


import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldWrite {

  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                    long timestamp = System.currentTimeMillis();
                    Put row = new Put(Bytes.toBytes(rowkey));

                    row.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        timestamp,
                        Bytes.toBytes("android"));
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {

    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }

  public static CloudBigtableTableConfiguration batchWriteFlowControlExample(
      BigtableOptions options) {
    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
                "true")
            .build();
    return bigtableTableConfig;
  }
}