Spanner-Connector mit Spark verwenden

Auf dieser Seite erfahren Sie, wie Sie einen Dataproc-Cluster erstellen, in dem mithilfe des Spark Spanner-Connectors Daten mit Apache Spark aus Spanner gelesen werden.

Kosten berechnen

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

  • Dataproc
  • Spanner
  • Cloud Storage

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Hinweis

Bevor Sie den Spanner-Connector in dieser Anleitung verwenden, müssen Sie einen Dataproc-Cluster sowie eine Spanner-Instanz und ‑Datenbank einrichten.

Dataproc-Cluster einrichten

Erstellen Sie einen Dataproc-Cluster oder verwenden Sie einen vorhandenen Dataproc-Cluster mit den folgenden Einstellungen:

  • Berechtigungen für das VM-Dienstkonto. Dem VM-Dienstkonto des Clusters müssen die entsprechenden Spanner-Berechtigungen zugewiesen werden. Wenn Sie Data Boost verwenden (Data Boost ist im Beispielcode unter Daten aus Spanner lesen aktiviert), muss das VM-Dienstkonto auch die erforderlichen IAM-Berechtigungen für Data Boost haben.

  • Zugriffsbereich Der Cluster muss mit aktiviertem cloud-platform- oder dem entsprechenden spanner-Bereich erstellt werden. Der Bereich cloud-platform ist standardmäßig für Cluster aktiviert, die mit Image-Version 2.1 oder höher erstellt wurden.

    In der folgenden Anleitung erfahren Sie, wie Sie den cloud-platform-Umfang als Teil einer Clustererstellungsanfrage mit der Google Cloud Console, der gcloud CLI oder der Dataproc API festlegen. Eine weitere Anleitung zum Erstellen von Clustern finden Sie unter Cluster erstellen.

    Google Cloud Console

    1. Öffnen Sie in der Google Cloud Console die Dataproc-Seite Cluster erstellen.
    2. Klicken Sie auf dem Steuerfeld Sicherheit verwalten im Bereich Projektzugriff auf „Aktiviert den Bereich ‚cloud-platform‘ für diesen Cluster“.
    3. Füllen Sie die anderen Felder für die Clustererstellung aus oder bestätigen Sie sie. Klicken Sie dann auf Erstellen.

    gcloud-CLI

    Mit dem folgenden gcloud dataproc clusters create-Befehl können Sie einen Cluster mit aktiviertem cloud-platform-Bereich erstellen.

    gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

    API

    Sie können GceClusterConfig.serviceAccountScopes als Teil einer clusters.create-Anfrage angeben.

        "serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
    

Spanner-Instanz mit einer Singers-Datenbanktabelle einrichten

Erstellen Sie eine Spanner-Instanz mit einer Datenbank, die eine Singers-Tabelle enthält. Notieren Sie sich die Spanner-Instanz-ID und die Datenbank-ID.

Spanner-Connector mit Spark verwenden

Der Spanner-Connector ist für Spark-Versionen 3.1+ verfügbar. Sie geben die Connectorversion als Teil der JAR-Dateibeschreibung des Cloud Storage-Connectors an, wenn Sie einen Job an einen Dataproc-Cluster senden.

Beispiel:Einreichen eines Spark-Jobs über die gcloud CLI mit dem Spanner-Connector

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

Ersetzen Sie Folgendes:

CONNECTOR_VERSION: Version des Spanner-Connectors. Wählen Sie die Spanner-Connector-Version aus der Versionsliste im GitHub-Repository GoogleCloudDataproc/spark-spanner-connector aus.

Daten aus Spanner lesen

Sie können mit Python oder Scala Daten mithilfe der Spark-Datenquellen-API aus Spanner in einen Spark-DataFrame lesen.

PySpark

Sie können den Beispiel-PySpark-Code in diesem Abschnitt in Ihrem Cluster ausführen, indem Sie den Job an den Dataproc-Dienst senden oder ihn über die spark-submit-REPL auf dem Clustermasterknoten ausführen.

Dataproc-Job

  1. Erstellen Sie eine singers.py-Datei mit einem lokalen Texteditor oder in Cloud Shell mit dem vorinstallierten Texteditor vi, vim oder nano.
    1. Fügen Sie den folgenden Code in die Datei singers.py ein. Die Spanner-Funktion Data Boost ist aktiviert. Dies hat nahezu keine Auswirkungen auf die Haupt-Spanner-Instanz.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
          

      Ersetzen Sie Folgendes:

      1. PROJECT_ID: Ihre Google Cloud Projekt-ID. Projekt-IDs werden im Bereich Projektinformationen im Dashboard der Google Cloud Console aufgeführt.
      2. INSTANCE_ID, DATABASE_ID und TABLE_NAME : Weitere Informationen finden Sie unter Spanner-Instanz mit Singers-Datenbanktabelle einrichten.
    2. Speichern Sie die Datei singers.py.
  2. Senden Sie den Job über die Google Cloud Console, die gcloud CLI oder die Dataproc API an den Dataproc-Dienst.

    Beispiel:Job mit der gcloud CLI über den Spanner-Connector einreichen

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION
          

    Ersetzen Sie Folgendes:

    1. CLUSTER_NAME: Der Name des neuen Clusters.
    2. REGION: Eine verfügbare Compute Engine-Region zum Ausführen der Arbeitslast.
    3. CONNECTOR_VERSION: Version des Spanner-Connectors. Wählen Sie die Spanner-Connector-Version aus der Versionsliste im GitHub-Repository GoogleCloudDataproc/spark-spanner-connector aus.

spark-submit-Job

  1. Stellen Sie mit SSH eine Verbindung zum Masterknoten des Dataproc-Clusters her.
    1. Rufen Sie in der Google Cloud Console die Dataproc-Seite Cluster auf und klicken Sie auf den Namen Ihres Clusters.
    2. Wählen Sie auf der Seite Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann rechts neben dem Namen des Cluster-Masterknotens auf SSH.
      Seite mit den Details zum Dataproc-Cluster in der Cloud Console

      Im Stammverzeichnis des Masterknotens wird ein Browserfenster geöffnet.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Erstellen Sie mit dem vorinstallierten Texteditor vi, vim oder nano eine singers.py-Datei auf dem Masterknoten.
    1. Fügen Sie den folgenden Code in die Datei singers.py ein. Die Spanner-Funktion Data Boost ist aktiviert. Dies hat nahezu keine Auswirkungen auf die Haupt-Spanner-Instanz.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Ersetzen Sie Folgendes:

      1. PROJECT_ID: Ihre Google Cloud Projekt-ID. Projekt-IDs werden im Bereich Projektinformationen im Dashboard der Google Cloud Console aufgeführt.
      2. INSTANCE_ID, DATABASE_ID und TABLE_NAME : Weitere Informationen finden Sie unter Spanner-Instanz mit Singers-Datenbanktabelle einrichten.
    2. Speichern Sie die Datei singers.py.
  3. Führen Sie singers.py mit spark-submit aus, um die Spanner-Tabelle Singers zu erstellen.
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    Ersetzen Sie Folgendes:

    1. CONNECTOR_VERSION: Version des Spanner-Connectors. Wählen Sie die Spanner-Connector-Version aus der Versionsliste im GitHub-Repository GoogleCloudDataproc/spark-spanner-connector aus.

    Die Ausgabe sieht so aus:

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    only showing top 20 rows
    

Scala

So führen Sie den Beispiel-Scala-Code auf Ihrem Cluster aus:

  1. Stellen Sie mit SSH eine Verbindung zum Masterknoten des Dataproc-Clusters her.
    1. Rufen Sie in der Google Cloud Console die Dataproc-Seite Cluster auf und klicken Sie auf den Namen Ihres Clusters.
    2. Wählen Sie auf der Seite Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann rechts neben dem Namen des Cluster-Masterknotens auf SSH.
      Seite mit den Details zum Dataproc-Cluster in der Cloud Console

      Im Stammverzeichnis des Masterknotens wird ein Browserfenster geöffnet.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Erstellen Sie mit dem vorinstallierten Texteditor vi, vim oder nano eine singers.scala-Datei auf dem Masterknoten.
    1. Fügen Sie den folgenden Code in die Datei singers.scala ein. Die Spanner-Funktion Data Boost ist aktiviert. Dies hat nahezu keine Auswirkungen auf die Haupt-Spanner-Instanz.
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      Ersetzen Sie Folgendes:

      1. PROJECT_ID: Ihre Google Cloud Projekt-ID. Projekt-IDs werden im Bereich Projektinformationen im Dashboard der Google Cloud Console aufgeführt.
      2. INSTANCE_ID, DATABASE_ID und TABLE_NAME : Weitere Informationen finden Sie unter Spanner-Instanz mit Singers-Datenbanktabelle einrichten.
    2. Speichern Sie die Datei singers.scala.
  3. Starten Sie die spark-shell-REPL.
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    Ersetzen Sie Folgendes:

    CONNECTOR_VERSION: Version des Spanner-Connectors. Wählen Sie die Spanner-Connector-Version aus der Versionsliste im GitHub-Repository GoogleCloudDataproc/spark-spanner-connector aus.

  4. Führen Sie singers.scala mit dem Befehl :load singers.scala aus, um die Spanner-Singers-Tabelle zu erstellen. Die Ausgabeliste enthält Beispiele aus der Ausgabe von „Sänger“.
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

Bereinigen

Um laufende Kosten für Ihr Google Cloud Konto zu vermeiden, können Sie Ihren Dataproc-Cluster anhalten oder löschen und Ihre Spanner-Instanz löschen.

Weitere Informationen