Conector de Beam de HBase de Bigtable

Para ayudarte a usar Bigtable en una canalización de Dataflow, están disponibles dos conectores de E/S de Bigtable Beam de código abierto.

Si migras de HBase a Bigtable o tu aplicación llama a la API de HBase, usa el conector de Beam de HBase de Bigtable (CloudBigtableIO) que se describe en esta página.

En todos los demás casos, debes usar el conector de Bigtable Beam (BigtableIO) junto con el cliente de Cloud Bigtable para Java, que funciona con las APIs de Cloud Bigtable. Para comenzar a usar ese conector, consulta Conector de Bigtable Beam.

Para obtener más información sobre el modelo de programación de Apache Beam, consulta la documentación de Beam.

Comienza a usar HBase

El conector de Beam de HBase de Bigtable está escrito en Java y se compila en el cliente de HBase de Bigtable para Java. Es compatible con el SDK de Dataflow 2.x para Java, que se basa en Apache Beam. El código fuente del conector está en GitHub en el repositorio googleapis/java-bigtable-hbase.

En esta página, se proporciona una descripción general de cómo usar las transformaciones Read y Write.

Configura la autenticación

Para usar las muestras de Java de esta página en un entorno de desarrollo local, instala e inicializa gcloud CLI y, luego, configura las credenciales predeterminadas de la aplicación con tus credenciales de usuario.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

Para obtener más información, consulta Set up authentication for a local development environment.

Si quieres obtener información sobre cómo configurar la autenticación para un entorno de producción, consulta Set up Application Default Credentials for code running on Google Cloud.

Agrega el conector a un proyecto de Maven

Para agregar el conector de Bigtable HBase Beam a un proyecto de Maven, agrega el artefacto de Maven a tu archivo pom.xml como una dependencia:

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-beam</artifactId>
  <version>2.12.0</version>
</dependency>

Especifica la configuración de Bigtable

Crea una interfaz de opciones a fin de permitir entradas para ejecutar la canalización:

public interface BigtableOptions extends DataflowPipelineOptions {

  @Description("The Bigtable project ID, this can be different than your Dataflow project")
  @Default.String("bigtable-project")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Bigtable instance ID")
  @Default.String("bigtable-instance")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Bigtable table ID in the instance.")
  @Default.String("mobile-time-series")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);
}

Cuando lees o escribes en Bigtable, debes proporcionar un objeto de configuración CloudBigtableConfiguration. Este objeto especifica el ID del proyecto y de la instancia que se usará en tu tabla, además del nombre de esta última:

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

Para la lectura, proporciona un objeto de configuración CloudBigtableScanConfiguration, que te permita especificar un objeto Scan de Apache HBase que limite y filtre los resultados de una lectura. Si deseas obtener más detalles, consulta Lee desde Bigtable.

Lee desde Bigtable

Para leer desde una tabla de Bigtable, aplica una transformación Read al resultado de una operación CloudBigtableIO.read. La transformación Read muestra un objeto PCollection de HBase en Result, donde cada elemento de PCollection representa una sola fila en la tabla.

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(
        ParDo.of(
            new DoFn<Result, Void>() {
              @ProcessElement
              public void processElement(@Element Result row, OutputReceiver<Void> out) {
                System.out.println(Bytes.toString(row.getRow()));
              }
            }));

De forma predeterminada, una operación CloudBigtableIO.read muestra todas las filas de la tabla. Puedes usar un objeto Scan de HBase para limitar la lectura a un rango de claves de fila dentro de tu tabla o aplicar filtros a los resultados de la lectura. Para usar un objeto Scan, inclúyelo en tu CloudBigtableScanConfiguration.

Por ejemplo, puedes agregar un Scan que muestre solo el primer par clave-valor de cada fila de tu tabla, lo que resulta útil cuando se cuenta el número de filas en la tabla:

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldRead {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withScan(scan)
            .build();

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}

Escribe en Bigtable

A fin de escribir en una tabla de Bigtable, debes usar apply para una operación CloudBigtableIO.writeToTable. Deberás realizar esta operación en un PCollection de HBase de objetos Mutation, que pueden incluir objetos Put y Delete.

La tabla de Bigtable ya debe existir y debe tener definida la familia de columnas correspondiente. El conector de Dataflow no crea tablas y familias de columnas sobre la marcha. Puedes usar la CLI de cbt para crear una tabla y configurar familias de columnas, o puedes hacerlo de manera programática.

Antes de escribir en Bigtable, debes crear tu canalización de Dataflow para que los objetos Put y Delete puedan serializarse en la red:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

En general, deberás realizar una transformación, como ParDo, para formatear los datos de salida en una colección de objetos Put o Delete de HBase. En el siguiente ejemplo, se muestra una transformación DoFn que toma el valor actual y lo usa como la clave de fila para un Put. Luego, puedes escribir los objetos Put en Bigtable.

p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(rowkey));

                row.addColumn(
                    Bytes.toBytes("stats_summary"),
                    Bytes.toBytes("os_build"),
                    timestamp,
                    Bytes.toBytes("android"));
                out.output(row);
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

Para habilitar el control de flujo de escritura por lotes, establece BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL en true. Esta función limita automáticamente la tasa de tráfico para las solicitudes de escritura por lotes y permite que el ajuste de escala automático de Bigtable agregue o quite nodos automáticamente para controlar tu trabajo de Dataflow.

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
            "true")
        .build();
return bigtableTableConfig;

Este es el ejemplo completo de escritura, incluida la variación que habilita el control de flujo de escritura por lotes.


import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldWrite {

  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                    long timestamp = System.currentTimeMillis();
                    Put row = new Put(Bytes.toBytes(rowkey));

                    row.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        timestamp,
                        Bytes.toBytes("android"));
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {

    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }

  public static CloudBigtableTableConfiguration batchWriteFlowControlExample(
      BigtableOptions options) {
    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
                "true")
            .build();
    return bigtableTableConfig;
  }
}