Questa pagina mostra come utilizzare il connettore Spark Spanner per leggere i dati di Spanner utilizzando Apache Spark
Calcolo dei costi
In questo documento, utilizzi i seguenti componenti fatturabili di Google Cloud:
- Dataproc
- Spanner
- Cloud Storage
Per generare una stima dei costi in base all'utilizzo previsto, utilizza il Calcolatore prezzi.
Prima di iniziare
Prima di eseguire il tutorial, accertati di conoscere la versione del connettore e di ottenere l'URI del connettore.
Come specificare l'URI del file JAR del connettore
Le versioni del connettore Spark Spanner sono elencate nel repository GoogleCloudDataproc/spark-spanner-connector.
Specifica il file JAR del connettore sostituendo le informazioni sulla versione del connettore nella seguente stringa URI:
gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
Il connettore è disponibile per le versioni Spark 3.1+
Esempio della gcloud CLI:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \ -- job-args
Preparazione del database Spanner
Se non hai una tabella Spanner, puoi seguire il tutorial per creare una tabella Spanner. Successivamente, avrai un ID istanza, un ID database e una tabella Singers
.
Crea cluster Dataproc
Qualsiasi cluster Dataproc che utilizza il connettore richiede gli ambiti spanner
o cloud-platform
. I cluster Dataproc hanno l'ambito predefinito cloud-platform
per l'immagine 2.1 o superiore. Se utilizzi una versione precedente, puoi usare la console Google Cloud, Google Cloud CLI e l'API Dataproc per creare un cluster Dataproc.
Console
- Nella console Google Cloud, apri la pagina di Dataproc Crea un cluster
- Nella scheda "Gestisci sicurezza", fai clic su "Abilita l'ambito cloud-platform per questo cluster" nella sezione "Accesso al progetto".
- Completa la compilazione o la conferma degli altri campi per la creazione del cluster, quindi fai clic su "Crea".
Google Cloud CLI
gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
Puoi specificare GceClusterConfig.serviceAccountScopes come parte di una richiesta clusters.create. Ad esempio:"serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
Dovrai assicurarti che l'autorizzazione Spanner corrispondente sia assegnata all'account di servizio VM Dataproc. Se utilizzi Data Boost nel tutorial, fai riferimento all'autorizzazione IAM di Data Boost.
Lettura dati da Spanner
Puoi utilizzare Scala e Python per leggere i dati da Spanner in un dataframe Spark utilizzando l'API per origine dati Spark.
Scala
- Esamina il codice e sostituisci i segnaposto [projectId], [instanceId], [databaseId] e [table] con l'ID progetto, l'ID istanza, l'ID database e la tabella che hai creato in precedenza. L'opzione EnableDataBoost abilita la funzionalità Data Boost di Spanner, che ha un impatto quasi pari a zero sull'istanza principale di Spanner.
object singers { def main(): Unit = { /* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "[projectId]") .option("instanceId", "[instanceId]") .option("databaseId", "[databaseId]") .option("enableDataBoost", true) .option("table", "[table]") .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
- Esegui il codice sul tuo cluster
- Utilizza SSH per connetterti al nodo master del cluster Dataproc
- Vai alla pagina Cluster di Dataproc nella console Google Cloud, quindi fai clic sul nome del cluster
- Nella pagina >Dettagli cluster, seleziona la scheda Istanze VM. Quindi, fai clic su
SSH
a destra del nome del nodo master del cluster
Si apre una finestra del browser nella tua directory home sul nodo masterConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crea
singers.scala
con l'editor di testovi
,vim
onano
preinstallato, poi incolla il codice Scala dall'elenco dei codici Scalanano singers.scala
- Avvia il REPL
spark-shell
.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
- Esegui singers.scala con il comando
:load singers.scala
per creare la tabellaSingers
di Spanner. L'elenco degli output mostra esempi dell'output Singers.> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
PySpark
- Esamina il codice e sostituisci i segnaposto [projectId], [instanceId], [databaseId] e [table] con l'ID progetto, l'ID istanza, l'ID database e la tabella che hai creato in precedenza. L'opzione EnableDataBoost abilita la funzionalità Data Boost di Spanner, che ha un impatto quasi pari a zero sull'istanza principale di Spanner.
#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "[projectId]") \ .option("instanceId", "[instanceId]") \ .option("databaseId", "[databaseId]") \ .option("enableDataBoost", "true") \ .option("table", "[table]") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
- Esegui il codice sul tuo cluster
- Utilizza SSH per connetterti al nodo master del cluster Dataproc
- Vai alla pagina Cluster di Dataproc nella console Google Cloud, quindi fai clic sul nome del cluster
- Nella pagina Dettagli cluster, seleziona la scheda Istanze VM. Quindi, fai clic su
SSH
a destra del nome del nodo master del cluster
Si apre una finestra del browser nella tua directory home sul nodo principaleConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crea
singers.py
con l'editor di testovi
,vim
onano
preinstallato, poi incolla il codice PySpark dall'elenco di codici PySparknano singers.py
- Esegui singers.py con
spark-submit
per creare la tabellaSingers
di Spanner.spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
L'output è:... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- Utilizza SSH per connetterti al nodo master del cluster Dataproc
esegui la pulizia
Per eseguire la pulizia ed evitare che al tuo account Google Cloud vengano addebitati costi per le risorse create in questa procedura dettagliata, segui questi passaggi.
gcloud dataproc clusters stop CLUSTER_NAME gcloud dataproc clusters delete CLUSTER_NAME