Conector de Dataflow para Cloud Bigtable

El conector de Cloud Dataflow para Cloud Bigtable permite usar Cloud Bigtable en una canalización de Cloud Dataflow. Puedes usar el conector para operaciones de transmisión y por lotes.

El conector está escrito en Java y está integrado en el cliente HBase de Cloud Bigtable para Java. Es compatible con el SDK de Dataflow 2.x para Java, que está basado en Apache Beam. El código fuente del conector está en GitHub en el repositorio googleapis/java-bigtable-hbase.

En esta página, se proporciona una descripción general de cómo usar las transformaciones Read y Write con el conector de Cloud Dataflow. También puedes leer la documentación completa de la API del conector de Cloud Dataflow.

Agrega el conector a un proyecto de Maven

Para agregar el conector de Cloud Dataflow a un proyecto de Maven, agrega el artefacto de Maven a tu archivo pom.xml como una dependencia:

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-beam</artifactId>
  <version>1.16.0</version>
</dependency>

Especifica la configuración de Cloud Bigtable

Crea una interfaz de opciones a fin de permitir entradas para ejecutar la canalización:

public interface BigtableOptions extends DataflowPipelineOptions {
  @Description("The Bigtable project ID, this can be different than your Dataflow project")
  @Default.String("bigtable-project")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Bigtable instance ID")
  @Default.String("bigtable-instance")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Bigtable table ID in the instance.")
  @Default.String("bigtable-table")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);
}

Cuando lees o escribes en Cloud Bigtable, debes proporcionar un objeto de configuración CloudBigtableConfiguration. Este objeto especifica el ID del proyecto y de la instancia que se usará en tu tabla, además del nombre de esta última:

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

Para la lectura, proporciona un objeto de configuración CloudBigtableScanConfiguration, que te permita especificar un objeto Scan de Apache HBase que limite y filtre los resultados de una lectura. Si deseas obtener más detalles, consulta Lee en Cloud Bigtable.

Lee en Cloud Bigtable

Para leer desde una tabla de Cloud Bigtable, aplica una transformación Read al resultado de una operación CloudBigtableIO.read. La transformación Read muestra un PCollection de objetos Result de HBase, donde cada elemento de PCollection representa una sola fila en la tabla.

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(
        ParDo.of(
            new DoFn<Result, Void>() {
              @ProcessElement
              public void processElement(@Element Result row, OutputReceiver<Void> out) {
                System.out.println(Bytes.toString(row.getRow()));
              }
            }));

De forma predeterminada, una operación CloudBigtableIO.read muestra todas las filas de la tabla. Puedes usar un objeto Scan de HBase para limitar la lectura a un rango de claves de fila dentro de tu tabla o aplicar filtros a los resultados de la lectura. Para usar un objeto Scan, inclúyelo en tu CloudBigtableScanConfiguration.

Por ejemplo, puedes agregar un Scan que muestre solo el primer par clave-valor de cada fila de tu tabla, lo que resulta útil cuando se cuenta el número de filas en la tabla:

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldRead {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withScan(scan)
            .build();

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("bigtable-table")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}

Escribe en Cloud Bigtable

A fin de escribir en una tabla de Cloud Bigtable, debes usar apply para una operación CloudBigtableIO.writeToTable. Deberás realizar esta operación en un PCollection de HBase de objetos Mutation, que pueden incluir objetos Put y Delete.

La tabla de Cloud Bigtable ya debe existir y debe tener definida la familia de columnas correspondiente. El conector de Dataflow no crea tablas y familias de columnas en el momento. Puedes usar la herramienta de línea de comandos de cbt para crear una tabla y configurar familias de columnas, o puedes hacerlo de manera programática.

Antes de escribir en Cloud Bigtable, debes crear tu canalización de Cloud Dataflow a fin de que los objetos Put y Delete puedan serializarse en la red:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

En general, deberás realizar una transformación, como ParDo, para formatear los datos de salida en una colección de objetos Put o Delete de HBase. En el siguiente ejemplo, se muestra una transformación DoFn simple que toma el valor actual y lo usa como la clave de fila para un Put. Luego, puedes escribir los objetos Put en Cloud Bigtable.

p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(rowkey));

                row.addColumn(
                    Bytes.toBytes("stats_summary"),
                    Bytes.toBytes("os_build"),
                    timestamp,
                    Bytes.toBytes("android"));
                out.output(row);
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

A continuación, se muestra un ejemplo completo de escritura.

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldWrite {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                    long timestamp = System.currentTimeMillis();
                    Put row = new Put(Bytes.toBytes(rowkey));

                    row.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        timestamp,
                        Bytes.toBytes("android"));
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("bigtable-table")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}