Cette page explique comment créer un cluster Dataproc qui utilise le connecteur Spark Spanner pour lire des données depuis Spanner à l'aide d'Apache Spark.
Le connecteur Spanner fonctionne avec Spark pour lire les données de la base de données Spanner à l'aide de la bibliothèque Java Spanner. Le connecteur Spanner permet de lire les tables et les graphiques Spanner dans les DataFrames et les GraphFrames Spark.
Coûts
Dans ce document, vous utilisez les composants facturables de Google Cloudsuivants :
- Dataproc
- Spanner
- Cloud Storage
Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût.
Avant de commencer
Avant d'utiliser le connecteur Spanner dans ce tutoriel, configurez un cluster Dataproc ainsi qu'une instance et une base de données Spanner.
Configurer un cluster Dataproc
Créez un cluster Dataproc ou utilisez un cluster Dataproc existant avec les paramètres suivants :
Autorisations du compte de service de VM. Le compte de service de VM du cluster doit disposer des autorisations Spanner appropriées. Si vous utilisez Data Boost (Data Boost est activé dans l'exemple de code de Exporter des tables Spanner), le compte de service de la VM doit également disposer des autorisations IAM Data Boost requises.
Niveau d'accès. Le cluster doit être créé avec le champ d'application
cloud-platform
ou le champ d'applicationspanner
approprié activé. Le champ d'applicationcloud-platform
est activé par défaut pour les clusters créés avec la version 2.1 de l'image ou une version ultérieure.Les instructions suivantes vous expliquent comment définir le champ d'application
cloud-platform
dans une requête de création de cluster qui utilise la console Google Cloud , gcloud CLI ou l'API Dataproc. Pour obtenir d'autres instructions sur la création de clusters, consultez Créer un cluster.ConsoleGoogle Cloud
- Dans la console Google Cloud , ouvrez la page Dataproc Créer un cluster.
- Dans le panneau Gérer la sécurité de la section Accès au projet, cliquez sur "Active le champ d'application Cloud Platform pour ce cluster".
- Renseignez ou confirmez les autres champs de création de cluster, puis cliquez sur Créer.
gcloud CLI
Vous pouvez exécuter la commande
gcloud dataproc clusters create
suivante pour créer un cluster avec le champ d'applicationcloud-platform
activé.gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
Vous pouvez spécifier GceClusterConfig.serviceAccountScopes dans le cadre d'une requête clusters.create.
"serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
Configurer une instance Spanner avec une table de base de données "Singers"
Créez une instance Spanner avec une base de données contenant une table Singers
. Notez l'ID de l'instance Spanner et l'ID de la base de données.
Utiliser le connecteur Spanner avec Spark
Le connecteur Spanner est disponible pour les versions Spark 3.1+
.
Vous spécifiez la version du connecteur dans la spécification du fichier JAR du connecteur Cloud Storage lorsque vous envoyez un job à un cluster Dataproc.
Exemple : Envoi d'un job Spark gcloud CLI avec le connecteur Spanner.
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \ ... [other job submission flags]
Remplacez les éléments suivants :
CONNECTOR_VERSION : version du connecteur Spanner.
Choisissez la version du connecteur Spanner dans la liste des versions du dépôt GitHub GoogleCloudDataproc/spark-spanner-connector
.
Lire des tables Spanner
Vous pouvez utiliser Python ou Scala pour lire les données de table Spanner dans un DataFrame Spark à l'aide de l'API de source de données Spark.
PySpark
Vous pouvez exécuter l'exemple de code PySpark de cette section sur votre cluster en envoyant le job au service Dataproc ou en exécutant le job à partir du REPL spark-submit
sur le nœud maître du cluster.
Job Dataproc
- Créez un fichier
singers.py
à l'aide d'un éditeur de texte local ou dans Cloud Shell à l'aide de l'éditeur de texte préinstallévi
,vim
ounano
. - Après avoir renseigné les variables d'espace réservé, collez le code suivant dans le fichier
singers.py
. Notez que la fonctionnalité Data Boost de Spanner est activée, ce qui a un impact quasiment nul sur l'instance Spanner principale.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Remplacez les éléments suivants :
- PROJECT_ID : ID de votre projet Google Cloud . Les ID de projet sont listés dans la section Informations sur le projet du tableau de bord de la console Google Cloud .
- INSTANCE_ID, DATABASE_ID et TABLE_NAME : consultez Configurer une instance Spanner avec une table de base de données
Singers
.
- Enregistrez le fichier
singers.py
. - Envoyez le job au service Dataproc à l'aide de la console Google Cloud , de gcloud CLI ou de l'API Dataproc.
Exemple : Envoi d'un job gcloud CLI avec le connecteur Spanner.
gcloud dataproc jobs submit pyspark singers.py \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
Remplacez les éléments suivants :
- CLUSTER_NAME : nom du nouveau cluster.
- REGION : Région Compute Engine disponible pour exécuter la charge de travail.
- CONNECTOR_VERSION : version du connecteur Spanner.
Choisissez la version du connecteur Spanner dans la liste des versions du dépôt GitHub
GoogleCloudDataproc/spark-spanner-connector
.
Tâche spark-submit
- Connectez-vous au nœud maître du cluster Dataproc à l'aide de SSH.
- Accédez à la page Clusters de Dataproc dans la console Google Cloud , puis cliquez sur le nom de votre cluster.
- Sur la page Détails du cluster, sélectionnez l'onglet "Instances de VM". Cliquez ensuite sur
SSH
à droite du nom du nœud maître du cluster.Une fenêtre de navigateur s'ouvre dans votre répertoire de base sur le nœud maître.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Créez un fichier
singers.py
sur le nœud maître à l'aide de l'éditeur de texte préinstallévi
,vim
ounano
.- Collez le code suivant dans le fichier
singers.py
. Notez que la fonctionnalité Data Boost de Spanner est activée, ce qui a un impact quasiment nul sur l'instance Spanner principale.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Remplacez les éléments suivants :
- PROJECT_ID : ID de votre projet Google Cloud . Les ID de projet sont listés dans la section Informations sur le projet du tableau de bord de la console Google Cloud .
- INSTANCE_ID, DATABASE_ID et TABLE_NAME : consultez Configurer une instance Spanner avec une table de base de données
Singers
.
- Enregistrez le fichier
singers.py
.
- Collez le code suivant dans le fichier
- Exécutez
singers.py
avecspark-submit
pour créer la table SpannerSingers
.spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
Remplacez les éléments suivants :
- CONNECTOR_VERSION : version du connecteur Spanner.
Choisissez la version du connecteur Spanner dans la liste des versions du dépôt GitHub
GoogleCloudDataproc/spark-spanner-connector
.
Voici le résultat :
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- CONNECTOR_VERSION : version du connecteur Spanner.
Choisissez la version du connecteur Spanner dans la liste des versions du dépôt GitHub
Scala
Pour exécuter l'exemple de code Scala sur votre cluster, procédez comme suit :
- Connectez-vous au nœud maître du cluster Dataproc à l'aide de SSH.
- Accédez à la page Clusters de Dataproc dans la console Google Cloud , puis cliquez sur le nom de votre cluster.
- Sur la page Détails du cluster, sélectionnez l'onglet "Instances de VM". Cliquez ensuite sur
SSH
à droite du nom du nœud maître du cluster.Une fenêtre de navigateur s'ouvre dans votre répertoire de base sur le nœud maître.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Créez un fichier
singers.scala
sur le nœud maître à l'aide de l'éditeur de texte préinstallévi
,vim
ounano
.- Collez le code suivant dans le fichier
singers.scala
. Notez que la fonctionnalité Data Boost de Spanner est activée, ce qui a un impact quasiment nul sur l'instance Spanner principale.object singers { def main(): Unit = { /* * Uncomment (use the following code) if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .option("enableDataBoost", true) .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
Remplacez les éléments suivants :
- PROJECT_ID : ID de votre projet Google Cloud . Les ID de projet sont listés dans la section Informations sur le projet du tableau de bord de la console Google Cloud .
- INSTANCE_ID, DATABASE_ID et TABLE_NAME : consultez Configurer une instance Spanner avec une table de base de données
Singers
.
- Enregistrez le fichier
singers.scala
.
- Collez le code suivant dans le fichier
- Lancez le REPL
spark-shell
.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
Remplacez les éléments suivants :
CONNECTOR_VERSION : version du connecteur Spanner. Choisissez la version du connecteur Spanner dans la liste des versions du dépôt GitHub
GoogleCloudDataproc/spark-spanner-connector
. - Exécutez
singers.scala
avec la commande:load singers.scala
pour créer la table SpannerSingers
. La liste de sortie affiche des exemples de la sortie "Singers".> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
Lire les graphiques Spanner
Le connecteur Spanner permet d'exporter le graphique dans des DataFrames de nœuds et d'arêtes distincts, ainsi que directement dans GraphFrames
.
L'exemple suivant exporte une base de données Spanner dans un GraphFrame
.
Il utilise la classe Python SpannerGraphConnector
, incluse dans le fichier JAR du connecteur Spanner, pour lire Spanner Graph.
from pyspark.sql import SparkSession connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar" spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example") .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12") .config("spark.jars", connector_jar) .getOrCreate()) spark.sparkContext.addPyFile(connector_jar) from spannergraph import SpannerGraphConnector connector = (SpannerGraphConnector() .spark(spark) .project("PROJECT_ID") .instance("INSTANCE_ID") .database("DATABASE_ID") .graph("GRAPH_ID")) g = connector.load_graph() g.vertices.show() g.edges.show()
Remplacez les éléments suivants :
- CONNECTOR_VERSION : version du connecteur Spanner.
Choisissez la version du connecteur Spanner dans la liste des versions du dépôt GitHub
GoogleCloudDataproc/spark-spanner-connector
. - PROJECT_ID : ID de votre projet Google Cloud . Les ID de projet sont listés dans la section Informations sur le projet du tableau de bord de la console Google Cloud .
- INSTANCE_ID, DATABASE_ID et TABLE_NAME : insérez les ID de l'instance, de la base de données et du graphique.
Pour exporter les DataFrames
de nœuds et d'arêtes au lieu de GraphFrames, utilisez plutôt load_dfs
:
df_vertices, df_edges, df_id_map = connector.load_dfs()
Effectuer un nettoyage
Pour éviter que des frais ne continuent d'être facturés sur votre compte Google Cloud , vous pouvez arrêter ou supprimer votre cluster Dataproc, et supprimer votre instance Spanner.
Étapes suivantes
- Consultez les exemples
pyspark.sql.DataFrame
. - Pour en savoir plus sur les langues acceptées par les DataFrames Spark, consultez les ressources suivantes :
- Consultez le dépôt Spark Spanner Connector sur GitHub.
- Consultez les conseils pour régler des tâches Spark.