Questa pagina mostra come creare un cluster Dataproc che utilizza il connettore Spark Spanner per leggere i dati da Spanner utilizzando Apache Spark.
Il connettore Spanner funziona con Spark per leggere i dati dal database Spanner utilizzando la libreria Java di Spanner. Il connettore Spanner supporta la lettura di tabelle e grafi di Spanner in DataFrames e GraphFrames di Spark.
Costi
In questo documento vengono utilizzati i seguenti componenti fatturabili di Google Cloud:
- Dataproc
- Spanner
- Cloud Storage
Per generare una stima dei costi in base all'utilizzo previsto,
utilizza il calcolatore prezzi.
Prima di iniziare
Prima di utilizzare il connettore Spanner in questo tutorial, configura un cluster Dataproc e un'istanza e un database Spanner.
Configura un cluster Dataproc
Crea un cluster Dataproc o utilizza un cluster Dataproc esistente con le seguenti impostazioni:
Autorizzazioni del account di servizio VM. All'account di servizio VM del cluster devono essere assegnate le autorizzazioni Spanner appropriate. Se utilizzi Data Boost (Data Boost è abilitato nel codice di esempio in Esporta tabelle Spanner), anche il account di servizio VM deve disporre delle autorizzazioni IAM Data Boost richieste.
Ambito di accesso. Il cluster deve essere creato con l'ambito
cloud-platform
o l'ambitospanner
appropriato abilitato. L'ambitocloud-platform
è attivato per impostazione predefinita per i cluster creati con la versione 2.1 dell'immagine o successive.Le seguenti istruzioni mostrano come impostare l'ambito
cloud-platform
nell'ambito di una richiesta di creazione del cluster che utilizza la console Google Cloud , gcloud CLI o l'API Dataproc. Per ulteriori istruzioni sulla creazione di cluster, consulta Creare un cluster.ConsoleGoogle Cloud
- Nella console Google Cloud , apri la pagina Dataproc Crea un cluster.
- Nel pannello Gestisci sicurezza nella sezione Accesso al progetto, fai clic su "Abilita l'ambito cloud-platform per questo cluster".
- Compila o conferma gli altri campi di creazione del cluster, poi fai clic su Crea.
Interfaccia a riga di comando gcloud
Puoi eseguire il seguente comando
gcloud dataproc clusters create
per creare un cluster con l'ambitocloud-platform
abilitato.gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
Puoi specificare GceClusterConfig.serviceAccountScopes nell'ambito di una richiesta clusters.create.
"serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
Configura un'istanza Spanner con una tabella del database Singers
Crea un'istanza Spanner
con un database che contiene una tabella Singers
. Prendi nota dell'ID istanza e dell'ID database Spanner.
Utilizzare il connettore Spanner con Spark
Il connettore Spanner è disponibile per le versioni di Spark 3.1+
.
Specifichi la
versione del connettore come parte della specifica del file JAR del connettore Cloud Storage quando invii un job a un cluster Dataproc.
Esempio:invio di job Spark gcloud CLI con il connettore Spanner.
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \ ... [other job submission flags]
Sostituisci quanto segue:
CONNECTOR_VERSION: versione del connettore Spanner.
Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub
GoogleCloudDataproc/spark-spanner-connector
.
Leggere le tabelle Spanner
Puoi utilizzare Python o Scala per leggere i dati delle tabelle Spanner in un DataFrame Spark utilizzando l'API Spark Data Source.
PySpark
Puoi eseguire il codice PySpark di esempio in questa sezione sul tuo cluster inviando il job al servizio Dataproc o eseguendolo dal REPL spark-submit
sul nodo master del cluster.
Job Dataproc
- Crea un file
singers.py
utilizzando un editor di testo locale o in Cloud Shell utilizzando l'editor di testovi
,vim
onano
preinstallato. - Dopo aver compilato le variabili segnaposto, incolla il seguente codice
nel file
singers.py
. Tieni presente che la funzionalità Data Boost di Spanner è abilitata e ha un impatto quasi nullo sull'istanza Spanner principale.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Sostituisci quanto segue:
- PROJECT_ID: il tuo ID progetto Google Cloud . Gli ID progetto sono elencati nella sezione Informazioni sul progetto della Google Cloud console Dashboard.
- INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulta
Configura un'istanza Spanner con la tabella del database
Singers
.
- Salva il file
singers.py
. - Invia il job
al servizio Dataproc utilizzando la console Google Cloud , gcloud CLI o
l'API Dataproc.
Esempio:invio di job gcloud CLI con il connettore Spanner.
gcloud dataproc jobs submit pyspark singers.py \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
Sostituisci quanto segue:
- CLUSTER_NAME: il nome del nuovo cluster.
- REGION: una regione di Compute Engine disponibile per eseguire il carico di lavoro.
- CONNECTOR_VERSION: versione del connettore Spanner.
Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub
GoogleCloudDataproc/spark-spanner-connector
.
Job spark-submit
- Connettiti al nodo master del cluster Dataproc utilizzando SSH.
- Vai alla pagina Cluster di Dataproc nella console Google Cloud , quindi fai clic sul nome del cluster.
- Nella pagina Dettagli cluster, seleziona la scheda Istanze VM. Quindi, fai clic su
SSH
a destra del nome del nodo master del cluster.Si apre una finestra del browser nella directory home del nodo master.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crea un file
singers.py
sul nodo master utilizzando l'editor di testovi
,vim
onano
preinstallato.- Incolla il seguente codice nel file
singers.py
. Tieni presente che la funzionalità Data Boost di Spanner è abilitata e ha un impatto quasi nullo sull'istanza Spanner principale.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Sostituisci quanto segue:
- PROJECT_ID: il tuo ID progetto Google Cloud . Gli ID progetto sono elencati nella sezione Informazioni sul progetto della Google Cloud console Dashboard.
- INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulta
Configura un'istanza Spanner con la tabella del database
Singers
.
- Salva il file
singers.py
.
- Incolla il seguente codice nel file
- Esegui
singers.py
conspark-submit
per creare la tabella SpannerSingers
.spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
Sostituisci quanto segue:
- CONNECTOR_VERSION: versione del connettore Spanner.
Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub
GoogleCloudDataproc/spark-spanner-connector
.
L'output è:
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- CONNECTOR_VERSION: versione del connettore Spanner.
Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub
Scala
Per eseguire il codice Scala di esempio sul cluster, completa i seguenti passaggi:
- Connettiti al nodo master del cluster Dataproc utilizzando SSH.
- Vai alla pagina Cluster di Dataproc nella console Google Cloud , quindi fai clic sul nome del cluster.
- Nella pagina Dettagli cluster, seleziona la scheda Istanze VM. Quindi, fai clic su
SSH
a destra del nome del nodo master del cluster.Si apre una finestra del browser nella directory home del nodo master.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crea un file
singers.scala
sul nodo master utilizzando l'editor di testovi
,vim
onano
preinstallato.- Incolla il seguente codice nel file
singers.scala
. Tieni presente che la funzionalità Spanner Data Boost è abilitata e ha un impatto quasi nullo sull'istanza Spanner principale.object singers { def main(): Unit = { /* * Uncomment (use the following code) if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .option("enableDataBoost", true) .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
Sostituisci quanto segue:
- PROJECT_ID: il tuo ID progetto Google Cloud . Gli ID progetto sono elencati nella sezione Informazioni sul progetto della Google Cloud console Dashboard.
- INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulta
Configura un'istanza Spanner con la tabella del database
Singers
.
- Salva il file
singers.scala
.
- Incolla il seguente codice nel file
- Avvia il REPL
spark-shell
.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
Sostituisci quanto segue:
CONNECTOR_VERSION: versione del connettore Spanner. Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub
GoogleCloudDataproc/spark-spanner-connector
. - Esegui
singers.scala
con il comando:load singers.scala
per creare la tabella SpannerSingers
. L'elenco dell'output mostra esempi dell'output di Singers.> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
Leggere i grafici di Spanner
Il connettore Spanner supporta l'esportazione del grafico in DataFrames di nodi e archi separati, nonché l'esportazione direttamente in GraphFrames
.
L'esempio seguente esporta un database Spanner in un GraphFrame
.
Utilizza la classe SpannerGraphConnector
Python,
inclusa nel file JAR del connettore Spanner, per leggere il
grafico Spanner.
from pyspark.sql import SparkSession connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar" spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example") .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12") .config("spark.jars", connector_jar) .getOrCreate()) spark.sparkContext.addPyFile(connector_jar) from spannergraph import SpannerGraphConnector connector = (SpannerGraphConnector() .spark(spark) .project("PROJECT_ID") .instance("INSTANCE_ID") .database("DATABASE_ID") .graph("GRAPH_ID")) g = connector.load_graph() g.vertices.show() g.edges.show()
Sostituisci quanto segue:
- CONNECTOR_VERSION: versione del connettore Spanner.
Scegli la versione del connettore Spanner dall'elenco delle versioni nel repository GitHub
GoogleCloudDataproc/spark-spanner-connector
. - PROJECT_ID: il tuo ID progetto Google Cloud . Gli ID progetto sono elencati nella sezione Informazioni sul progetto della Google Cloud console Dashboard.
- INSTANCE_ID, DATABASE_ID e TABLE_NAME Inserisci l'ID istanza, database e grafico.
Per esportare nodi e archi DataFrames
anziché GraphFrames, utilizza
load_dfs
:
df_vertices, df_edges, df_id_map = connector.load_dfs()
Esegui la pulizia
Per evitare addebiti continui al tuo account Google Cloud , puoi interrompere o eliminare il cluster Dataproc e eliminare l'istanza Spanner.
Passaggi successivi
- Fai riferimento agli
esempi di
pyspark.sql.DataFrame
. - Per il supporto della lingua Spark DataFrame, consulta quanto segue:
- Consulta il repository Spark Spanner Connector su GitHub.
- Consulta i suggerimenti per l'ottimizzazione del job Spark.