Connecteur Bigtable HBase Beam

Pour vous aider à utiliser Bigtable dans un pipeline Dataflow, deux connecteurs d'E/S Bigtable Open Source sont disponibles.

Si vous effectuez une migration depuis HBase vers Bigtable ou si votre application appelle l'API HBase, utilisez le connecteur Bigtable HBase Beam (CloudBigtableIO) décrit sur cette page.

Dans tous les autres cas, vous devez utiliser le connecteur Bigtable Beam (BigtableIO) conjointement avec le client Cloud Bigtable pour Java, qui fonctionne avec les API Cloud Bigtable. Pour commencer à utiliser ce connecteur, consultez la page Connecteur Bigtable Beam.

Pour en savoir plus sur le modèle de programmation Apache Beam, consultez la documentation de Beam.

Premiers pas avec HBase

Le connecteur Bigtable HBase Beam est écrit en Java et est basé sur le client Bigtable HBase pour Java. Il est compatible avec le SDK Dataflow 2.x pour Java, basé sur Apache Beam. Le code source du connecteur se trouve sur GitHub, dans le dépôt googleapis/java-bigtable-hbase.

Cette page explique comment utiliser les transformations Read et Write.

Configurer l'authentification

Pour utiliser les exemples Java de cette page dans un environnement de développement local, installez et initialisez gcloud CLI, puis configurez le service Identifiants par défaut de l'application à l'aide de vos identifiants utilisateur.

  1. Installez Google Cloud CLI.
  2. Pour initialiser gcloudCLI, exécutez la commande suivante :

    gcloud init
  3. Créez des identifiants d'authentification locaux pour votre compte Google :

    gcloud auth application-default login

Pour en savoir plus, consultez les sections sur Configurer l'authentification pour un environnement de développement local.

Pour en savoir plus sur la configuration de l'authentification dans un environnement de production, consultez Configurer le service Identifiants par défaut de l'application pour le code exécuté sur Google Cloud.

Ajouter le connecteur à un projet Maven

Pour ajouter le connecteur Bigtable HBase Beam à un projet Maven, ajoutez l'artefact Maven à votre fichier pom.xml en tant que dépendance:

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-beam</artifactId>
  <version>2.12.0</version>
</dependency>

Spécifier la configuration Bigtable

Créez une interface d'options pour permettre des entrées pour le fonctionnement de votre pipeline :

public interface BigtableOptions extends DataflowPipelineOptions {

  @Description("The Bigtable project ID, this can be different than your Dataflow project")
  @Default.String("bigtable-project")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Bigtable instance ID")
  @Default.String("bigtable-instance")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Bigtable table ID in the instance.")
  @Default.String("mobile-time-series")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);
}

Lorsque vous lisez ou écrivez dans Bigtable, vous devez fournir un objet de configuration CloudBigtableConfiguration. Cet objet indique l'ID de projet et l'ID d'instance de votre table, ainsi que le nom de la table elle-même :

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

Pour la lecture, fournissez un objet de configuration CloudBigtableScanConfiguration qui vous permet de spécifier un objet Apache HBase Scan qui limite et filtre les résultats d'une lecture. Pour en savoir plus, consultez la section Lire depuis Bigtable.

Lire depuis Bigtable

Pour lire depuis une table Bigtable, vous devez appliquer une transformation Read au résultat d'une opération CloudBigtableIO.read. La transformation Read renvoie une collection PCollectiond'objets HBase Result, où chaque élément de la PCollection représente une ligne individuelle de la table.

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(
        ParDo.of(
            new DoFn<Result, Void>() {
              @ProcessElement
              public void processElement(@Element Result row, OutputReceiver<Void> out) {
                System.out.println(Bytes.toString(row.getRow()));
              }
            }));

Par défaut, une opération CloudBigtableIO.read renvoie toutes les lignes de votre table. Vous pouvez utiliser un objet HBase Scan pour limiter la lecture à une plage de clés de ligne de la table, ou pour appliquer des filtres aux résultats de la lecture. Pour utiliser un objet Scan, incluez-le dans votre objet CloudBigtableScanConfiguration.

Par exemple, vous pouvez ajouter un élément Scan qui renvoie seulement la première paire clé-valeur de chaque ligne de la table, ce qui est utile lorsque vous comptez le nombre de lignes de la table :

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldRead {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withScan(scan)
            .build();

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}

Écrire dans Bigtable

Pour écrire dans une table Bigtable, vous devez appliquer (apply) une opération CloudBigtableIO.writeToTable. Cette opération doit être effectuée sur une collection PCollection d'objets HBase Mutation, qui peut inclure des objets Put et Delete.

La table Bigtable doit déjà exister et les familles de colonnes appropriées doivent être définies. Le connecteur Dataflow ne crée pas de tables ni de familles de colonnes à la volée. Vous pouvez utiliser la CLI cbt pour créer une table et configurer des familles de colonnes, ou vous pouvez le faire par programmation.

Avant d'écrire dans Bigtable, vous devez créer votre pipeline Dataflow afin que les opérations "put" et "delete" puissent être sérialisées sur le réseau:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

En général, vous devez effectuer une transformation, telle que ParDo, pour mettre en forme vos données de sortie en une collection d'objets HBase Put ou Delete. L'exemple suivant montre une transformation DoFn qui prend la valeur actuelle et l'utilise comme clé de ligne pour un Put. Vous pouvez ensuite écrire les objets Put dans Bigtable :

p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(rowkey));

                row.addColumn(
                    Bytes.toBytes("stats_summary"),
                    Bytes.toBytes("os_build"),
                    timestamp,
                    Bytes.toBytes("android"));
                out.output(row);
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

Pour activer le contrôle du flux d'écriture par lot, définissez BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL sur true. Cette fonctionnalité limite automatiquement le débit du trafic pour les requêtes d'écriture par lot et permet à Bigtable d'ajouter ou de supprimer automatiquement des nœuds pour gérer votre tâche Dataflow.

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
            "true")
        .build();
return bigtableTableConfig;

Voici l'exemple d'écriture complet, y compris la variante qui active le contrôle de flux d'écriture par lot.


import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldWrite {

  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                    long timestamp = System.currentTimeMillis();
                    Put row = new Put(Bytes.toBytes(rowkey));

                    row.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        timestamp,
                        Bytes.toBytes("android"));
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {

    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }

  public static CloudBigtableTableConfiguration batchWriteFlowControlExample(
      BigtableOptions options) {
    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
                "true")
            .build();
    return bigtableTableConfig;
  }
}