Conector do Dataflow para o Cloud Bigtable

Com o conector do Cloud Dataflow para o Cloud Bigtable, é possível usar o Cloud Bigtable em um pipeline do Cloud Dataflow. Use o conector nas operações em lote e de streaming.

Além de ter a linguagem Java, ele foi criado no cliente HBase no Cloud Bigtable para Java. Ele é compatível com o Dataflow SDK 2.x para Java, baseado no Apache Beam. O código-fonte do conector está no repositório GoogleCloudPlatform/cloud-bigtable-client do GitHub.

Nesta página, apresentamos uma visão geral de como usar as transformações Read e Write com o conector do Cloud Dataflow. Leia também a documentação completa sobre a API do conector do Cloud Dataflow.

Como adicionar o conector a um projeto do Maven

Para incluir o conector do Cloud Dataflow em um projeto do Maven, adicione o artefato do Maven ao arquivo pom.xml como uma dependência.

<dependency>
    <groupId>com.google.cloud.bigtable</groupId>
    <artifactId>bigtable-hbase-beam</artifactId>
    <version>1.11.0</version>
</dependency>

Como especificar a configuração do Cloud Bigtable

Ao realizar operações de leitura ou gravação no Cloud Bigtable, é necessário fornecer um objeto de configuração CloudBigtableScanConfiguration. Esse objeto especifica os códigos do projeto e da instância para a tabela, bem como o próprio nome da tabela:

CloudBigtableTableConfiguration config =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(PROJECT_ID)
        .withInstanceId(INSTANCE_ID)
        .withTableId(TABLE_ID)
        .build();

Também é possível fornecer um objeto Scan do Apache HBase que limita e filtra os resultados de uma leitura. Consulte Como ler do Cloud Bigtable para saber mais detalhes.

Como ler do Cloud Bigtable

Para ler a partir de uma tabela do Cloud Bigtable, aplique uma transformação Read ao resultado de uma operação CloudBigtableIO.read. A transformação Read retorna um PCollection de objetos Result do HBase, em que cada elemento em PCollection representa uma única linha na tabela.

Por padrão, uma operação CloudBigtableIO.read retorna todas as linhas da tabela. Use um objeto Scan do HBase para limitar a leitura a um intervalo de chaves de linha dentro da tabela ou aplicar filtros aos resultados da leitura. Para usar um objeto Scan, inclua-o no CloudBigtableScanConfiguration.

Por exemplo, para adicionar um Scan que retorne somente o primeiro par de valores-chave de cada linha da tabela (útil para contar o número de linhas na tabela):

Scan scan = new Scan();
scan.setCacheBlocks(false);
scan.setFilter(new FirstKeyOnlyFilter());

// CloudBigtableTableConfiguration contains the project, zone, cluster and table to connect to.
// You can supply an optional Scan() to filter the rows that will be read.
CloudBigtableScanConfiguration config =
    new CloudBigtableScanConfiguration.Builder()
        .withProjectId(PROJECT_ID)
        .withInstanceId(INSTANCE_ID)
        .withTableId(TABLE_ID)
        .withScan(scan)
        .build();

Pipeline p = Pipeline.create(options);

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(Count.<Result>globally())
    .apply(ParDo.of(stringifier))
    .apply(TextIO.write().to(options.getResultLocation()));

Como gravar no Cloud Bigtable

Para fazer uma gravação em uma tabela do Cloud Bigtable, use apply em uma operação CloudBigtableIO.writeToTable. Essa operação precisa ser realizada em um PCollection de objetos Mutation do HBase, que podem incluir objetos Put e Delete.

É necessário que exista uma tabela do Cloud Bigtable com grupos de colunas apropriados definidos. O conector do Dataflow não cria tabelas e grupos de colunas rapidamente. Use a ferramenta de linha de comando cbt para criar uma tabela e definir grupos de colunas. Como alternativa, faça isso de maneira programática.

Antes de realizar uma gravação no Cloud Bigtable, crie o canal do Cloud Dataflow. Assim, é possível serializar os objetos put e delete pela rede:

CloudBigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(CloudBigtableOptions.class);
Pipeline p = Pipeline.create(options);

Em geral, você precisará realizar uma transformação, como ParDo, para formatar os dados de saída em uma coleção de objetos Put ou Delete do HBase. O exemplo a seguir mostra uma transformação DoFn simples que usa o valor atual como chave de linha para um objeto Put:

static final DoFn<String, Mutation> MUTATION_TRANSFORM = new DoFn<String, Mutation>() {
  private static final long serialVersionUID = 1L;

  @ProcessElement
  public void processElement(DoFn<String, Mutation>.ProcessContext c) throws Exception {
    c.output(new Put(c.element().getBytes()).addColumn(FAMILY, QUALIFIER, VALUE));
  }
};

Em seguida, grave os objetos Put no Cloud Bigtable:

p
    .apply(Create.of("Hello", "World"))
    .apply(ParDo.of(MUTATION_TRANSFORM))
    .apply(CloudBigtableIO.writeToTable(config));
Esta página foi útil? Conte sua opinião sobre:

Enviar comentários sobre…

Documentação do Cloud Bigtable