Spanner-Connector mit Spark verwenden

Auf dieser Seite erfahren Sie, wie Sie einen Dataproc-Cluster erstellen, in dem mithilfe des Spark Spanner-Connectors Daten mit Apache Spark aus Spanner gelesen werden.

Kosten berechnen

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

  • Dataproc
  • Spanner
  • Cloud Storage

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Hinweis

Bevor Sie den Spanner-Connector in dieser Anleitung verwenden, müssen Sie einen Dataproc-Cluster sowie eine Spanner-Instanz und ‑Datenbank einrichten.

Dataproc-Cluster einrichten

Erstellen Sie einen Dataproc-Cluster oder verwenden Sie einen vorhandenen Dataproc-Cluster mit den folgenden Einstellungen:

Spanner-Instanz mit einer Singers-Datenbanktabelle einrichten

Erstellen Sie eine Spanner-Instanz mit einer Datenbank, die eine Singers-Tabelle enthält. Notieren Sie sich die Spanner-Instanz-ID und die Datenbank-ID.

Spanner-Connector mit Spark verwenden

Der Spanner-Connector ist für Spark-Versionen 3.1+ verfügbar. Sie geben die Connectorversion als Teil der JAR-Dateibeschreibung des Cloud Storage-Connectors an, wenn Sie einen Job an einen Dataproc-Cluster senden.

Beispiel:Einreichen eines Spark-Jobs über die gcloud CLI mit dem Spanner-Connector

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

Ersetzen Sie Folgendes:

CONNECTOR_VERSION: Version des Spanner-Connectors. Wählen Sie die Spanner-Connector-Version aus der Versionsliste im GitHub-Repository GoogleCloudDataproc/spark-spanner-connector aus.

Daten aus Spanner lesen

Sie können mit Python oder Scala Daten mithilfe der Spark-Datenquellen-API aus Spanner in einen Spark-DataFrame lesen.

Sie können den Beispiel-PySpark-Code in diesem Abschnitt in Ihrem Cluster ausführen, indem Sie den Job an den Dataproc-Dienst senden oder ihn über die spark-submit-REPL auf dem Clustermasterknoten ausführen.

  1. Erstellen Sie eine singers.py-Datei mit einem lokalen Texteditor oder in Cloud Shell mit dem vorinstallierten Texteditor vi, vim oder nano.
    1. Fügen Sie den folgenden Code in die Datei singers.py ein. Die Spanner-Funktion Data Boost ist aktiviert. Dies hat nahezu keine Auswirkungen auf die Haupt-Spanner-Instanz.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
          

      Ersetzen Sie Folgendes:

      1. PROJECT_ID: Ihre Google Cloud Projekt-ID. Projekt-IDs werden im Bereich Projektinformationen im Dashboard der Google Cloud Console aufgeführt.
      2. INSTANCE_ID, DATABASE_ID und TABLE_NAME : Weitere Informationen finden Sie unter Spanner-Instanz mit Singers-Datenbanktabelle einrichten.
    2. Speichern Sie die Datei singers.py.
  2. Senden Sie den Job über die Google Cloud Console, die gcloud CLI oder die Dataproc API an den Dataproc-Dienst.

    Beispiel:Job mit der gcloud CLI über den Spanner-Connector einreichen

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION
          

    Ersetzen Sie Folgendes:

    1. CLUSTER_NAME: Der Name des neuen Clusters.
    2. REGION: Eine verfügbare Compute Engine-Region zum Ausführen der Arbeitslast.
    3. CONNECTOR_VERSION: Version des Spanner-Connectors. Wählen Sie die Spanner-Connector-Version aus der Versionsliste im GitHub-Repository GoogleCloudDataproc/spark-spanner-connector aus.
  1. Stellen Sie mit SSH eine Verbindung zum Masterknoten des Dataproc-Clusters her.
    1. Rufen Sie in der Google Cloud Console die Dataproc-Seite Cluster auf und klicken Sie auf den Namen Ihres Clusters.
    2. Wählen Sie auf der Seite Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann rechts neben dem Namen des Cluster-Masterknotens auf SSH.
      Seite mit den Details zum Dataproc-Cluster in der Cloud Console

      Im Stammverzeichnis des Masterknotens wird ein Browserfenster geöffnet.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Erstellen Sie mit dem vorinstallierten Texteditor vi, vim oder nano eine singers.py-Datei auf dem Masterknoten.
    1. Fügen Sie den folgenden Code in die Datei singers.py ein. Die Spanner-Funktion Data Boost ist aktiviert. Dies hat nahezu keine Auswirkungen auf die Haupt-Spanner-Instanz.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Ersetzen Sie Folgendes:

      1. PROJECT_ID: Ihre Google Cloud Projekt-ID. Projekt-IDs werden im Bereich Projektinformationen im Dashboard der Google Cloud Console aufgeführt.
      2. INSTANCE_ID, DATABASE_ID und TABLE_NAME : Weitere Informationen finden Sie unter Spanner-Instanz mit Singers-Datenbanktabelle einrichten.
    2. Speichern Sie die Datei singers.py.
  3. Führen Sie singers.py mit spark-submit aus, um die Spanner-Tabelle Singers zu erstellen.
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    Ersetzen Sie Folgendes:

    1. CONNECTOR_VERSION: Version des Spanner-Connectors. Wählen Sie die Spanner-Connector-Version aus der Versionsliste im GitHub-Repository GoogleCloudDataproc/spark-spanner-connector aus.

    Die Ausgabe sieht so aus:

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    only showing top 20 rows
    

So führen Sie den Beispiel-Scala-Code auf Ihrem Cluster aus:

  1. Stellen Sie mit SSH eine Verbindung zum Masterknoten des Dataproc-Clusters her.
    1. Rufen Sie in der Google Cloud Console die Dataproc-Seite Cluster auf und klicken Sie auf den Namen Ihres Clusters.
    2. Wählen Sie auf der Seite Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann rechts neben dem Namen des Cluster-Masterknotens auf SSH.
      Seite mit den Details zum Dataproc-Cluster in der Cloud Console

      Im Stammverzeichnis des Masterknotens wird ein Browserfenster geöffnet.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Erstellen Sie mit dem vorinstallierten Texteditor vi, vim oder nano eine singers.scala-Datei auf dem Masterknoten.
    1. Fügen Sie den folgenden Code in die Datei singers.scala ein. Die Spanner-Funktion Data Boost ist aktiviert. Dies hat nahezu keine Auswirkungen auf die Haupt-Spanner-Instanz.
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      Ersetzen Sie Folgendes:

      1. PROJECT_ID: Ihre Google Cloud Projekt-ID. Projekt-IDs werden im Bereich Projektinformationen im Dashboard der Google Cloud Console aufgeführt.
      2. INSTANCE_ID, DATABASE_ID und TABLE_NAME : Weitere Informationen finden Sie unter Spanner-Instanz mit Singers-Datenbanktabelle einrichten.
    2. Speichern Sie die Datei singers.scala.
  3. Starten Sie die spark-shell-REPL.
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    Ersetzen Sie Folgendes:

    CONNECTOR_VERSION: Version des Spanner-Connectors. Wählen Sie die Spanner-Connector-Version aus der Versionsliste im GitHub-Repository GoogleCloudDataproc/spark-spanner-connector aus.

  4. Führen Sie singers.scala mit dem Befehl :load singers.scala aus, um die Spanner-Singers-Tabelle zu erstellen. Die Ausgabeliste enthält Beispiele aus der Ausgabe von „Sänger“.
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

Bereinigen

Um laufende Kosten für Ihr Google Cloud Konto zu vermeiden, können Sie Ihren Dataproc-Cluster anhalten oder löschen und Ihre Spanner-Instanz löschen.

Weitere Informationen