Dataflow-Connector für Bigtable

Mit dem Cloud Dataflow-Connector für Bigtable kann Bigtable in einer Cloud Dataflow-Pipeline verwendet werden. Sie können den Connector sowohl für Batch- als auch für Streaming-Vorgänge verwenden.

Der Connector ist in Java geschrieben und baut auf dem Bigtable-HBase-Client für Java auf. Er ist mit dem Dataflow SDK 2.x für Java kompatibel, das auf Apache Beam basiert. Den Quellcode des Connectors finden Sie auf GitHub im Repository googleapis/java-bigtable-hbase.

Diese Seite bietet einen Überblick über die Verwendung von Read- und Write-Transformationen mit dem Cloud Dataflow-Connector. Alternativ können Sie auch die vollständige API-Dokumentation für den Cloud Dataflow-Connector lesen.

Connector einem Maven-Projekt hinzufügen

Wenn Sie den Cloud Dataflow-Connector in ein Maven-Projekt einbinden möchten, fügen Sie das Maven-Artefakt als Abhängigkeit in die pom.xml-Datei ein:

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-beam</artifactId>
  <version>1.20.0</version>
</dependency>

Bigtable-Konfiguration angeben

Erstellen Sie eine Schnittstelle für Optionen, um Eingaben für die Ausführung der Pipeline zuzulassen:

public interface BigtableOptions extends DataflowPipelineOptions {
  @Description("The Bigtable project ID, this can be different than your Dataflow project")
  @Default.String("bigtable-project")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Bigtable instance ID")
  @Default.String("bigtable-instance")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Bigtable table ID in the instance.")
  @Default.String("bigtable-table")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);
}

Wenn Sie aus Bigtable lesen oder in Bigtable schreiben, müssen Sie ein CloudBigtableConfiguration-Konfigurationsobjekt angeben. Dieses Objekt legt die Projekt-ID und die Instanz-ID für Ihre Tabelle sowie den Namen der Tabelle fest:

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

Stellen Sie für Lesevorgänge ein CloudBigtableScanConfiguration-Konfigurationsobjekt bereit, mit dem Sie ein Scan-Objekt von Apache HBase angeben können, das die Ergebnisse eines Lesevorgangs begrenzt und filtert. Weitere Informationen finden Sie unter Daten aus Bigtable lesen.

Aus Bigtable lesen

Zum Lesen aus einer Tabelle in Bigtable wenden Sie eine Read-Transformation auf das Ergebnis eines CloudBigtableIO.read-Vorgangs an. Die Read-Transformation gibt eine PCollection von HBase-Result-Objekten zurück, wobei jedes Element in PCollection eine einzelne Zeile in der Tabelle darstellt.

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(
        ParDo.of(
            new DoFn<Result, Void>() {
              @ProcessElement
              public void processElement(@Element Result row, OutputReceiver<Void> out) {
                System.out.println(Bytes.toString(row.getRow()));
              }
            }));

Standardmäßig gibt ein CloudBigtableIO.read-Vorgang alle Zeilen in Ihrer Tabelle zurück. Sie können ein HBase-Scan-Objekt verwenden, um den Lesevorgang auf einen Bereich von Zeilenschlüsseln in Ihrer Tabelle zu beschränken oder Filter auf die Ergebnisse des Lesevorgangs anzuwenden. Wenn Sie ein Scan-Objekt verwenden möchten, fügen Sie es in CloudBigtableScanConfiguration ein.

Sie können beispielsweise ein Scan-Objekt einfügen, das nur das erste Schlüsselwertpaar aus jeder Zeile in der Tabelle zurückgibt. Dies ist zum Zählen der Zeilen in der Tabelle nützlich:

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldRead {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withScan(scan)
            .build();

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("bigtable-table")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}

In Bigtable schreiben

Für den Schreibzugriff auf eine Bigtable-Tabelle wenden Sie mit apply einen CloudBigtableIO.writeToTable-Vorgang an. Sie müssen diesen Vorgang auf eine PCollection von HBase-Mutation-Objekten anwenden, die sowohl Put- als auch Delete-Objekte enthalten kann.

Die Bigtable-Tabelle muss bereits vorhanden sein und die entsprechenden Spaltenfamilien müssen definiert sein. Der Dataflow-Connector erstellt Tabellen und Spaltenfamilien nicht direkt. Sie können das cbt-Befehlszeilentool verwenden, um eine Tabelle zu erstellen und Spaltenfamilien einzurichten. Dies ist aber auch programmatisch möglich.

Bevor Sie in Bigtable schreiben, erstellen und initialisieren Sie Ihre Cloud Dataflow-Pipeline, sodass Puts und Deletes über das Netzwerk serialisiert werden können.

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

Im Allgemeinen müssen Sie eine Transformation wie ParDo ausführen, um Ihre Ausgabedaten in eine Sammlung von HBase-Put- oder -Delete-Objekten zu formatieren. Das folgende Beispiel zeigt eine einfache DoFn-Transformation, die den aktuellen Wert als Zeilenschlüssel für ein Put-Objekt verwendet. Sie können die Put-Objekte dann in Bigtable schreiben.

p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(rowkey));

                row.addColumn(
                    Bytes.toBytes("stats_summary"),
                    Bytes.toBytes("os_build"),
                    timestamp,
                    Bytes.toBytes("android"));
                out.output(row);
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

Hier ist das vollständige Beispiel für den Schreibvorgang.

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldWrite {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                    long timestamp = System.currentTimeMillis();
                    Put row = new Put(Bytes.toBytes(rowkey));

                    row.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        timestamp,
                        Bytes.toBytes("android"));
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("bigtable-table")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}