Questa pagina mostra come creare un cluster Dataproc che utilizza il connettore Spark Spanner per leggere i dati da Spanner utilizzando Apache Spark.
Calcolare i costi
In questo documento utilizzi i seguenti componenti fatturabili di Google Cloud:
- Dataproc
- Spanner
- Cloud Storage
Per generare una stima dei costi in base all'utilizzo previsto,
utilizza il Calcolatore prezzi.
Prima di iniziare
Prima di utilizzare il connettore Spanner in questo tutorial, configura un cluster Dataproc, un'istanza e un database Spanner.
Configura un cluster Dataproc
Crea un cluster Dataproc o utilizza un cluster Dataproc esistente con le seguenti impostazioni:
Autorizzazioni dell'account di servizio della VM. All'account di servizio VM del cluster devono essere assegnate le autorizzazioni Spanner appropriate. Se utilizzi Data Boost (Data Boost è abilitato nel codice di esempio in Leggi i dati da Spanner), l'account di servizio della VM deve disporre anche delle autorizzazioni IAM di Data Boost richieste.
Ambito di accesso. Il cluster deve essere creato con l'ambito
cloud-platform
o l'ambitospanner
appropriato abilitato. L'ambitocloud-platform
è attivato per impostazione predefinita per i cluster creati con la versione dell'immagine 2.1 o successiva.Le istruzioni riportate di seguito spiegano come impostare l'ambito
cloud-platform
nell'ambito di una richiesta di creazione di un cluster che utilizza la console, l'interfaccia a riga di comando gcloud o l'API Dataproc. Google Cloud Per ulteriori istruzioni sulla creazione di cluster, consulta Creare un cluster.Google Cloud console
- Nella Google Cloud console, apri la pagina Dataproc Crea un cluster.
- Nel pannello Gestisci la sicurezza della sezione Accesso al progetto, fai clic su Attiva l'ambito cloud-platform per questo cluster.
- Compila o conferma gli altri campi per la creazione del cluster, quindi fai clic su Crea.
Interfaccia a riga di comando gcloud
Puoi eseguire il seguente comando
gcloud dataproc clusters create
per creare un cluster con l'ambitocloud-platform
abilitato.gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
Puoi specificare GceClusterConfig.serviceAccountScopes nell'ambito di una richiesta clusters.create.
"serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
Configura un'istanza Spanner con una tabella di database Singers
Crea un'istanza Spanner con un database che contiene una tabella Singers
. Prendi nota dell'ID istanza e dell'ID database Spanner.
Utilizzare il connettore Spanner con Spark
Il connettore Spanner è disponibile per le versioni di Spark 3.1+
.
Specifica la
versione del connettore come parte della specifica del file JAR del connettore Cloud Storage quando
invii un job a
un cluster Dataproc.
Esempio: invio di un job Spark con l'interfaccia a riga di comando gcloud e il connettore Spanner.
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \ ... [other job submission flags]
Sostituisci quanto segue:
CONNECTOR_VERSION: versione del connettore Spanner.
Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub
GoogleCloudDataproc/spark-spanner-connector
.
Leggere i dati da Spanner
Puoi utilizzare Python o Scala per leggere i dati da Spanner in un DataFrame Spark utilizzando l'API di origine dati Spark.
PySpark
Puoi eseguire il codice PySpark di esempio in questa sezione sul tuo cluster inviando il job al servizio Dataproc o eseguendolo dalla REPL spark-submit
sul nodo principale del cluster.
Job Dataproc
- Crea un file
singers.py
utilizzando un editor di testo locale o in Cloud Shell utilizzando l'editor di testovi
,vim
onano
preinstallato. - Incolla il seguente codice nel file
singers.py
. Tieni presente che la funzionalità Data Boost di Spanner è attivata, il che ha un impatto quasi nullo sull'istanza Spanner principale.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Sostituisci quanto segue:
- PROJECT_ID: il tuo Google Cloud ID progetto. Gli ID progetto sono elencati nella sezione Informazioni sul progetto della Google Cloud dashboard della console.
- INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulta
Configurare un'istanza Spanner con la tabella di database
Singers
.
- Salva il file
singers.py
. - Invia il job
al servizio Dataproc utilizzando la Google Cloud console, l'interfaccia a riga di comando gcloud o
l'API Dataproc.
Esempio: invio di un job con l'interfaccia a riga di comando gcloud con il connettore Spanner.
gcloud dataproc jobs submit pyspark singers.py \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION
Sostituisci quanto segue:
- CLUSTER_NAME: il nome del nuovo cluster.
- REGION: una regione Compute Engine disponibile per eseguire il carico di lavoro.
- CONNECTOR_VERSION: versione del connettore Spanner.
Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub
GoogleCloudDataproc/spark-spanner-connector
.
Job spark-submit
- Connettiti al nodo principale del cluster Dataproc tramite SSH.
- Vai alla pagina Dataproc Cluster nella Google Cloud console, quindi fai clic sul nome del cluster.
- Nella pagina Dettagli cluster, seleziona la scheda Istanze VM. Quindi, fai clic su
SSH
a destra del nome del nodo principale del cluster.Viene visualizzata una finestra del browser nella home directory sul nodo principale.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crea un file
singers.py
sul nodo principale utilizzando l'editor di testovi
,vim
onano
preinstallato.- Incolla il seguente codice nel file
singers.py
. Tieni presente che la funzionalità Data Boost di Spanner è attivata, il che ha un impatto quasi nullo sull'istanza Spanner principale.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Sostituisci quanto segue:
- PROJECT_ID: il tuo Google Cloud ID progetto. Gli ID progetto sono elencati nella sezione Informazioni sul progetto della Google Cloud dashboard della console.
- INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulta
Configurare un'istanza Spanner con la tabella di database
Singers
.
- Salva il file
singers.py
.
- Incolla il seguente codice nel file
- Esegui
singers.py
conspark-submit
per creare la tabella SpannerSingers
.spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
Sostituisci quanto segue:
- CONNECTOR_VERSION: versione del connettore Spanner.
Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub
GoogleCloudDataproc/spark-spanner-connector
.
L'output è:
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- CONNECTOR_VERSION: versione del connettore Spanner.
Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub
Scala
Per eseguire il codice Scala di esempio sul cluster:
- Connettiti al nodo principale del cluster Dataproc tramite SSH.
- Vai alla pagina Dataproc Cluster nella Google Cloud console e fai clic sul nome del cluster.
- Nella pagina Dettagli cluster, seleziona la scheda Istanze VM. Quindi, fai clic su
SSH
a destra del nome del nodo principale del cluster.Viene visualizzata una finestra del browser nella home directory sul nodo principale.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crea un file
singers.scala
sul nodo principale utilizzando l'editor di testovi
,vim
onano
preinstallato.- Incolla il seguente codice nel file
singers.scala
. Tieni presente che la funzionalità Data Boost di Spanner è attivata, il che ha un impatto quasi nullo sull'istanza Spanner principale.object singers { def main(): Unit = { /* * Uncomment (use the following code) if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .option("enableDataBoost", true) .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
Sostituisci quanto segue:
- PROJECT_ID: il tuo Google Cloud ID progetto. Gli ID progetto sono elencati nella sezione Informazioni sul progetto della Google Cloud dashboard della console.
- INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulta
Configurare un'istanza Spanner con la tabella di database
Singers
.
- Salva il file
singers.scala
.
- Incolla il seguente codice nel file
- Avvia la REPL di
spark-shell
.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
Sostituisci quanto segue:
CONNECTOR_VERSION: versione del connettore Spanner. Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub
GoogleCloudDataproc/spark-spanner-connector
. - Esegui
singers.scala
con il comando:load singers.scala
per creare la tabella SpannerSingers
. L'elenco di output mostra esempi dell'output di Singers.> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
Esegui la pulizia
Per evitare addebiti continui sul tuo Google Cloud account, puoi interrompere o eliminare il tuo cluster Dataproc e eliminare l'istanza Spanner.