Conetor do Beam do HBase do Bigtable

Para ajudar a usar o Bigtable num pipeline do Dataflow, estão disponíveis dois conetores Bigtable Beam I/O de código aberto.

Se estiver a migrar do HBase para o Bigtable ou a sua aplicação chamar a API HBase, use o conetor HBase Beam do Bigtable (CloudBigtableIO) abordado nesta página.

Em todos os outros casos, deve usar o conector Bigtable Beam (BigtableIO) juntamente com o cliente do Cloud Bigtable para Java, que funciona com as APIs Cloud Bigtable. Para começar a usar esse conetor, consulte o conetor Bigtable Beam.

Para mais informações sobre o modelo de programação do Apache Beam, consulte a documentação do Beam.

Comece a usar o HBase

O conector Bigtable HBase Beam é escrito em Java e é criado com base no cliente HBase do Bigtable para Java. É compatível com o SDK do Dataflow 2.x para Java, que se baseia no Apache Beam. O código fonte do conetor está no GitHub no repositório googleapis/java-bigtable-hbase.

Esta página oferece uma vista geral de como usar as transformações Read e Write.

Configure a autenticação

Para usar os Java exemplos nesta página num ambiente de desenvolvimento local, instale e inicialize a CLI gcloud e, em seguida, configure as Credenciais predefinidas da aplicação com as suas credenciais de utilizador.

    Instale a CLI Google Cloud.

    Se estiver a usar um fornecedor de identidade (IdP) externo, primeiro tem de iniciar sessão na CLI gcloud com a sua identidade federada.

    If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

Para mais informações, consulte Set up authentication for a local development environment.

Para informações sobre como configurar a autenticação para um ambiente de produção, consulte Set up Application Default Credentials for code running on Google Cloud.

Adicione o conetor a um projeto Maven

Para adicionar o conector Bigtable HBase Beam a um projeto Maven, adicione o artefacto Maven ao ficheiro pom.xml como uma dependência:

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-beam</artifactId>
  <version>2.12.0</version>
</dependency>

Especifique a configuração do Bigtable

Crie uma interface de opções para permitir entradas para executar o seu pipeline:

public interface BigtableOptions extends DataflowPipelineOptions {

  @Description("The Bigtable project ID, this can be different than your Dataflow project")
  @Default.String("bigtable-project")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Bigtable instance ID")
  @Default.String("bigtable-instance")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Bigtable table ID in the instance.")
  @Default.String("mobile-time-series")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);
}

Quando lê ou escreve no Bigtable, tem de fornecer um objeto de configuração CloudBigtableConfiguration. Este objeto especifica o ID do projeto e o ID da instância da sua tabela, bem como o nome da própria tabela:

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

Para a leitura, forneça um objeto de configuração CloudBigtableScanConfiguration, que lhe permite especificar um objeto Scan do Apache HBase que limita e filtra os resultados de uma leitura. Consulte o artigo Ler a partir do Bigtable para ver detalhes.

Ler a partir do Bigtable

Para ler a partir de uma tabela do Bigtable, aplica uma transformação Read ao resultado de uma operação CloudBigtableIO.read. A transformação Read devolve uma PCollection de objetos Result do HBase, em que cada elemento em PCollection representa uma única linha na tabela.

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(
        ParDo.of(
            new DoFn<Result, Void>() {
              @ProcessElement
              public void processElement(@Element Result row, OutputReceiver<Void> out) {
                System.out.println(Bytes.toString(row.getRow()));
              }
            }));

Por predefinição, uma operação CloudBigtableIO.read devolve todas as linhas na sua tabela. Pode usar um objeto Scan do HBase para limitar a leitura a um intervalo de chaves de linhas na sua tabela ou para aplicar filtros aos resultados da leitura. Para usar um objeto Scan, inclua-o no elemento CloudBigtableScanConfiguration.

Por exemplo, pode adicionar um Scan que devolva apenas o primeiro par de chave-valor de cada linha na tabela, o que é útil quando conta o número de linhas na tabela:

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldRead {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withScan(scan)
            .build();

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}

Escreva no Bigtable

Para escrever numa tabela do Bigtable, apply uma operação CloudBigtableIO.writeToTable. Tem de realizar esta operação em PCollection objetos Mutation do HBase, que podem incluir objetos Put e Delete.

A tabela do Bigtable já tem de existir e ter as famílias de colunas adequadas definidas. O conetor do Dataflow não cria tabelas nem famílias de colunas em tempo real. Pode usar a cbt CLI para criar uma tabela e configurar famílias de colunas, ou pode fazê-lo programaticamente.

Antes de escrever no Bigtable, tem de criar o pipeline do Dataflow para que as inserções e as eliminações possam ser serializadas na rede:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

Em geral, tem de fazer uma transformação, como uma ParDo, para formatar os dados de saída numa coleção de objetos Put ou Delete do HBase. O exemplo seguinte mostra uma transformação DoFn que usa o valor atual como a chave da linha para um Put. Em seguida, pode escrever os objetos Put no Bigtable.

p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(rowkey));

                row.addColumn(
                    Bytes.toBytes("stats_summary"),
                    Bytes.toBytes("os_build"),
                    timestamp,
                    Bytes.toBytes("android"));
                out.output(row);
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

Para ativar o controlo de fluxo de gravação em lote, defina BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL como true. Esta funcionalidade limita automaticamente a taxa de tráfego para pedidos de gravação em lote e permite que o dimensionamento automático do Bigtable adicione ou remova nós automaticamente para processar a sua tarefa do Dataflow.

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
            "true")
        .build();
return bigtableTableConfig;

Segue-se o exemplo de escrita completo, incluindo a variação que permite o controlo do fluxo de escrita em lote.


import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldWrite {

  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                    long timestamp = System.currentTimeMillis();
                    Put row = new Put(Bytes.toBytes(rowkey));

                    row.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        timestamp,
                        Bytes.toBytes("android"));
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {

    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }

  public static CloudBigtableTableConfiguration batchWriteFlowControlExample(
      BigtableOptions options) {
    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
                "true")
            .build();
    return bigtableTableConfig;
  }
}