Cette page explique comment utiliser le connecteur Spark Spanner pour lire des données à partir de Spanner à l'aide d'Apache Spark.
Calculer les coûts
Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :
- Dataproc
- Spanner
- Cloud Storage
Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût.
Avant de commencer
Avant d'exécuter le tutoriel, assurez-vous de connaître la version du connecteur et d'obtenir un URI de connecteur.
Spécifier l'URI du fichier JAR du connecteur
Les versions du connecteur Spark Spanner sont listées dans le fichier Dépôt GoogleCloudDataproc/spark-spanner-connector.
Spécifiez le fichier JAR du connecteur en remplaçant la version du connecteur
informations dans la chaîne d'URI suivante:
gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
Le connecteur est disponible pour les versions de Spark 3.1+
Exemple de CLI gcloud :
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \ -- job-args
Préparer la base de données Spanner
Si vous n'avez pas de table Spanner, vous pouvez suivre les instructions
tutoriel pour créer
Table Spanner. Vous disposerez ensuite d'un ID d'instance, d'un ID de base de données et d'une table Singers
.
Créer un cluster Dataproc
Tout cluster Dataproc utilisant le connecteur doit disposer des niveaux d'accès spanner
ou cloud-platform
. Les clusters Dataproc ont le niveau d'accès par défaut cloud-platform
pour l'image 2.1 ou version ultérieure. Si vous utilisez une version plus ancienne, vous pouvez créer un cluster Dataproc à l'aide de la console Google Cloud, de Google Cloud CLI et de l'API Dataproc.
Console
- Dans la console Google Cloud, ouvrez Dataproc Page Créer un cluster
- Sur la page "Gérer la sécurité", cliquez sur "Active le champ d'application cloud-platform pour ce cluster" sous "Accès au projet", .
- Renseignez ou confirmez les autres champs de création de cluster, puis cliquez sur "Créer".
Google Cloud CLI
gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
Vous pouvez spécifier GceClusterConfig.serviceAccountScopes dans le cadre d'une requête clusters.create. Exemple :"serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
Vous devez vous assurer que l'autorisation Spanner correspondante est attribuée au compte de service de la VM Dataproc. Si vous utilisez Data Boost dans le tutoriel, reportez-vous à l'autorisation IAM Data Boost.
Lire les données de Spanner
Vous pouvez utiliser Scala et Python pour lire les données de Spanner dans un DataFrame Spark à l'aide de l'API de source de données Spark.
Scala
- Examinez le code et remplacez les espaces réservés [projectId], [instanceId], [databaseId] et [table] par l'ID du projet, l'ID de l'instance, l'ID de la base de données et la table que vous avez créés précédemment. L'option enableDataBoost active la fonctionnalité Data Boost de Spanner, qui a un impact quasi nul sur l'instance Spanner principale.
object singers { def main(): Unit = { /* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "[projectId]") .option("instanceId", "[instanceId]") .option("databaseId", "[databaseId]") .option("enableDataBoost", true) .option("table", "[table]") .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
- Exécuter le code sur votre cluster
- Utiliser SSH pour se connecter au nœud maître du cluster Dataproc
- Accédez au Clusters Dataproc de la console Google Cloud, puis cliquez sur le nom de votre cluster
- Sur la page >Détails du cluster, sélectionnez l'onglet "Instances de VM". Cliquez ensuite sur
SSH
à droite du nom du nœud maître du cluster
Une fenêtre de navigateur s'ouvre dans votre répertoire d'accueil sur le nœud maîtreConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Créez le fichier
singers.scala
avec l'éditeur de textevi
,vim
ounano
préinstallé, puis collez le code Scala à partir de la liste de codes Scala.nano singers.scala
- Lancez le REPL
spark-shell
.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
- Exécutez "singers.scala" avec la commande
:load singers.scala
pour créer la table SpannerSingers
. Le résultat La fiche affiche des exemples de la sortie "Singers" (Chanteurs).> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
PySpark
- Examinez le code et remplacez les espaces réservés [projectId], [instanceId], [databaseId] et [table] par l'ID du projet, l'ID de l'instance, l'ID de la base de données et la table que vous avez créés précédemment. L'option enableDataBoost active la fonctionnalité Data Boost de Spanner, qui
sur l'instance Spanner principale.
#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "[projectId]") \ .option("instanceId", "[instanceId]") \ .option("databaseId", "[databaseId]") \ .option("enableDataBoost", "true") \ .option("table", "[table]") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
- Exécuter le code sur votre cluster
- Se connecter au nœud maître du cluster Dataproc via SSH
- Accédez au Clusters Dataproc de la console Google Cloud, puis cliquez sur le nom de votre cluster
- Sur la page Détails du cluster, sélectionnez l'onglet "Instances de VM". Cliquez ensuite sur
SSH
à droite du nom du nœud maître du cluster.
Une fenêtre de navigateur s'ouvre dans votre répertoire d'accueil sur le nœud principal.Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Créez le fichier
singers.py
avec l'éditeur de textevi
,vim
ounano
préinstallé, puis collez le code PySpark à partir de la liste de code PySpark.nano singers.py
- Exécutez singers.py avec
spark-submit
pour créer la table SpannerSingers
. Le résultat est le suivant :spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- Se connecter au nœud maître du cluster Dataproc via SSH
Nettoyage
Pour nettoyer et éviter que les ressources créées dans ce tutoriel ne soient facturées en permanence sur votre compte Google Cloud, procédez comme suit :
gcloud dataproc clusters stop CLUSTER_NAME gcloud dataproc clusters delete CLUSTER_NAME