Spanner-Connector mit Spark verwenden

Auf dieser Seite erfahren Sie, wie Sie mit dem Spark Spanner Connector Daten mit Apache Spark aus Spanner lesen.

Kosten berechnen

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

  • Dataproc
  • Spanner
  • Cloud Storage

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Hinweise

Bevor Sie die Anleitung ausführen, müssen Sie die Connector-Version kennen und rufen Sie einen Connector-URI ab.

URI der JAR-Datei des Connectors angeben

Versionen des Spark Spanner-Connectors sind in GitHub aufgeführt GoogleCloudDataproc/spark-spanner-connector-Repository.

Geben Sie die JAR-Datei des Connectors an, indem Sie die Informationen zur Connectorversion im folgenden URI-String ersetzen:

gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar

Der Connector ist für die Spark-Versionen 3.1+ verfügbar

Beispiel für die gcloud CLI:

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \
    -- job-args
  

Spanner-Datenbank vorbereiten

Wenn Sie keine Spanner-Tabelle haben, können Sie der Anleitung zum Erstellen eines Spanner-Tabelle. Danach haben Sie eine Instanz-ID, eine Datenbank-ID und eine Tabelle Singers.

Dataproc-Cluster erstellen

Jeder Dataproc-Cluster, der den Connector verwendet, benötigt die Bereiche spanner oder cloud-platform. Dataproc-Cluster haben den Standardbereich cloud-platform für Image 2.1 oder höher. Wenn Sie eine ältere Version verwenden, können Sie die Google Cloud Console, die Google Cloud CLI und die Dataproc API verwenden, um einen Dataproc-Cluster zu erstellen.

Console

  1. Öffnen Sie in der Google Cloud Console die Dataproc-Seite Cluster erstellen.
  2. Klicken Sie auf dem Tab „Sicherheit verwalten“ im Bereich „Projektzugriff“ auf „Aktiviert den Bereich ‚cloud-platform‘ für diesen Cluster“.
  3. Füllen Sie die anderen Felder für die Clustererstellung aus oder bestätigen Sie sie. klicken Sie auf „Erstellen“.

Google Cloud CLI

gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

API

Sie können GceClusterConfig.serviceAccountScopes angeben im Rahmen einer clusters.create-Anfrage. Beispiel:
        "serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
    

Sie müssen dem Dataproc-VM-Dienstkonto die entsprechende Spanner-Berechtigung zuweisen. Wenn Sie in der Anleitung Data Boost verwenden, lesen Sie den Hilfeartikel IAM-Berechtigung für Data Boost.

Daten aus Spanner lesen

Mit Scala und Python können Sie mithilfe der Spark-Datenquellen-API Daten aus Spanner in einen Spark-DataFrame lesen.

Scala

  1. Sehen Sie sich den Code an und ersetzen Sie die Platzhalter [projectId], [instanceId], [databaseId] und [table] durch Projekt-ID, Instanz-ID, Datenbank-ID und Tabelle, die Sie zuvor erstellt haben. Die Option „enableDataBoost“ aktiviert die Spanner-Funktion Data Boost, mit der Auswirkungen auf die Spanner-Hauptinstanz nahezu null.
    object singers {
      def main(): Unit = {
        /*
         * Remove comment if you are not running in spark-shell.
         *
        import org.apache.spark.sql.SparkSession
        val spark = SparkSession.builder()
          .appName("spark-spanner-demo")
          .getOrCreate()
        */
    
        // Load data in from Spanner. See
        // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
        // for option information.
        val singersDF =
          (spark.read.format("cloud-spanner")
            .option("projectId", "[projectId]")
            .option("instanceId", "[instanceId]")
            .option("databaseId", "[databaseId]")
            .option("enableDataBoost", true)
            .option("table", "[table]")
            .load()
            .cache())
    
        singersDF.createOrReplaceTempView("Singers")
    
        // Load the Singers table.
        val result = spark.sql("SELECT * FROM Singers")
        result.show()
        result.printSchema()
      }
    }
  2. Code in einem Cluster ausführen
    1. SSH verwenden, um eine Verbindung zum Masterknoten des Dataproc-Clusters herzustellen
      1. Gehen Sie zur Dataproc-Cluster Seite in der Google Cloud Console und klicken Sie auf den Namen Ihres Clusters
        Die Seite mit den Dataproc-Clustern in der Cloud Console.
      2. Wählen Sie auf der Seite >Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann auf SSH rechts neben dem Namen des Cluster-Masterknotens <ph type="x-smartling-placeholder">
        </ph> Detailseite des Dataproc-Clusters in der Cloud Console

        Im Stammverzeichnis des Master-Knotens wird ein Browserfenster geöffnet.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Erstellen Sie singers.scala mit dem vorinstallierten Texteditor vi, vim oder nano und fügen Sie dann den Scala-Code aus der Scala-Code-Liste ein.
      nano singers.scala
        
    3. Starten Sie die spark-shell-REPL.
      $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
      
    4. Führen Sie singers.scala mit dem Befehl :load singers.scala aus, um die Spanner-Singers-Tabelle zu erstellen. Die Ausgabeliste zeigt Beispiele aus der Ausgabe von „Sänger“ an.
      > :load singers.scala
      Loading singers.scala...
      defined object singers
      > singers.main()
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
       

PySpark

  1. Prüfen Sie den Code und ersetzen Sie die Platzhalter [projectId], [instanceId], [databaseId] und [table] durch die Projekt-ID, die Instanz-ID, die Datenbank-ID und die Tabelle, die Sie zuvor erstellt haben. Die Option „enableDataBoost“ aktiviert die Spanner-Funktion Data Boost, mit der Auswirkungen auf die Spanner-Hauptinstanz nahezu null.
    #!/usr/bin/env python
    
    """Spanner PySpark read example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-spanner-demo') \
      .getOrCreate()
    
    # Load data from Spanner.
    singers = spark.read.format('cloud-spanner') \
      .option("projectId", "[projectId]") \
      .option("instanceId", "[instanceId]") \
      .option("databaseId", "[databaseId]") \
      .option("enableDataBoost", "true") \
      .option("table", "[table]") \
      .load()
    singers.createOrReplaceTempView('Singers')
    
    # Read from Singers
    result = spark.sql('SELECT * FROM Singers')
    result.show()
    result.printSchema()
  2. Code im Cluster ausführen
    1. SSH verwenden, um eine Verbindung zum Masterknoten des Dataproc-Clusters herzustellen
      1. Gehen Sie zur Dataproc-Cluster Seite in der Google Cloud Console und klicken Sie auf den Namen Ihres Clusters
        Seite „Cluster“ in der Cloud Console.
      2. Wählen Sie auf der Seite Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann rechts neben dem Namen des Cluster-Masterknotens auf SSH.
        Wählen Sie in der Cloud Console auf der Seite „Clusterdetails“ die Zeile „SSH für Clustername“ aus.

        Ein Browserfenster wird in Ihrem Basisverzeichnis auf dem primären Knoten geöffnet.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Erstellen Sie singers.py mit dem vorinstallierten Texteditor vi, vim oder nano und fügen Sie dann den PySpark-Code aus der PySpark-Codeliste ein.
      nano singers.py
      
    3. Führen Sie singers.py mit spark-submit aus, um den Spanner zu erstellen Tabelle Singers.
      spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      
      Die Ausgabe sieht so aus:
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
      only showing top 20 rows
      

Bereinigen

Führen Sie die folgenden Schritte aus, um die in dieser Schritt-für-Schritt-Anleitung erstellten Ressourcen zu bereinigen und so zu vermeiden, dass Ihrem Google Cloud-Konto fortlaufende Gebühren in Rechnung gestellt werden.

gcloud dataproc clusters stop CLUSTER_NAME
gcloud dataproc clusters delete CLUSTER_NAME

Weitere Informationen