Bigtable HBase Beam-Connector

Damit Sie Bigtable in einer Dataflow-Pipeline verwenden können, sind zwei Open-Source-E/A-Connectors für Bigtable Beam verfügbar.

Wenn Sie von HBase zu Bigtable migrieren oder Ihre Anwendung die HBase API aufruft, verwenden Sie den auf dieser Seite beschriebenen Bigtable-HBase Beam-Connector (CloudBigtableIO).

In allen anderen Fällen sollten Sie den Bigtable Beam-Connector (BigtableIO) in Verbindung mit dem Cloud Bigtable-Client für Java verwenden, der mit den Cloud Bigtable APIs funktioniert. Eine Anleitung zur Verwendung dieses Connectors finden Sie unter Bigtable Beam-Connector.

Weitere Informationen zum Apache Beam-Programmiermodell finden Sie in der Beam-Dokumentation.

Erste Schritte mit HBase

Der Bigtable-HBase Beam-Connector ist in Java geschrieben und baut auf dem Bigtable-HBase-Client für Java auf. Er ist mit dem Dataflow SDK 2.x für Java kompatibel, das auf Apache Beam basiert. Den Quellcode des Connectors finden Sie auf GitHub im Repository googleapis/java-bigtable-hbase.

Diese Seite bietet einen Überblick über die Verwendung von Read- und Write-Transformationen.

Authentifizierung einrichten

Wenn Sie die Java-Beispiele auf dieser Seite aus einer lokalen Entwicklungsumgebung heraus verwenden möchten, installieren und initialisieren Sie die gcloud CLI und richten dann die Standardanmeldedaten für Anwendungen mit Ihren Nutzeranmeldedaten ein.

  1. Installieren Sie die Google Cloud CLI.
  2. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  3. Erstellen Sie lokale Anmeldedaten zur Authentifizierung für Ihr Google-Konto:

    gcloud auth application-default login

Weitere Informationen: Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

Informationen zum Einrichten der Authentifizierung für eine Produktionsumgebung finden Sie unter Standardanmeldedaten für Anwendungen für Code einrichten, der in Google Cloud ausgeführt wird.

Connector einem Maven-Projekt hinzufügen

Wenn Sie einem Maven-Projekt den Bigtable-HBase Beam-Connector hinzufügen möchten, fügen Sie das Maven-Artefakt als Abhängigkeit in die Datei pom.xml ein:

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-beam</artifactId>
  <version>2.12.0</version>
</dependency>

Bigtable-Konfiguration angeben

Erstellen Sie eine Schnittstelle für Optionen, um Eingaben für die Ausführung der Pipeline zuzulassen:

public interface BigtableOptions extends DataflowPipelineOptions {

  @Description("The Bigtable project ID, this can be different than your Dataflow project")
  @Default.String("bigtable-project")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Bigtable instance ID")
  @Default.String("bigtable-instance")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Bigtable table ID in the instance.")
  @Default.String("mobile-time-series")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);
}

Wenn Sie aus Bigtable lesen oder in Bigtable schreiben, müssen Sie ein CloudBigtableConfiguration-Konfigurationsobjekt angeben. Dieses Objekt legt die Projekt-ID und die Instanz-ID für Ihre Tabelle sowie den Namen der Tabelle fest:

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

Stellen Sie zum Lesen ein CloudBigtableScanConfiguration-Konfigurationsobjekt bereit, mit dem Sie ein Scan-Objekt von Apache HBase angeben können, das die Ergebnisse eines Lesevorgangs einschränkt und filtert. Weitere Informationen finden Sie unter Daten aus Bigtable lesen.

Aus Bigtable lesen

Zum Lesen aus einer Tabelle in Bigtable wenden Sie eine Read-Transformation auf das Ergebnis eines CloudBigtableIO.read-Vorgangs an. Die Read-Transformation gibt eine PCollection von HBase-Result-Objekten zurück, wobei jedes Element in PCollection eine einzelne Zeile in der Tabelle darstellt.

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(
        ParDo.of(
            new DoFn<Result, Void>() {
              @ProcessElement
              public void processElement(@Element Result row, OutputReceiver<Void> out) {
                System.out.println(Bytes.toString(row.getRow()));
              }
            }));

Standardmäßig gibt ein CloudBigtableIO.read-Vorgang alle Zeilen in Ihrer Tabelle zurück. Sie können ein HBase-Scan-Objekt verwenden, um den Lesevorgang auf einen Bereich von Zeilenschlüsseln in Ihrer Tabelle zu beschränken oder Filter auf die Ergebnisse des Lesevorgangs anzuwenden. Wenn Sie ein Scan-Objekt verwenden möchten, fügen Sie es in CloudBigtableScanConfiguration ein.

Sie können beispielsweise ein Scan-Objekt einfügen, das nur das erste Schlüsselwertpaar aus jeder Zeile in der Tabelle zurückgibt. Dies ist zum Zählen der Zeilen in der Tabelle nützlich:

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldRead {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withScan(scan)
            .build();

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}

In Bigtable schreiben

Für den Schreibzugriff auf eine Bigtable-Tabelle wenden Sie mit apply einen CloudBigtableIO.writeToTable-Vorgang an. Sie müssen diesen Vorgang auf eine PCollection von HBase-Mutation-Objekten anwenden, die sowohl Put- als auch Delete-Objekte enthalten kann.

Die Bigtable-Tabelle muss bereits vorhanden sein und die entsprechenden Spaltenfamilien müssen definiert sein. Der Dataflow-Connector erstellt Tabellen und Spaltenfamilien nicht spontan. Sie können mit der cbt-Befehlszeile eine Tabelle erstellen und Spaltenfamilien einrichten oder dies programmatisch tun.

Bevor Sie in Bigtable schreiben, müssen Sie Ihre Dataflow-Pipeline erstellen, damit Puts und Deletes über das Netzwerk serialisiert werden können:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

Im Allgemeinen müssen Sie eine Transformation wie ParDo ausführen, um Ihre Ausgabedaten in eine Sammlung von HBase-Put- oder -Delete-Objekten zu formatieren. Das folgende Beispiel zeigt eine DoFn-Transformation, die den aktuellen Wert als Zeilenschlüssel für eine Put verwendet. Sie können die Put-Objekte dann in Bigtable schreiben.

p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(rowkey));

                row.addColumn(
                    Bytes.toBytes("stats_summary"),
                    Bytes.toBytes("os_build"),
                    timestamp,
                    Bytes.toBytes("android"));
                out.output(row);
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

Zum Aktivieren der Batch-Schreibflusssteuerung setzen Sie BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL auf true. Diese Funktion begrenzt den Traffic für Batchschreibanfragen automatisch und ermöglicht, dass beim Autoscaling von Bigtable automatisch Knoten hinzugefügt oder entfernt werden, um Ihren Dataflow-Job zu verarbeiten.

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
            "true")
        .build();
return bigtableTableConfig;

Hier ist das vollständige Schreibbeispiel, einschließlich der Variante, die die Batch-Schreibvorgangssteuerung ermöglicht.


import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldWrite {

  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                    long timestamp = System.currentTimeMillis();
                    Put row = new Put(Bytes.toBytes(rowkey));

                    row.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        timestamp,
                        Bytes.toBytes("android"));
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {

    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }

  public static CloudBigtableTableConfiguration batchWriteFlowControlExample(
      BigtableOptions options) {
    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
                "true")
            .build();
    return bigtableTableConfig;
  }
}