Bigtable Spark-Connector verwenden

Mit dem Bigtable Spark-Connector können Sie Daten in Bigtable lesen und schreiben. Mit Spark SQL und DataFrames können Sie Daten aus Ihrer Spark-Anwendung lesen. Die folgenden Bigtable-Vorgänge werden mit dem Bigtable Spark-Connector unterstützt:

  • Daten schreiben
  • Daten lesen
  • Neue Tabelle erstellen

In diesem Dokument erfahren Sie, wie Sie eine Spark SQL-DataFrames-Tabelle in eine Bigtable-Tabelle konvertieren und dann eine JAR-Datei kompilieren und erstellen, um einen Spark-Job zu senden.

Supportstatus für Spark und Scala

Der Bigtable Spark-Connector unterstützt nur die Version Scala 2.12 und die folgenden Spark-Versionen:

Der Bigtable Spark-Connector unterstützt die folgenden Dataproc-Versionen:

Kosten berechnen

Wenn Sie sich für die Verwendung einer der folgenden kostenpflichtigen Komponenten von Google Cloud entscheiden, werden Ihnen die genutzten Ressourcen in Rechnung gestellt:

  • Bigtable (für die Nutzung des Bigtable-Emulators fallen keine Kosten an)
  • Dataproc
  • Cloud Storage

Dataproc-Preise gelten für die Nutzung von Dataproc in Compute Engine-Clustern. Dataproc Serverless-Preise gelten für Arbeitslasten und Sitzungen, die auf Dataproc Serverless for Spark ausgeführt werden.

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen.

Hinweise

Damit Sie den Bigtable Spark-Connector verwenden können, müssen Sie zuerst die folgenden Schritte ausführen.

Erforderliche Rollen

Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen für Ihr Projekt zu gewähren, um die erforderlichen Berechtigungen zur Verwendung des Bigtable Spark-Connectors zu erhalten:

  • Bigtable-Administrator (roles/bigtable.admin)(optional): Ermöglicht das Lesen oder Schreiben von Daten und das Erstellen einer neuen Tabelle.
  • Bigtable-Nutzer (roles/bigtable.user): Ermöglicht das Lesen oder Schreiben von Daten, aber nicht das Erstellen einer neuen Tabelle.

Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff verwalten.

Möglicherweise können Sie die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

Wenn Sie Dataproc oder Cloud Storage verwenden, sind möglicherweise zusätzliche Berechtigungen erforderlich. Weitere Informationen finden Sie unter Dataproc-Berechtigungen und Cloud Storage-Berechtigungen.

Spark einrichten

Neben dem Erstellen einer Bigtable-Instanz müssen Sie auch Ihre Spark-Instanz einrichten. Sie können dies lokal tun oder eine der folgenden Optionen auswählen, um Spark mit Dataproc zu verwenden:

  • Dataproc-Cluster
  • Dataproc Serverless

Weitere Informationen zur Auswahl zwischen einem Dataproc-Cluster und einer serverlosen Option finden Sie in der Dokumentation Dataproc Serverless for Spark im Vergleich zu Dataproc in Compute Engine .

Connector-JAR-Datei herunterladen

Den Quellcode des Bigtable Spark-Connectors mit Beispielen finden Sie im GitHub-Repository des Bigtable Spark-Connectors.

Je nach Spark-Setup können Sie wie folgt auf die JAR-Datei zugreifen:

  • Wenn Sie PySpark lokal ausführen, sollten Sie die JAR-Datei des Connectors aus dem Cloud Storage-Speicherort gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar herunterladen.

    Ersetzen Sie SCALA_VERSION durch die Scala-Version, 2.12 als einzige unterstützte Version und CONNECTOR_VERSION durch die Connector-Version, die Sie verwenden möchten.

  • Verwenden Sie für Dataproc-Cluster oder serverlose Option die neueste JAR-Datei als Artefakt, das in Ihre Scala- oder Java Spark-Anwendungen hinzugefügt werden kann. Weitere Informationen zur Verwendung der JAR-Datei als Artefakt finden Sie unter Abhängigkeiten verwalten.

  • Wenn Sie den PySpark-Job an Dataproc senden, verwenden Sie das Flag gcloud dataproc jobs submit pyspark --jars, um den URI auf den Speicherort der JAR-Datei in Cloud Storage festzulegen, z. B. gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar.

Bigtable-Konfiguration zur Spark-Anwendung hinzufügen

Fügen Sie Ihrer Spark-Anwendung die Spark-Optionen hinzu, über die Sie mit Bigtable interagieren können.

Unterstützte Spark-Optionen

Verwenden Sie die Spark-Optionen, die im Paket com.google.cloud.spark.bigtable verfügbar sind.

Optionsname Erforderlich Standardwert Bedeutung
spark.bigtable.project.id Ja Legen Sie die Bigtable-Projekt-ID fest.
spark.bigtable.instance.id Ja Legen Sie die Bigtable-Instanz-ID fest.
catalog Ja Legen Sie das JSON-Format fest, das das Konvertierungsformat zwischen dem SQL-ähnlichen Schema des DataFrames und dem Schema der Bigtable-Tabelle angibt.

Weitere Informationen finden Sie unter Tabellenmetadaten im JSON-Format erstellen.
spark.bigtable.app_profile.id Nein default Legen Sie die Bigtable-Anwendungsprofil-ID fest.
spark.bigtable.write.timestamp.milliseconds Nein Aktuelle Systemzeit Legen Sie den Zeitstempel in Millisekunden fest, der beim Schreiben eines DataFrames in Bigtable verwendet werden soll.

Da alle Zeilen im DataFrame denselben Zeitstempel verwenden, bleiben Zeilen mit derselben Zeilenschlüsselspalte darin als einzelne Version in Bigtable bestehen, da sie denselben Zeitstempel haben.
spark.bigtable.create.new.table Nein false Legen Sie true fest, um vor dem Schreiben in Bigtable eine neue Tabelle zu erstellen.
spark.bigtable.read.timerange.start.milliseconds oder spark.bigtable.read.timerange.end.milliseconds Nein Legen Sie Zeitstempel (in Millisekunden seit Epochenzeit) fest, um Zellen mit einem bestimmten Start- bzw. Enddatum zu filtern. Beide oder keiner dieser Parameter müssen angegeben werden.
spark.bigtable.push.down.row.key.filters Nein true Legen Sie true fest, um das einfache Filtern von Zeilenschlüsseln auf der Serverseite zuzulassen. Das Filtern nach zusammengesetzten Zeilenschlüsseln wird clientseitig implementiert.

Weitere Informationen finden Sie unter Bestimmte DataFrame-Zeile mit einem Filter lesen.
spark.bigtable.read.rows.attempt.timeout.milliseconds Nein 30 m Legen Sie die timeout-Dauer für einen Leseversuch fest, der einer DataFrame-Partition im Bigtable-Client für Java entspricht.
spark.bigtable.read.rows.total.timeout.milliseconds Nein 12 Std. Legen Sie das gesamte Zeitlimit für einen Leseversuch fest, der einer DataFrame-Partition im Bigtable-Client für Java entspricht.
spark.bigtable.mutate.rows.attempt.timeout.milliseconds Nein 1m Legen Sie die timeout-Dauer für mutate-Zeilen fest, die einer DataFrame-Partition im Bigtable-Client für Java entsprechen.
spark.bigtable.mutate.rows.total.timeout.milliseconds Nein 10 Min. Legen Sie das Zeitlimit für einen Änderungsversuch von Zeilen fest, der einer DataFrame-Partition im Bigtable-Client für Java entspricht.
spark.bigtable.batch.mutate.size Nein 100 Legen Sie als Wert die Anzahl der Mutationen in jedem Batch fest. Der Höchstwert, den Sie festlegen können, ist 100000.
spark.bigtable.enable.batch_mutate.flow_control Nein false Legen Sie true fest, um die Ablaufsteuerung für Batchmutationen zu aktivieren.

Tabellenmetadaten im JSON-Format erstellen

Das Format der Spark SQL DataFrames-Tabelle muss mithilfe eines Strings im JSON-Format in eine Bigtable-Tabelle konvertiert werden. Dieses String-JSON-Format macht das Datenformat mit Bigtable kompatibel. Sie können das JSON-Format mit der Option .option("catalog", catalog_json_string) in Ihren Anwendungscode übergeben.

Betrachten Sie als Beispiel die folgende DataFrame-Tabelle und die entsprechende Bigtable-Tabelle.

In diesem Beispiel sind die Spalten name und birthYear im DataFrame in der Spaltenfamilie info gruppiert und in name bzw. birth_year umbenannt. In ähnlicher Weise wird die Spalte address unter der Spaltenfamilie location mit demselben Spaltennamen gespeichert. Die Spalte id aus dem DataFrame wird in den Bigtable-Zeilenschlüssel konvertiert.

Die Zeilenschlüssel haben in Bigtable keinen eigenen Spaltennamen. In diesem Beispiel wird id_rowkey nur verwendet, um dem Connector anzuzeigen, dass dies die Zeilenschlüsselspalte ist. Sie können einen beliebigen Namen für die Zeilenschlüsselspalte verwenden. Achten Sie darauf, denselben Namen zu nutzen, wenn Sie das Feld "rowkey":"column_name" im JSON-Format deklarieren.

DataFrame Bigtable-Tabelle = t1
Spalten Zeilenschlüssel Spaltenfamilien
Info Ort
Spalten Spalten
id name birthYear Adresse id_rowkey name birth_year Adresse

Das JSON-Format für den Katalog sieht so aus:

    """
    {
      "table": {"name": "t1"},
      "rowkey": "id_rowkey",
      "columns": {
        "id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
        "name": {"cf": "info", "col": "name", "type": "string"},
        "birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
        "address": {"cf": "location", "col": "address", "type": "string"}
      }
    }
    """

Im JSON-Format werden folgende Schlüssel und Werte verwendet:

Katalogschlüssel Katalogwert JSON-Format
table Name der Bigtable-Tabelle. "table":{"name":"t1"}

Wenn die Tabelle nicht vorhanden ist, verwenden Sie .option("spark.bigtable.create.new.table", "true"), um sie zu erstellen.
Rowkey Name der Spalte, die als Bigtable-Zeilenschlüssel verwendet wird. Der Spaltenname der DataFrame-Spalte muss als Zeilenschlüssel verwendet werden, z. B. id_rowkey.

Zusammengesetzte Schlüssel werden auch als Zeilenschlüssel akzeptiert. Beispiel: "rowkey":"name:address" Dieser Ansatz kann zu Zeilenschlüsseln führen, die einen vollständigen Tabellenscan für alle Leseanfragen erfordern.
"rowkey":"id_rowkey",
Spalten Zuordnung jeder DataFrame-Spalte zu den entsprechenden Bigtable-Spaltenfamilien ("cf") und Spaltennamen ("col"). Der Spaltenname kann sich vom Spaltennamen in der DataFrame-Tabelle unterscheiden. Zu den unterstützten Datentypen gehören string, long und binary. "columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}"

In diesem Beispiel ist id_rowkey der Zeilenschlüssel und info und location sind die Spaltenfamilien.

Unterstützte Datentypen

Der Connector unterstützt die Verwendung der Typen string, long und binary (Byte-Array) im Katalog. Bis andere Typen wie int und float unterstützt werden, können Sie diese Datentypen manuell in Byte-Arrays (BinaryType von Spark SQL) konvertieren, bevor Sie sie mit dem Connector in Bigtable schreiben.

Darüber hinaus können Sie mit Avro komplexe Typen wie ArrayType serialisieren. Weitere Informationen finden Sie unter Komplexe Datentypen mit Apache Avro serialisieren.

In Bigtable schreiben

Verwenden Sie die Funktion .write() und die unterstützten Optionen, um Ihre Daten in Bigtable zu schreiben.

Java

Der folgende Code aus dem GitHub-Repository verwendet Java und Maven, um in Bigtable zu schreiben.

  String catalog = "{" +
        "\"table\":{\"name\":\"" + tableName + "\"," +
        "\"tableCoder\":\"PrimitiveType\"}," +
        "\"rowkey\":\"wordCol\"," +
        "\"columns\":{" +
        "\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
        "\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
        "}}".replaceAll("\\s+", "");

…

  private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
        String createNewTable) {
      dataframe
          .write()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .option("spark.bigtable.create.new.table", createNewTable)
          .save();
    }

Python

Der folgende Code aus dem GitHub-Repository verwendet Python, um in Bigtable zu schreiben.

  catalog = ''.join(("""{
        "table":{"name":" """ + bigtable_table_name + """
        ", "tableCoder":"PrimitiveType"},
        "rowkey":"wordCol",
        "columns":{
          "word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
          "count":{"cf":"example_family", "col":"countCol", "type":"long"}
        }
        }""").split())
  …

  input_data = spark.createDataFrame(data)
  print('Created the DataFrame:')
  input_data.show()

  input_data.write \
        .format('bigtable') \
        .options(catalog=catalog) \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .option('spark.bigtable.create.new.table', create_new_table) \
        .save()
  print('DataFrame was written to Bigtable.')

  …

Aus Bigtable lesen

Prüfen Sie mit der Funktion .read(), ob die Tabelle erfolgreich in Bigtable importiert wurde.

Java

  …
  private static Dataset<Row> readDataframeFromBigtable(String catalog) {
      Dataset<Row> dataframe = spark
          .read()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .load();
      return dataframe;
    }

Python

  …

  records = spark.read \
        .format('bigtable') \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .options(catalog=catalog) \
        .load()

  print('Reading the DataFrame from Bigtable:')
  records.show()

Projekt kompilieren

Generieren Sie die JAR-Datei, die zum Ausführen eines Jobs in einem Dataproc-Cluster, einer serverlosen Dataproc-Instanz oder einer lokalen Spark-Instanz verwendet wird. Sie können die JAR-Datei lokal kompilieren und dann zum Senden eines Jobs verwenden. Der Pfad zur kompilierten JAR-Datei wird beim Senden eines Jobs als Umgebungsvariable PATH_TO_COMPILED_JAR festgelegt.

Dieser Schritt gilt nicht für PySpark-Anwendungen.

Abhängigkeiten verwalten

Der Bigtable Spark-Connector unterstützt die folgenden Tools für die Abhängigkeitsverwaltung:

Kompilieren Sie die JAR-Datei.

Maven

  1. Fügen Sie die Abhängigkeit spark-bigtable zu Ihrer pom.xml-Datei hinzu.

    <dependencies>
    <dependency>
      <groupId>com.google.cloud.spark.bigtable</groupId>
      <artifactId>spark-bigtable_SCALA_VERSION</artifactId>
      <version>0.1.0</version>
    </dependency>
    </dependencies>
    
  2. Fügen Sie der Datei pom.xml das Maven-Shade-Plug-in hinzu, um eine Uber-JAR-Datei zu erstellen:

    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
    
  3. Führen Sie den Befehl mvn clean install aus, um eine JAR-Datei zu generieren.

sbt

  1. Fügen Sie der Datei build.sbt die Abhängigkeit spark-bigtable hinzu:

    libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
    
  2. Füge das sbt-assembly-Plug-in zu deiner project/plugins.sbt- oder project/assembly.sbt-Datei hinzu, um eine Uber-JAR-Datei zu erstellen.

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
    
  3. Führen Sie den Befehl sbt clean assembly aus, um die JAR-Datei zu generieren.

Gradle

  1. Fügen Sie der Datei build.gradle die Abhängigkeit spark-bigtable hinzu.

    dependencies {
    implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0'
    }
    
  2. Fügen Sie der Datei build.gradle das Plug-in Shadow hinzu, um eine Uber-JAR-Datei zu erstellen:

    plugins {
    id 'com.github.johnrengelman.shadow' version '8.1.1'
    id 'java'
    }
    
  3. Weitere Informationen zur Konfiguration und zur JAR-Kompilierung finden Sie in der Dokumentation des Shadow-Plug-ins.

Job senden

Senden Sie einen Spark-Job mit Dataproc, Dataproc Serverless oder einer lokalen Spark-Instanz, um Ihre Anwendung zu starten.

Laufzeitumgebung festlegen

Legen Sie die folgenden Umgebungsvariablen fest:

      #Google Cloud
      export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
      export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
      export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
      export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
      export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
      export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE

      #Dataproc Serverless
      export BIGTABLE_SPARK_SUBNET=SUBNET
      export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME

      #Scala/Java
      export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR

      #PySpark
      export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
      export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
      export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR

Ersetzen Sie Folgendes:

  • PROJECT_ID: Die permanente Kennzeichnung des Bigtable-Projekts.
  • INSTANCE_ID: Die permanente Kennzeichnung der Bigtable-Instanz.
  • TABLE_NAME: Die permanente Kennzeichnung der Tabelle.
  • DATAPROC_CLUSTER: Die permanente Kennzeichnung des Dataproc-Clusters.
  • DATAPROC_REGION: Die Dataproc-Region, die einen der Cluster in Ihrer Dataproc-Instanz enthält, z. B. northamerica-northeast2.
  • DATAPROC_ZONE: Die Zone, in der der Dataproc-Cluster ausgeführt wird.
  • SUBNET: Der vollständige Ressourcenpfad des Subnetzes.
  • GCS_BUCKET_NAME: Der Cloud Storage-Bucket zum Hochladen der Abhängigkeiten von Spark-Arbeitslasten.
  • PATH_TO_COMPILED_JAR: Der vollständige oder relative Pfad zur kompilierten JAR-Datei, z. B. /path/to/project/root/target/<compiled_JAR_name> für Maven.
  • GCS_PATH_TO_CONNECTOR_JAR: Der Cloud Storage-Bucket gs://spark-lib/bigtable, in dem sich die Datei spark-bigtable_SCALA_VERSION_CONNECTOR_VERSION.jar befindet.
  • PATH_TO_PYTHON_FILE: Bei PySpark-Anwendungen der Pfad zur Python-Datei, die zum Schreiben von Daten in Bigtable und zum Lesen von Daten aus Bigtable verwendet wird.
  • LOCAL_PATH_TO_CONNECTOR_JAR: bei PySpark-Anwendungen den Pfad zur heruntergeladenen JAR-Datei des Bigtable-Spark-Connectors.

Spark-Job senden

Führen Sie für Dataproc-Instanzen oder Ihre lokale Spark-Einrichtung einen Spark-Job aus, um Daten in Bigtable hochzuladen.

Dataproc-Cluster

Verwenden Sie die kompilierte JAR-Datei und erstellen Sie einen Dataproc-Clusterjob, der Daten in Bigtable liest und schreibt.

  1. Erstellen Sie einen Dataproc-Cluster. Das folgende Beispiel zeigt einen Beispielbefehl zum Erstellen eines Dataproc v2.0-Clusters mit Debian 10, zwei Worker-Knoten und Standardkonfigurationen.

    gcloud dataproc clusters create \
      $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \
      --zone $BIGTABLE_SPARK_DATAPROC_ZONE \
      --master-machine-type n2-standard-4 --master-boot-disk-size 500 \
      --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \
      --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
    
  2. Senden Sie einen Job.

    Scala/Java

    Das folgende Beispiel zeigt die Klasse spark.bigtable.example.WordCount, die die Logik zum Erstellen einer Testtabelle in DataFrame, zum Schreiben der Tabelle in Bigtable und zum Zählen der Wörter in der Tabelle enthält.

        gcloud dataproc jobs submit spark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --class=spark.bigtable.example.WordCount \
        --jar=$PATH_TO_COMPILED_JAR \
        -- \
        $BIGTABLE_SPARK_PROJECT_ID \
        $BIGTABLE_SPARK_INSTANCE_ID \
        $BIGTABLE_SPARK_TABLE_NAME \
    

    PySpark

        gcloud dataproc jobs submit pyspark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --jars=$GCS_PATH_TO_CONNECTOR_JAR \
        --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
        $PATH_TO_PYTHON_FILE \
        -- \
        --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
        --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
        --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
    

Dataproc Serverless

Verwenden Sie die kompilierte JAR-Datei und erstellen Sie einen Dataproc-Job, der mit einer Dataproc Serverless-Instanz Daten in Bigtable liest und schreibt.

Scala/Java

  gcloud dataproc batches submit spark \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
  --  \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
  --jars=$GCS_PATH_TO_CONNECTOR_JAR \
  --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
  -- \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

Lokales Spark

Verwenden Sie die heruntergeladene JAR-Datei und erstellen Sie einen Spark-Job, der Daten mit einer lokalen Spark-Instanz in Bigtable liest und schreibt. Sie können auch den Bigtable-Emulator verwenden, um den Spark-Job zu senden.

Bigtable-Emulator verwenden

Wenn Sie sich für den Bigtable-Emulator entscheiden, gehen Sie so vor:

  1. Verwenden Sie den folgenden Befehl, um den Emulator zu starten:

    gcloud beta emulators bigtable start
    

    Standardmäßig wählt der Emulator localhost:8086 aus.

  2. Legen Sie die Umgebungsvariable BIGTABLE_EMULATOR_HOST fest:

    export BIGTABLE_EMULATOR_HOST=localhost:8086
    
  3. Senden Sie den Spark-Job.

Weitere Informationen zur Verwendung des Bigtable-Emulators finden Sie unter Mit dem Emulator testen.

Spark-Job senden

Verwenden Sie den Befehl spark-submit, um einen Spark-Job unabhängig davon zu senden, ob Sie einen lokalen Bigtable-Emulator verwenden.

Scala/Java

  spark-submit $PATH_TO_COMPILED_JAR \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  spark-submit \
  --jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
  --packages=org.slf4j:slf4j-reload4j:1.7.36 \
  $PATH_TO_PYTHON_FILE \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

Tabellendaten prüfen

Führen Sie den folgenden cbt-Befehl in der Befehlszeile aus, um zu prüfen, ob die Daten in Bigtable geschrieben werden. Die cbt-Befehlszeile ist eine Komponente der Google Cloud CLI. Weitere Informationen finden Sie in der Übersicht zur cbt-Befehlszeile.

    cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
    read $BIGTABLE_SPARK_TABLE_NAME

Zusätzliche Lösungen

Verwenden Sie den Bigtable Spark-Connector für bestimmte Lösungen, z. B. zum Serialisieren komplexer Spark SQL-Typen, zum Lesen bestimmter Zeilen und zum Generieren clientseitiger Messwerte.

Bestimmte DataFrame-Zeile mit einem Filter lesen

Wenn Sie DataFrames zum Lesen aus Bigtable verwenden, können Sie einen Filter angeben, der nur bestimmte Zeilen liest. Einfache Filter wie ==, <= und startsWith auf die Zeilenschlüsselspalte werden serverseitig angewendet, um einen Scan der gesamten Tabelle zu vermeiden. Filter für zusammengesetzte Zeilenschlüssel oder komplexe Filter wie der LIKE-Filter für die Zeilenschlüsselspalte werden clientseitig angewendet.

Wenn Sie große Tabellen lesen, empfehlen wir die Verwendung einfacher Zeilenschlüsselfilter, um einen Scan der gesamten Tabelle zu vermeiden. Die folgende Beispielanweisung zeigt, wie mit einem einfachen Filter gelesen wird. Achten Sie darauf, dass Sie im Spark-Filter den Namen der DataFrame-Spalte verwenden, die in den Zeilenschlüssel konvertiert wird:

    dataframe.filter("id == 'some_id'").show()
  

Wenn Sie einen Filter anwenden, verwenden Sie den Namen der DataFrame-Spalte anstelle des Bigtable-Tabellenspaltennamens.

Komplexe Datentypen mit Apache Avro serialisieren

Der Bigtable Spark-Connector unterstützt die Verwendung von Apache Avro, um komplexe Spark SQL-Typen wie ArrayType, MapType oder StructType zu serialisieren. Apache Avro bietet Datenserialisierung für Datensatzdaten, die häufig für die Verarbeitung und Speicherung komplexer Datenstrukturen verwendet wird.

Verwenden Sie eine Syntax wie "avro":"avroSchema", um anzugeben, dass eine Spalte in Bigtable mit Avro codiert werden soll. Sie können dann beim Lesen aus oder Schreiben in Bigtable .option("avroSchema", avroSchemaString) verwenden, um das zu dieser Spalte gehörende Avro-Schema im Stringformat anzugeben. Sie können verschiedene Optionsnamen verwenden, z. B. "anotherAvroSchema" für verschiedene Spalten und Avro-Schemas für mehrere Spalten übergeben.

def catalogWithAvroColumn = s"""{
                    |"table":{"name":"ExampleAvroTable"},
                    |"rowkey":"key",
                    |"columns":{
                    |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                    |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
                    |}
                    |}""".stripMargin

Clientseitige Messwerte verwenden

Da der Bigtable Spark-Connector auf dem Bigtable-Client für Java basiert, sind clientseitige Messwerte standardmäßig innerhalb des Connectors aktiviert. Weitere Informationen zum Aufrufen und Interpretieren dieser Messwerte finden Sie in der Dokumentation zu clientseitigen Messwerten.

Bigtable-Client für Java mit Low-Level-RDD-Funktionen verwenden

Da der Bigtable Spark-Connector auf dem Bigtable-Client für Java basiert, können Sie den Client direkt in Ihren Spark-Anwendungen verwenden und verteilte Lese- oder Schreibanfragen innerhalb der Low-Level-RDD-Funktionen wie mapPartitions und foreachPartition ausführen.

Wenn Sie den Bigtable-Client für Java-Klassen verwenden möchten, hängen Sie das Präfix com.google.cloud.spark.bigtable.repackaged an die Paketnamen an. Verwenden Sie beispielsweise com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient anstelle des Klassennamens als com.google.cloud.bigtable.data.v2.BigtableDataClient.

Weitere Informationen zum Bigtable-Client für Java finden Sie unter Bigtable-Client für Java.

Nächste Schritte