Conector de Bigtable HBase Beam

Para ayudarte a usar Bigtable en una canalización de Dataflow, hay dos conectores de E/S de Bigtable Beam de código abierto disponibles.

Si migras de HBase a Bigtable o si tu aplicación llama a la API de HBase, usa el conector de HBase de Beam de Bigtable (CloudBigtableIO) que se describe en esta página.

En todos los demás casos, debes usar el conector de Bigtable Beam (BigtableIO) junto con el cliente de Cloud Bigtable para Java, que funciona con las APIs de Cloud Bigtable. Para comenzar a usar ese conector, consulta Conector de Bigtable Beam.

Para obtener más información sobre el modelo de programación de Apache Beam, consulta la documentación de Beam.

Comienza a usar HBase

El conector de Bigtable HBase Beam está escrito en Java y se compila en el cliente de Bigtable HBase para Java. Es compatible con el SDK de Dataflow 2.x para Java, que está basado en Apache Beam. El código fuente del conector se encuentra en el repositorio googleapis/java-bigtable-hbase de GitHub.

En esta página, se proporciona una descripción general de cómo usar las transformaciones Read y Write.

Configura la autenticación

Para usar las muestras de Java de esta página desde un entorno de desarrollo local, instala e inicializa la CLI de gcloud y, luego, configura las credenciales predeterminadas de la aplicación con tus credenciales de usuario.

  1. Instala Google Cloud CLI.
  2. Para inicializar la CLI de gcloud, ejecuta el siguiente comando:

    gcloud init
  3. Crea credenciales de autenticación locales para tu Cuenta de Google:

    gcloud auth application-default login

Para obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

Si deseas obtener información sobre cómo configurar la autenticación para un entorno de producción, consulta Configura las credenciales predeterminadas de la aplicación para el código que se ejecuta en Google Cloud..

Agrega el conector a un proyecto de Maven

Para agregar el conector de HBase de Beam de Bigtable a un proyecto de Maven, agrega el artefacto de Maven a tu archivo pom.xml como una dependencia:

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-beam</artifactId>
  <version>2.12.0</version>
</dependency>

Especifica la configuración de Bigtable

Crea una interfaz de opciones a fin de permitir entradas para ejecutar la canalización:

public interface BigtableOptions extends DataflowPipelineOptions {

  @Description("The Bigtable project ID, this can be different than your Dataflow project")
  @Default.String("bigtable-project")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Bigtable instance ID")
  @Default.String("bigtable-instance")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Bigtable table ID in the instance.")
  @Default.String("mobile-time-series")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);
}

Cuando lees o escribes en Bigtable, debes proporcionar un objeto de configuración CloudBigtableConfiguration. Este objeto especifica el ID del proyecto y de la instancia que se usará en tu tabla, además del nombre de esta última:

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

Para la lectura, proporciona un objeto de configuración CloudBigtableScanConfiguration, que te permita especificar un objeto Scan de Apache HBase que limite y filtre los resultados de una lectura. Si deseas obtener más detalles, consulta Lee desde Bigtable.

Lee desde Bigtable

Para leer desde una tabla de Bigtable, aplica una transformación Read al resultado de una operación CloudBigtableIO.read. La transformación Read muestra un objeto PCollection de HBase en Result, donde cada elemento de PCollection representa una sola fila en la tabla.

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(
        ParDo.of(
            new DoFn<Result, Void>() {
              @ProcessElement
              public void processElement(@Element Result row, OutputReceiver<Void> out) {
                System.out.println(Bytes.toString(row.getRow()));
              }
            }));

De forma predeterminada, una operación CloudBigtableIO.read muestra todas las filas de la tabla. Puedes usar un objeto Scan de HBase para limitar la lectura a un rango de claves de fila dentro de tu tabla o aplicar filtros a los resultados de la lectura. Para usar un objeto Scan, inclúyelo en tu CloudBigtableScanConfiguration.

Por ejemplo, puedes agregar un Scan que muestre solo el primer par clave-valor de cada fila de tu tabla, lo que resulta útil cuando se cuenta el número de filas en la tabla:

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldRead {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withScan(scan)
            .build();

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}

Escribe en Bigtable

A fin de escribir en una tabla de Bigtable, debes usar apply para una operación CloudBigtableIO.writeToTable. Deberás realizar esta operación en un PCollection de HBase de objetos Mutation, que pueden incluir objetos Put y Delete.

La tabla de Bigtable ya debe existir y debe tener definida la familia de columnas correspondiente. El conector de Dataflow no crea tablas ni familias de columnas sobre la marcha. Puedes usar la CLI de cbt para crear una tabla y configurar familias de columnas. También puedes hacerlo de manera programática.

Antes de escribir en Bigtable, debes crear tu canalización de Dataflow para que los objetos Put y Delete puedan serializarse en la red:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

En general, deberás realizar una transformación, como ParDo, para formatear los datos de salida en una colección de objetos Put o Delete de HBase. En el siguiente ejemplo, se muestra una transformación DoFn que toma el valor actual y lo usa como la clave de fila para una Put. Luego, puedes escribir los objetos Put en Bigtable.

p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(rowkey));

                row.addColumn(
                    Bytes.toBytes("stats_summary"),
                    Bytes.toBytes("os_build"),
                    timestamp,
                    Bytes.toBytes("android"));
                out.output(row);
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

Para habilitar el control de flujo de escritura por lotes, establece BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL en true. Esta función limita de forma automática la frecuencia de tráfico de las solicitudes de escritura por lotes y permite que el ajuste de escala automático de Bigtable agregue o quite nodos automáticamente para controlar el trabajo de Dataflow.

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
            "true")
        .build();
return bigtableTableConfig;

Este es el ejemplo de escritura completo, incluida la variación que habilita el control del flujo de escritura por lotes.


import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldWrite {

  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                    long timestamp = System.currentTimeMillis();
                    Put row = new Put(Bytes.toBytes(rowkey));

                    row.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        timestamp,
                        Bytes.toBytes("android"));
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {

    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }

  public static CloudBigtableTableConfiguration batchWriteFlowControlExample(
      BigtableOptions options) {
    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
                "true")
            .build();
    return bigtableTableConfig;
  }
}