Connettore Bigtable HBase Beam

Per aiutarti a utilizzare Bigtable in una pipeline Dataflow, sono disponibili due connettori Bigtable Beam I/O open source.

Se esegui la migrazione da HBase a Bigtable o la tua applicazione chiama l'API HBase, utilizza il connettore Bigtable HBase Beam (CloudBigtableIO) descritto in questa pagina.

In tutti gli altri casi, devi utilizzare il connettore Bigtable Beam (BigtableIO) insieme al client Cloud Bigtable per Java, che funziona con le API Cloud Bigtable. Per iniziare a utilizzare questo connettore, vedi Connettore Bigtable Beam.

Per ulteriori informazioni sul modello di programmazione Apache Beam, consulta la documentazione di Beam.

Inizia a utilizzare HBase

Il connettore Bigtable HBase Beam è scritto in Java e si basa sul client Bigtable HBase per Java. È compatibile con l'SDK Dataflow 2.x per Java, basato su Apache Beam. Il codice sorgente del connettore si trova su GitHub nel repository googleapis/java-bigtable-hbase.

Questa pagina fornisce una panoramica su come utilizzare le trasformazioni Read e Write.

Configura l'autenticazione

Per utilizzare gli esempi di Java questa pagina in un ambiente di sviluppo locale, installa e inizializza gcloud CLI, quindi configura le Credenziali predefinite dell'applicazione con le tue credenziali utente.

    Installa Google Cloud CLI.

    Se utilizzi un provider di identità (IdP) esterno, devi prima accedere alla gcloud CLI con la tua identità federata.

    If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

Per ulteriori informazioni, vedi Set up authentication for a local development environment.

Per informazioni sulla configurazione dell'autenticazione per un ambiente di produzione, vedi Set up Application Default Credentials for code running on Google Cloud.

Aggiungere il connettore a un progetto Maven

Per aggiungere il connettore Bigtable HBase Beam a un progetto Maven, aggiungi l'artefatto Maven al file pom.xml come dipendenza:

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-beam</artifactId>
  <version>2.12.0</version>
</dependency>

Specifica la configurazione di Bigtable

Crea un'interfaccia delle opzioni per consentire gli input per l'esecuzione della pipeline:

public interface BigtableOptions extends DataflowPipelineOptions {

  @Description("The Bigtable project ID, this can be different than your Dataflow project")
  @Default.String("bigtable-project")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Bigtable instance ID")
  @Default.String("bigtable-instance")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Bigtable table ID in the instance.")
  @Default.String("mobile-time-series")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);
}

Quando leggi o scrivi in Bigtable, devi fornire un oggetto di configurazione CloudBigtableConfiguration. Questo oggetto specifica l'ID progetto e l'ID istanza per la tabella, nonché il nome della tabella stessa:

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

Per la lettura, fornisci un oggetto di configurazione CloudBigtableScanConfiguration, che ti consente di specificare un oggetto Scan Apache HBase che limita e filtra i risultati di una lettura. Per ulteriori dettagli, consulta la sezione Lettura da Bigtable.

Lettura da Bigtable

Per leggere da una tabella Bigtable, applichi una trasformazione Read al risultato di un'operazione CloudBigtableIO.read. La trasformazione Read restituisce un PCollection di oggetti HBase Result, dove ogni elemento in PCollection rappresenta una singola riga della tabella.

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(
        ParDo.of(
            new DoFn<Result, Void>() {
              @ProcessElement
              public void processElement(@Element Result row, OutputReceiver<Void> out) {
                System.out.println(Bytes.toString(row.getRow()));
              }
            }));

Per impostazione predefinita, un'operazione CloudBigtableIO.read restituisce tutte le righe della tabella. Puoi utilizzare un oggetto HBase Scan per limitare la lettura a un intervallo di chiavi riga all'interno della tabella o per applicare filtri ai risultati della lettura. Per utilizzare un oggetto Scan, includilo in CloudBigtableScanConfiguration.

Ad esempio, puoi aggiungere un Scan che restituisce solo la prima coppia chiave-valore di ogni riga della tabella, il che è utile per conteggiare il numero di righe della tabella:

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldRead {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withScan(scan)
            .build();

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}

Scrivere in Bigtable

Per scrivere in una tabella Bigtable, devi apply un'operazione CloudBigtableIO.writeToTable. Dovrai eseguire questa operazione su un PCollection di oggetti HBase Mutation, che possono includere oggetti Put e Delete.

La tabella Bigtable deve già esistere e deve avere le famiglie di colonne appropriate definite. Il connettore Dataflow non crea tabelle e famiglie di colonne al volo. Puoi utilizzare l'interfaccia a riga di comando cbt per creare una tabella e configurare le famiglie di colonne oppure puoi farlo in modo programmatico.

Prima di scrivere su Bigtable, devi creare la pipeline Dataflow in modo che le operazioni di inserimento ed eliminazione possano essere serializzate sulla rete:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

In generale, devi eseguire una trasformazione, ad esempio un ParDo, per formattare i dati di output in una raccolta di oggetti HBase Put o Delete. L'esempio seguente mostra una trasformazione DoFn che prende il valore corrente e lo utilizza come chiave di riga per un Put. Poi puoi scrivere gli oggetti Put in Bigtable.

p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(rowkey));

                row.addColumn(
                    Bytes.toBytes("stats_summary"),
                    Bytes.toBytes("os_build"),
                    timestamp,
                    Bytes.toBytes("android"));
                out.output(row);
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

Per abilitare il controllo del flusso di scrittura batch, imposta BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL su true. Questa funzionalità limita automaticamente la frequenza del traffico per le richieste di scrittura batch e consente alla scalabilità automatica di Bigtable di aggiungere o rimuovere automaticamente i nodi per gestire il job Dataflow.

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
            "true")
        .build();
return bigtableTableConfig;

Di seguito è riportato l'esempio di scrittura completo, inclusa la variante che consente il controllo del flusso di scrittura batch.


import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldWrite {

  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                    long timestamp = System.currentTimeMillis();
                    Put row = new Put(Bytes.toBytes(rowkey));

                    row.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        timestamp,
                        Bytes.toBytes("android"));
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {

    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }

  public static CloudBigtableTableConfiguration batchWriteFlowControlExample(
      BigtableOptions options) {
    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
                "true")
            .build();
    return bigtableTableConfig;
  }
}