Auf dieser Seite erfahren Sie, wie Sie einen Dataproc-Cluster erstellen, in dem mithilfe des Spark Spanner-Connectors Daten mit Apache Spark aus Spanner gelesen werden.
Kosten berechnen
In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:
- Dataproc
- Spanner
- Cloud Storage
Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen.
Hinweis
Bevor Sie den Spanner-Connector in dieser Anleitung verwenden, müssen Sie einen Dataproc-Cluster sowie eine Spanner-Instanz und ‑Datenbank einrichten.
Dataproc-Cluster einrichten
Erstellen Sie einen Dataproc-Cluster oder verwenden Sie einen vorhandenen Dataproc-Cluster mit den folgenden Einstellungen:
Berechtigungen für das VM-Dienstkonto. Dem VM-Dienstkonto des Clusters müssen die entsprechenden Spanner-Berechtigungen zugewiesen werden. Wenn Sie Data Boost verwenden (Data Boost ist im Beispielcode unter Daten aus Spanner lesen aktiviert), muss das VM-Dienstkonto auch die erforderlichen IAM-Berechtigungen für Data Boost haben.
Zugriffsbereich Der Cluster muss mit aktiviertem
cloud-platform
- oder dem entsprechendenspanner
-Bereich erstellt werden. Der Bereichcloud-platform
ist standardmäßig für Cluster aktiviert, die mit Image-Version 2.1 oder höher erstellt wurden.In der folgenden Anleitung erfahren Sie, wie Sie den
cloud-platform
-Umfang als Teil einer Clustererstellungsanfrage mit der Google Cloud Console, der gcloud CLI oder der Dataproc API festlegen. Eine weitere Anleitung zum Erstellen von Clustern finden Sie unter Cluster erstellen.Google Cloud Console
- Öffnen Sie in der Google Cloud Console die Dataproc-Seite Cluster erstellen.
- Klicken Sie auf dem Steuerfeld Sicherheit verwalten im Bereich Projektzugriff auf „Aktiviert den Bereich ‚cloud-platform‘ für diesen Cluster“.
- Füllen Sie die anderen Felder für die Clustererstellung aus oder bestätigen Sie sie. Klicken Sie dann auf Erstellen.
gcloud-CLI
Mit dem folgenden
gcloud dataproc clusters create
-Befehl können Sie einen Cluster mit aktiviertemcloud-platform
-Bereich erstellen.gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
Sie können GceClusterConfig.serviceAccountScopes als Teil einer clusters.create-Anfrage angeben.
"serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
Spanner-Instanz mit einer Singers
-Datenbanktabelle einrichten
Erstellen Sie eine Spanner-Instanz mit einer Datenbank, die eine Singers
-Tabelle enthält. Notieren Sie sich die Spanner-Instanz-ID und die Datenbank-ID.
Spanner-Connector mit Spark verwenden
Der Spanner-Connector ist für Spark-Versionen 3.1+
verfügbar.
Sie geben die Connectorversion als Teil der JAR-Dateibeschreibung des Cloud Storage-Connectors an, wenn Sie einen Job an einen Dataproc-Cluster senden.
Beispiel:Einreichen eines Spark-Jobs über die gcloud CLI mit dem Spanner-Connector
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \ ... [other job submission flags]
Ersetzen Sie Folgendes:
CONNECTOR_VERSION: Version des Spanner-Connectors.
Wählen Sie die Spanner-Connector-Version aus der Versionsliste im GitHub-Repository GoogleCloudDataproc/spark-spanner-connector
aus.
Daten aus Spanner lesen
Sie können mit Python oder Scala Daten mithilfe der Spark-Datenquellen-API aus Spanner in einen Spark-DataFrame lesen.
PySpark
Sie können den Beispiel-PySpark-Code in diesem Abschnitt in Ihrem Cluster ausführen, indem Sie den Job an den Dataproc-Dienst senden oder ihn über die spark-submit
-REPL auf dem Clustermasterknoten ausführen.
Dataproc-Job
- Erstellen Sie eine
singers.py
-Datei mit einem lokalen Texteditor oder in Cloud Shell mit dem vorinstallierten Texteditorvi
,vim
odernano
. - Fügen Sie den folgenden Code in die Datei
singers.py
ein. Die Spanner-Funktion Data Boost ist aktiviert. Dies hat nahezu keine Auswirkungen auf die Haupt-Spanner-Instanz.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Ersetzen Sie Folgendes:
- PROJECT_ID: Ihre Google Cloud Projekt-ID. Projekt-IDs werden im Bereich Projektinformationen im Dashboard der Google Cloud Console aufgeführt.
- INSTANCE_ID, DATABASE_ID und TABLE_NAME : Weitere Informationen finden Sie unter Spanner-Instanz mit
Singers
-Datenbanktabelle einrichten.
- Speichern Sie die Datei
singers.py
. - Senden Sie den Job über die Google Cloud Console, die gcloud CLI oder die Dataproc API an den Dataproc-Dienst.
Beispiel:Job mit der gcloud CLI über den Spanner-Connector einreichen
gcloud dataproc jobs submit pyspark singers.py \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION
Ersetzen Sie Folgendes:
- CLUSTER_NAME: Der Name des neuen Clusters.
- REGION: Eine verfügbare Compute Engine-Region zum Ausführen der Arbeitslast.
- CONNECTOR_VERSION: Version des Spanner-Connectors.
Wählen Sie die Spanner-Connector-Version aus der Versionsliste im GitHub-Repository
GoogleCloudDataproc/spark-spanner-connector
aus.
spark-submit-Job
- Stellen Sie mit SSH eine Verbindung zum Masterknoten des Dataproc-Clusters her.
- Rufen Sie in der Google Cloud Console die Dataproc-Seite Cluster auf und klicken Sie auf den Namen Ihres Clusters.
- Wählen Sie auf der Seite Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann rechts neben dem Namen des Cluster-Masterknotens auf
SSH
.Im Stammverzeichnis des Masterknotens wird ein Browserfenster geöffnet.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Erstellen Sie mit dem vorinstallierten Texteditor
vi
,vim
odernano
einesingers.py
-Datei auf dem Masterknoten.- Fügen Sie den folgenden Code in die Datei
singers.py
ein. Die Spanner-Funktion Data Boost ist aktiviert. Dies hat nahezu keine Auswirkungen auf die Haupt-Spanner-Instanz.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Ersetzen Sie Folgendes:
- PROJECT_ID: Ihre Google Cloud Projekt-ID. Projekt-IDs werden im Bereich Projektinformationen im Dashboard der Google Cloud Console aufgeführt.
- INSTANCE_ID, DATABASE_ID und TABLE_NAME : Weitere Informationen finden Sie unter Spanner-Instanz mit
Singers
-Datenbanktabelle einrichten.
- Speichern Sie die Datei
singers.py
.
- Fügen Sie den folgenden Code in die Datei
- Führen Sie
singers.py
mitspark-submit
aus, um die Spanner-TabelleSingers
zu erstellen.spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
Ersetzen Sie Folgendes:
- CONNECTOR_VERSION: Version des Spanner-Connectors.
Wählen Sie die Spanner-Connector-Version aus der Versionsliste im GitHub-Repository
GoogleCloudDataproc/spark-spanner-connector
aus.
Die Ausgabe sieht so aus:
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- CONNECTOR_VERSION: Version des Spanner-Connectors.
Wählen Sie die Spanner-Connector-Version aus der Versionsliste im GitHub-Repository
Scala
So führen Sie den Beispiel-Scala-Code auf Ihrem Cluster aus:
- Stellen Sie mit SSH eine Verbindung zum Masterknoten des Dataproc-Clusters her.
- Rufen Sie in der Google Cloud Console die Dataproc-Seite Cluster auf und klicken Sie auf den Namen Ihres Clusters.
- Wählen Sie auf der Seite Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann rechts neben dem Namen des Cluster-Masterknotens auf
SSH
.Im Stammverzeichnis des Masterknotens wird ein Browserfenster geöffnet.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Erstellen Sie mit dem vorinstallierten Texteditor
vi
,vim
odernano
einesingers.scala
-Datei auf dem Masterknoten.- Fügen Sie den folgenden Code in die Datei
singers.scala
ein. Die Spanner-Funktion Data Boost ist aktiviert. Dies hat nahezu keine Auswirkungen auf die Haupt-Spanner-Instanz.object singers { def main(): Unit = { /* * Uncomment (use the following code) if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .option("enableDataBoost", true) .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
Ersetzen Sie Folgendes:
- PROJECT_ID: Ihre Google Cloud Projekt-ID. Projekt-IDs werden im Bereich Projektinformationen im Dashboard der Google Cloud Console aufgeführt.
- INSTANCE_ID, DATABASE_ID und TABLE_NAME : Weitere Informationen finden Sie unter Spanner-Instanz mit
Singers
-Datenbanktabelle einrichten.
- Speichern Sie die Datei
singers.scala
.
- Fügen Sie den folgenden Code in die Datei
- Starten Sie die
spark-shell
-REPL.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
Ersetzen Sie Folgendes:
CONNECTOR_VERSION: Version des Spanner-Connectors. Wählen Sie die Spanner-Connector-Version aus der Versionsliste im GitHub-Repository
GoogleCloudDataproc/spark-spanner-connector
aus. - Führen Sie
singers.scala
mit dem Befehl:load singers.scala
aus, um die Spanner-Singers
-Tabelle zu erstellen. Die Ausgabeliste enthält Beispiele aus der Ausgabe von „Sänger“.> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
Bereinigen
Um laufende Kosten für Ihr Google Cloud Konto zu vermeiden, können Sie Ihren Dataproc-Cluster anhalten oder löschen und Ihre Spanner-Instanz löschen.