Conector de Dataflow para Cloud Bigtable

El conector de Cloud Dataflow para Cloud Bigtable permite usar Cloud Bigtable en una canalización de Cloud Dataflow. Puedes usar el conector para operaciones de transmisión y por lotes.

El conector está escrito en Java y está integrado en el cliente HBase de Cloud Bigtable para Java. Es compatible con el SDK de Dataflow 2.x para Java, que está basado en Apache Beam. El código fuente de este conector se encuentra en el repositorio GoogleCloudPlatform/cloud-bigtable-client de GitHub.

En esta página, se proporciona una descripción general de cómo usar transformaciones Read y Write con el conector de Cloud Dataflow. También puedes leer la documentación completa de la API del conector de Cloud Dataflow.

Agrega el conector a un proyecto de Maven

Para agregar el conector de Cloud Dataflow a un proyecto de Maven, agrega el artefacto de Maven a tu archivo pom.xml como una dependencia:

<dependency>
    <groupId>com.google.cloud.bigtable</groupId>
    <artifactId>bigtable-hbase-beam</artifactId>
    <version>1.11.0</version>
</dependency>

Cómo especificar la configuración de Cloud Bigtable

Cuando lees datos de Cloud Bigtable, o los escribes, debes proporcionar un objeto de configuración CloudBigtableScanConfiguration. Este objeto especifica el ID del proyecto y de la instancia que se usará en tu tabla, además del nombre de esta última:

CloudBigtableTableConfiguration config =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(PROJECT_ID)
        .withInstanceId(INSTANCE_ID)
        .withTableId(TABLE_ID)
        .build();

De manera opcional, puedes proporcionar un objeto Scan de Apache HBase que limite y filtre los resultados de una lectura. Si deseas obtener más detalles, consulta Cómo leer en Cloud Bigtable.

Cómo leer en Cloud Bigtable

Para realizar una lectura en una tabla de Cloud Bigtable, aplica una transformación Read al resultado de una operación CloudBigtableIO.read. La transformación Read muestra una PCollection de objetos Result de HBase; cada elemento de la PCollection representa una única fila de la tabla.

De forma predeterminada, una operación CloudBigtableIO.read muestra todas las filas de tu tabla. Puedes usar un objeto Scan de HBase para limitar la lectura a un rango de claves de fila de tu tabla, o a fin de aplicar filtros a los resultados de la lectura. Para usar un objeto Scan, inclúyelo en tu CloudBigtableScanConfiguration.

Por ejemplo, puedes agregar un Scan que muestre solo el primer par clave-valor de cada fila en tu tabla, lo cual es útil cuando se cuenta la cantidad de filas en la tabla:

Scan scan = new Scan();
scan.setCacheBlocks(false);
scan.setFilter(new FirstKeyOnlyFilter());

// CloudBigtableTableConfiguration contains the project, zone, cluster and table to connect to.
// You can supply an optional Scan() to filter the rows that will be read.
CloudBigtableScanConfiguration config =
    new CloudBigtableScanConfiguration.Builder()
        .withProjectId(PROJECT_ID)
        .withInstanceId(INSTANCE_ID)
        .withTableId(TABLE_ID)
        .withScan(scan)
        .build();

Pipeline p = Pipeline.create(options);

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(Count.<Result>globally())
    .apply(ParDo.of(stringifier))
    .apply(TextIO.write().to(options.getResultLocation()));

Cómo escribir en Cloud Bigtable

Para escribir en una tabla de Cloud Bigtable, necesitas apply una operación CloudBigtableIO.writeToTable. Debes realizar esta operación en una PCollection de objetos Mutation de HBase, que puede incluir objetos Put y Delete.

La tabla de Cloud Bigtable ya debe existir y debe tener definida la familia de columnas correspondiente. El conector de Dataflow no crea tablas y familias de columnas en el momento. Puedes usar la herramienta de línea de comandos de cbt para crear una tabla y configurar las familias de columnas, o puedes hacerlo de manera programática.

Antes de escribir en Cloud Bigtable, debes crear tu canalización de Cloud Dataflow a fin de que los objetos Put y Delete puedan serializarse en la red:

CloudBigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(CloudBigtableOptions.class);
Pipeline p = Pipeline.create(options);

En general, deberás ejecutar una transformación, como ParDo, para dar a tus datos salientes el formato de una recopilación de objetos Put o Delete de HBase. En el siguiente ejemplo, se muestra una transformación DoFn simple que toma el valor actual y lo usa como la clave de fila para Put:

static final DoFn<String, Mutation> MUTATION_TRANSFORM = new DoFn<String, Mutation>() {
  private static final long serialVersionUID = 1L;

  @ProcessElement
  public void processElement(DoFn<String, Mutation>.ProcessContext c) throws Exception {
    c.output(new Put(c.element().getBytes()).addColumn(FAMILY, QUALIFIER, VALUE));
  }
};

Luego, puedes escribir los objetos Put en Cloud Bigtable:

p
    .apply(Create.of("Hello", "World"))
    .apply(ParDo.of(MUTATION_TRANSFORM))
    .apply(CloudBigtableIO.writeToTable(config));
¿Te ha resultado útil esta página? Enviar comentarios:

Enviar comentarios sobre...

Documentación de Cloud Bigtable