Auf dieser Seite erfahren Sie, wie Sie einen Dataproc-Cluster erstellen, in dem mithilfe des Spark Spanner-Connectors Daten mit Apache Spark aus Spanner gelesen werden.
Kosten berechnen
In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:
- Dataproc
- Spanner
- Cloud Storage
Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen.
Hinweis
Bevor Sie den Spanner-Connector in dieser Anleitung verwenden, müssen Sie einen Dataproc-Cluster sowie eine Spanner-Instanz und ‑Datenbank einrichten.
Dataproc-Cluster einrichten
Erstellen Sie einen Dataproc-Cluster oder verwenden Sie einen vorhandenen Dataproc-Cluster mit den folgenden Einstellungen:
Berechtigungen für das VM-Dienstkonto. Dem VM-Dienstkonto des Clusters müssen die entsprechenden Spanner-Berechtigungen zugewiesen werden. Wenn Sie Data Boost verwenden (Data Boost ist im Beispielcode unter Daten aus Spanner lesen aktiviert), muss das VM-Dienstkonto auch die erforderlichen IAM-Berechtigungen für Data Boost haben.
Zugriffsbereich Der Cluster muss mit aktiviertem
cloud-platform
- oder dem entsprechendenspanner
-Bereich erstellt werden. Der Bereichcloud-platform
ist standardmäßig für Cluster aktiviert, die mit Image-Version 2.1 oder höher erstellt wurden.In der folgenden Anleitung erfahren Sie, wie Sie den
cloud-platform
-Umfang als Teil einer Clustererstellungsanfrage mit der Google Cloud Console, der gcloud CLI oder der Dataproc API festlegen. Eine weitere Anleitung zum Erstellen von Clustern finden Sie unter Cluster erstellen.- Öffnen Sie in der Google Cloud Console die Dataproc-Seite Cluster erstellen.
- Klicken Sie auf dem Steuerfeld Sicherheit verwalten im Bereich Projektzugriff auf „Aktiviert den Bereich ‚cloud-platform‘ für diesen Cluster“.
- Füllen Sie die anderen Felder für die Clustererstellung aus oder bestätigen Sie sie. Klicken Sie dann auf Erstellen.
Mit dem folgenden
gcloud dataproc clusters create
-Befehl können Sie einen Cluster mit aktiviertemcloud-platform
-Bereich erstellen.gcloud dataproc clusters create
CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platformSie können GceClusterConfig.serviceAccountScopes als Teil einer clusters.create-Anfrage angeben.
"serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
Spanner-Instanz mit einer Singers
-Datenbanktabelle einrichten
Erstellen Sie eine Spanner-Instanz mit einer Datenbank, die eine Singers
-Tabelle enthält. Notieren Sie sich die Spanner-Instanz-ID und die Datenbank-ID.
Spanner-Connector mit Spark verwenden
Der Spanner-Connector ist für Spark-Versionen 3.1+
verfügbar.
Sie geben die Connectorversion als Teil der JAR-Dateibeschreibung des Cloud Storage-Connectors an, wenn Sie einen Job an einen Dataproc-Cluster senden.
Beispiel:Einreichen eines Spark-Jobs über die gcloud CLI mit dem Spanner-Connector
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION .jar \ ... [other job submission flags]
Ersetzen Sie Folgendes:
CONNECTOR_VERSION: Version des Spanner-Connectors.
Wählen Sie die Spanner-Connector-Version aus der Versionsliste im GitHub-Repository GoogleCloudDataproc/spark-spanner-connector
aus.
Daten aus Spanner lesen
Sie können mit Python oder Scala Daten mithilfe der Spark-Datenquellen-API aus Spanner in einen Spark-DataFrame lesen.
Sie können den Beispiel-PySpark-Code in diesem Abschnitt in Ihrem Cluster ausführen, indem Sie den Job an den Dataproc-Dienst senden oder ihn über die spark-submit
-REPL auf dem Clustermasterknoten ausführen.
- Erstellen Sie eine
singers.py
-Datei mit einem lokalen Texteditor oder in Cloud Shell mit dem vorinstallierten Texteditorvi
,vim
odernano
. - Fügen Sie den folgenden Code in die Datei
singers.py
ein. Die Spanner-Funktion Data Boost ist aktiviert. Dies hat nahezu keine Auswirkungen auf die Haupt-Spanner-Instanz.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "
PROJECT_ID ") \ .option("instanceId", "INSTANCE_ID ") \ .option("databaseId", "DATABASE_ID ") \ .option("table", "TABLE_NAME ") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()Ersetzen Sie Folgendes:
- PROJECT_ID: Ihre Google Cloud Projekt-ID. Projekt-IDs werden im Bereich Projektinformationen im Dashboard der Google Cloud Console aufgeführt.
- INSTANCE_ID, DATABASE_ID und TABLE_NAME : Weitere Informationen finden Sie unter Spanner-Instanz mit
Singers
-Datenbanktabelle einrichten.
- Speichern Sie die Datei
singers.py
. - Senden Sie den Job über die Google Cloud Console, die gcloud CLI oder die Dataproc API an den Dataproc-Dienst.
Beispiel:Job mit der gcloud CLI über den Spanner-Connector einreichen
gcloud dataproc jobs submit pyspark singers.py \ --cluster=
CLUSTER_NAME \ --region=REGION \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION Ersetzen Sie Folgendes:
- CLUSTER_NAME: Der Name des neuen Clusters.
- REGION: Eine verfügbare Compute Engine-Region zum Ausführen der Arbeitslast.
- CONNECTOR_VERSION: Version des Spanner-Connectors.
Wählen Sie die Spanner-Connector-Version aus der Versionsliste im GitHub-Repository
GoogleCloudDataproc/spark-spanner-connector
aus.
- Stellen Sie mit SSH eine Verbindung zum Masterknoten des Dataproc-Clusters her.
- Rufen Sie in der Google Cloud Console die Dataproc-Seite Cluster auf und klicken Sie auf den Namen Ihres Clusters.
- Wählen Sie auf der Seite Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann rechts neben dem Namen des Cluster-Masterknotens auf
SSH
.Im Stammverzeichnis des Masterknotens wird ein Browserfenster geöffnet.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Erstellen Sie mit dem vorinstallierten Texteditor
vi
,vim
odernano
einesingers.py
-Datei auf dem Masterknoten.- Fügen Sie den folgenden Code in die Datei
singers.py
ein. Die Spanner-Funktion Data Boost ist aktiviert. Dies hat nahezu keine Auswirkungen auf die Haupt-Spanner-Instanz.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "
PROJECT_ID ") \ .option("instanceId", "INSTANCE_ID ") \ .option("databaseId", "DATABASE_ID ") \ .option("table", "TABLE_NAME ") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()Ersetzen Sie Folgendes:
- PROJECT_ID: Ihre Google Cloud Projekt-ID. Projekt-IDs werden im Bereich Projektinformationen im Dashboard der Google Cloud Console aufgeführt.
- INSTANCE_ID, DATABASE_ID und TABLE_NAME : Weitere Informationen finden Sie unter Spanner-Instanz mit
Singers
-Datenbanktabelle einrichten.
- Speichern Sie die Datei
singers.py
.
- Fügen Sie den folgenden Code in die Datei
- Führen Sie
singers.py
mitspark-submit
aus, um die Spanner-TabelleSingers
zu erstellen.spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-
CONNECTOR_VERSION .jar singers.pyErsetzen Sie Folgendes:
- CONNECTOR_VERSION: Version des Spanner-Connectors.
Wählen Sie die Spanner-Connector-Version aus der Versionsliste im GitHub-Repository
GoogleCloudDataproc/spark-spanner-connector
aus.
Die Ausgabe sieht so aus:
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- CONNECTOR_VERSION: Version des Spanner-Connectors.
Wählen Sie die Spanner-Connector-Version aus der Versionsliste im GitHub-Repository
So führen Sie den Beispiel-Scala-Code auf Ihrem Cluster aus:
- Stellen Sie mit SSH eine Verbindung zum Masterknoten des Dataproc-Clusters her.
- Rufen Sie in der Google Cloud Console die Dataproc-Seite Cluster auf und klicken Sie auf den Namen Ihres Clusters.
- Wählen Sie auf der Seite Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann rechts neben dem Namen des Cluster-Masterknotens auf
SSH
.Im Stammverzeichnis des Masterknotens wird ein Browserfenster geöffnet.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Erstellen Sie mit dem vorinstallierten Texteditor
vi
,vim
odernano
einesingers.scala
-Datei auf dem Masterknoten.- Fügen Sie den folgenden Code in die Datei
singers.scala
ein. Die Spanner-Funktion Data Boost ist aktiviert. Dies hat nahezu keine Auswirkungen auf die Haupt-Spanner-Instanz.object singers { def main(): Unit = { /* * Uncomment (use the following code) if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "
PROJECT_ID ") .option("instanceId", "INSTANCE_ID ") .option("databaseId", "DATABASE_ID ") .option("table", "TABLE_NAME ") .option("enableDataBoost", true) .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }Ersetzen Sie Folgendes:
- PROJECT_ID: Ihre Google Cloud Projekt-ID. Projekt-IDs werden im Bereich Projektinformationen im Dashboard der Google Cloud Console aufgeführt.
- INSTANCE_ID, DATABASE_ID und TABLE_NAME : Weitere Informationen finden Sie unter Spanner-Instanz mit
Singers
-Datenbanktabelle einrichten.
- Speichern Sie die Datei
singers.scala
.
- Fügen Sie den folgenden Code in die Datei
- Starten Sie die
spark-shell
-REPL.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-
CONNECTOR_VERSION .jarErsetzen Sie Folgendes:
CONNECTOR_VERSION: Version des Spanner-Connectors. Wählen Sie die Spanner-Connector-Version aus der Versionsliste im GitHub-Repository
GoogleCloudDataproc/spark-spanner-connector
aus. - Führen Sie
singers.scala
mit dem Befehl:load singers.scala
aus, um die Spanner-Singers
-Tabelle zu erstellen. Die Ausgabeliste enthält Beispiele aus der Ausgabe von „Sänger“.> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
Bereinigen
Um laufende Kosten für Ihr Google Cloud Konto zu vermeiden, können Sie Ihren Dataproc-Cluster anhalten oder löschen und Ihre Spanner-Instanz löschen.