Spanner-Connector mit Spark verwenden

Auf dieser Seite wird beschrieben, wie Sie mit dem Spark Spanner-Connector Daten aus Spanner mit Apache Spark lesen können.

Kosten berechnen

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

  • Dataproc
  • Spanner
  • Cloud Storage

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Hinweise

Bevor Sie die Anleitung ausführen, sollten Sie die Connector-Version kennen und einen Connector-URI abrufen.

So geben Sie den URI der Connector-JAR-Datei an

Versionen des Spark Spanner-Connectors sind im GoogleCloud Dataproc/spark-spanner-connector-Repository von GitHub aufgeführt.

Geben Sie die Connector-JAR-Datei an. Ersetzen Sie dazu im folgenden URI-String die Informationen zur Connector-Version:

gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar

Der Connector ist für die Spark-Versionen 3.1+ verfügbar

Beispiel für die gcloud CLI:

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \
    -- job-args
  

Spanner-Datenbank vorbereiten

Wenn Sie keine Spanner-Tabelle haben, folgen Sie der Anleitung, um eine Spanner-Tabelle zu erstellen. Danach haben Sie eine Instanz-ID, eine Datenbank-ID und eine Tabelle Singers.

Dataproc-Cluster erstellen

Jeder Dataproc-Cluster, der den Connector verwendet, benötigt den Bereich spanner oder cloud-platform. Dataproc-Cluster haben den Standardbereich cloud-platform für Image 2.1 oder höher. Wenn Sie eine ältere Version verwenden, können Sie mit der Google Cloud Console, der Google Cloud CLI und der Dataproc API einen Dataproc-Cluster erstellen.

Console

  1. Rufen Sie in der Google Cloud Console die Dataproc-Seite Cluster erstellen auf.
  2. Klicken Sie auf dem Tab „Sicherheit verwalten“ im Bereich „Projektzugriff“ auf „Aktiviert den Bereich der Cloud-Plattform für diesen Cluster“.
  3. Füllen Sie die anderen Felder für die Clustererstellung aus oder bestätigen Sie sie und klicken Sie dann auf „Erstellen“.

Google Cloud CLI

gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

API

Sie können GceClusterConfig.serviceAccountScopes im Rahmen einer clusters.create-Anfrage angeben. Beispiel:
        "serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
    

Achten Sie darauf, dass dem Dataproc-VM-Dienstkonto die entsprechende Spanner-Berechtigung zugewiesen ist. Wenn Sie Data Boost in der Anleitung verwenden, lesen Sie die IAM-Berechtigung für Data Boost.

Daten aus Spanner lesen

Sie können Scala und Python verwenden, um Daten mit der Spark Data Source API aus Spanner in einen Spark Dataframe zu lesen.

Scala

  1. Sehen Sie sich den Code an und ersetzen Sie den Platzhalter [projectId], [instanceId], [databaseId] und [table] durch die Projekt-ID, Instanz-ID, Datenbank-ID und die Tabelle, die Sie zuvor erstellt haben. Die Option „enableDataBoost“ aktiviert das Spanner-Feature Data Boost, das sich nahezu auf null auf die Spanner-Hauptinstanz auswirkt.
    object singers {
      def main(): Unit = {
        /*
         * Remove comment if you are not running in spark-shell.
         *
        import org.apache.spark.sql.SparkSession
        val spark = SparkSession.builder()
          .appName("spark-spanner-demo")
          .getOrCreate()
        */
    
        // Load data in from Spanner. See
        // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
        // for option information.
        val singersDF =
          (spark.read.format("cloud-spanner")
            .option("projectId", "[projectId]")
            .option("instanceId", "[instanceId]")
            .option("databaseId", "[databaseId]")
            .option("enableDataBoost", true)
            .option("table", "[table]")
            .load()
            .cache())
    
        singersDF.createOrReplaceTempView("Singers")
    
        // Load the Singers table.
        val result = spark.sql("SELECT * FROM Singers")
        result.show()
        result.printSchema()
      }
    }
    
    
  2. Code in einem Cluster ausführen
    1. Stellen Sie mit SSH eine Verbindung zum Masterknoten des Dataproc-Clusters her.
      1. Rufen Sie in der Google Cloud Console die Dataproc-Seite Cluster auf und klicken Sie auf den Namen Ihres Clusters.
        Seite „Dataproc-Cluster“ in der Cloud Console
      2. Wählen Sie auf der Seite >Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann rechts neben dem Namen des Masterknotens des Clusters auf SSH
        Detailseite des Dataproc-Clusters in der Cloud Console

        Ein Browserfenster in Ihrem Basisverzeichnis wird auf dem Masterknoten geöffnet.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Erstellen Sie singers.scala mit dem vorinstallierten Texteditor vi, vim oder nano und fügen Sie dann den Scala-Code aus der Scala-Codeliste ein
      nano singers.scala
        
    3. Starten Sie die spark-shell-REPL.
      $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
      
    4. Führen Sie singers.scala mit dem Befehl :load singers.scala aus, um die Spanner-Tabelle Singers zu erstellen. In der Ausgabeliste werden Beispiele aus der „Sänger“-Ausgabe angezeigt.
      > :load singers.scala
      Loading singers.scala...
      defined object singers
      > singers.main()
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
       

PySpark

  1. Sehen Sie sich den Code an und ersetzen Sie den Platzhalter [projectId], [instanceId], [databaseId] und [table] durch die Projekt-ID, Instanz-ID, Datenbank-ID und die Tabelle, die Sie zuvor erstellt haben. Die Option „enableDataBoost“ aktiviert das Spanner-Feature Data Boost, das sich nahezu auf null auf die Spanner-Hauptinstanz auswirkt.
    #!/usr/bin/env python
    
    """Spanner PySpark read example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-spanner-demo') \
      .getOrCreate()
    
    # Load data from Spanner.
    singers = spark.read.format('cloud-spanner') \
      .option("projectId", "[projectId]") \
      .option("instanceId", "[instanceId]") \
      .option("databaseId", "[databaseId]") \
      .option("enableDataBoost", "true") \
      .option("table", "[table]") \
      .load()
    singers.createOrReplaceTempView('Singers')
    
    # Read from Singers
    result = spark.sql('SELECT * FROM Singers')
    result.show()
    result.printSchema()
    
    
  2. Code im Cluster ausführen
    1. Stellen Sie mit SSH eine Verbindung zum Masterknoten des Dataproc-Clusters her.
      1. Rufen Sie in der Google Cloud Console die Dataproc-Seite Cluster auf und klicken Sie auf den Namen Ihres Clusters.
        Seite „Cluster“ in der Cloud Console.
      2. Wählen Sie auf der Seite Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann rechts neben dem Namen des Masterknotens des Clusters auf SSH
        Wählen Sie in der Cloud Console auf der Seite „Clusterdetails“ die Option „SSH“ in der Zeile mit dem Clusternamen aus.

        Ein Browserfenster für Ihr Basisverzeichnis auf dem primären Knoten wird geöffnet.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Erstellen Sie singers.py mit dem vorinstallierten Texteditor vi, vim oder nano und fügen Sie dann den PySpark-Code aus der PySpark-Codeliste ein.
      nano singers.py
      
    3. Führen Sie singers.py mit spark-submit aus, um die Spanner-Tabelle Singers zu erstellen.
      spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      
      Die Ausgabe sieht so aus:
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
      only showing top 20 rows
      

Bereinigen

Führen Sie die folgenden Schritte aus, um eine Bereinigung durchzuführen und zu vermeiden, dass Ihrem Google Cloud-Konto laufende Gebühren für die in dieser Schritt-für-Schritt-Anleitung erstellten Ressourcen berechnet werden.

gcloud dataproc clusters stop CLUSTER_NAME
gcloud dataproc clusters delete CLUSTER_NAME

Weitere Informationen