Bigtable HBase Beam-Connector

Zur Unterstützung der Verwendung von Bigtable in einer Dataflow-Pipeline stehen zwei Open-Source-E/A-Connectors für Bigtable Beam zur Verfügung.

Wenn Sie von HBase zu Bigtable migrieren oder Ihre Anwendung die HBase API aufruft, verwenden Sie den auf dieser Seite beschriebenen Bigtable HBase Beam-Connector (CloudBigtableIO).

In allen anderen Fällen sollten Sie den Bigtable Beam-Connector (BigtableIO) in Verbindung mit dem Cloud Bigtable-Client für Java verwenden, der mit den Cloud Bigtable APIs funktioniert. Eine Einführung in diesen Connector finden Sie unter Bigtable Beam-Connector.

Weitere Informationen zum Apache Beam-Programmiermodell finden Sie in der Beam-Dokumentation.

Erste Schritte mit HBase

Der Bigtable HBase Beam-Connector ist in Java geschrieben und wurde auf der Bigtable-Tabelle HBase-Client für Java Er ist mit dem Dataflow SDK 2.x für Java kompatibel, das auf Apache Beam basiert. Den Quellcode des Connectors finden Sie auf GitHub im Repository googleapis/java-bigtable-hbase.

Diese Seite bietet einen Überblick über die Verwendung von Read- und Write-Transformationen.

Authentifizierung einrichten

Wenn Sie die Java Beispiele auf dieser Seite in einer lokalen Entwicklungsumgebung verwenden möchten, installieren und initialisieren Sie die gcloud CLI und richten dann die Standardanmeldedaten für Anwendungen mit Ihren Nutzeranmeldedaten ein.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

Weitere Informationen unter Set up authentication for a local development environment.

Informationen zum Einrichten der Authentifizierung für eine Produktionsumgebung finden Sie unter Set up Application Default Credentials for code running on Google Cloud.

Connector zu einem Maven-Projekt hinzufügen

Um den Bigtable HBase Beam-Connector einem Maven-Projekt hinzuzufügen, fügen Sie das Maven-Artefakt in Ihre pom.xml-Datei als Abhängigkeit:

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-beam</artifactId>
  <version>2.12.0</version>
</dependency>

Bigtable-Konfiguration angeben

Erstellen Sie eine Schnittstelle für Optionen, um Eingaben für die Ausführung der Pipeline zuzulassen:

public interface BigtableOptions extends DataflowPipelineOptions {

  @Description("The Bigtable project ID, this can be different than your Dataflow project")
  @Default.String("bigtable-project")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Bigtable instance ID")
  @Default.String("bigtable-instance")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Bigtable table ID in the instance.")
  @Default.String("mobile-time-series")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);
}

Wenn Sie aus Bigtable lesen oder in Bigtable schreiben, müssen Sie ein CloudBigtableConfiguration-Konfigurationsobjekt angeben. Dieses Objekt legt die Projekt-ID und die Instanz-ID für Ihre Tabelle sowie den Namen der Tabelle fest:

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

Stellen Sie für Lesevorgänge ein CloudBigtableScanConfiguration-Konfigurationsobjekt bereit, mit dem Sie ein Scan-Objekt von Apache HBase angeben können, das die Ergebnisse eines Lesevorgangs begrenzt und filtert. Weitere Informationen finden Sie unter Daten aus Bigtable lesen.

Aus Bigtable lesen

Zum Lesen aus einer Tabelle in Bigtable wenden Sie eine Read-Transformation auf das Ergebnis eines CloudBigtableIO.read-Vorgangs an. Die Read-Transformation gibt eine PCollection von HBase-Result-Objekten zurück, wobei jedes Element in PCollection eine einzelne Zeile in der Tabelle darstellt.

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(
        ParDo.of(
            new DoFn<Result, Void>() {
              @ProcessElement
              public void processElement(@Element Result row, OutputReceiver<Void> out) {
                System.out.println(Bytes.toString(row.getRow()));
              }
            }));

Standardmäßig gibt ein CloudBigtableIO.read-Vorgang alle Zeilen in Ihrer Tabelle zurück. Sie können ein HBase-Scan-Objekt verwenden, um den Lesevorgang auf einen Bereich von Zeilenschlüsseln in Ihrer Tabelle zu beschränken oder Filter auf die Ergebnisse des Lesevorgangs anzuwenden. Wenn Sie ein Scan-Objekt verwenden möchten, fügen Sie es in CloudBigtableScanConfiguration ein.

Sie können beispielsweise ein Scan-Objekt einfügen, das nur das erste Schlüsselwertpaar aus jeder Zeile in der Tabelle zurückgibt. Dies ist zum Zählen der Zeilen in der Tabelle nützlich:

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldRead {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withScan(scan)
            .build();

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}

In Bigtable schreiben

Für den Schreibzugriff auf eine Bigtable-Tabelle wenden Sie mit apply einen CloudBigtableIO.writeToTable-Vorgang an. Sie müssen diesen Vorgang auf eine PCollection von HBase-Mutation-Objekten anwenden, die sowohl Put- als auch Delete-Objekte enthalten kann.

Die Bigtable-Tabelle muss bereits vorhanden sein und die entsprechenden Spaltenfamilien müssen definiert sein. Der Dataflow-Connector erstellt Tabellen und Spaltenfamilien nicht direkt. Sie können die cbt-Befehlszeile verwenden, um eine Tabelle zu erstellen und Spaltenfamilien einzurichten. Dies ist aber auch programmatisch möglich.

Bevor Sie in Bigtable schreiben, erstellen und initialisieren Sie Ihre Dataflow-Pipeline, sodass Puts und Deletes über das Netzwerk serialisiert werden können:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

Im Allgemeinen müssen Sie eine Transformation wie ParDo ausführen, um Ihre Ausgabedaten in eine Sammlung von HBase-Put- oder -Delete-Objekten zu formatieren. Das folgende Beispiel zeigt eine DoFn-Transformation, die den aktuellen Wert als Zeilenschlüssel für ein Put-Objekt verwendet. Sie können die Put-Objekte dann in Bigtable schreiben.

p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(rowkey));

                row.addColumn(
                    Bytes.toBytes("stats_summary"),
                    Bytes.toBytes("os_build"),
                    timestamp,
                    Bytes.toBytes("android"));
                out.output(row);
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

Wenn Sie die Steuerung des Batch-Schreibvorgangs aktivieren möchten, setzen Sie BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL auf true. Diese Funktion begrenzt den Traffic für Batch-Schreibanfragen automatisch und lässt Bigtable-Autoscaling, um Knoten automatisch hinzuzufügen oder zu entfernen, Ihren Dataflow-Job.

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
            "true")
        .build();
return bigtableTableConfig;

Hier ist das vollständige Beispiel für den Schreibvorgang, einschließlich der Variante, die die Ablaufsteuerung für Batch-Schreiben ermöglicht.


import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldWrite {

  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                    long timestamp = System.currentTimeMillis();
                    Put row = new Put(Bytes.toBytes(rowkey));

                    row.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        timestamp,
                        Bytes.toBytes("android"));
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {

    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }

  public static CloudBigtableTableConfiguration batchWriteFlowControlExample(
      BigtableOptions options) {
    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
                "true")
            .build();
    return bigtableTableConfig;
  }
}