Cette page explique comment créer un cluster Dataproc qui utilise le connecteur Spark Spanner pour lire des données à partir de Spanner à l'aide d'Apache Spark.
Calculer les coûts
Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :
- Dataproc
- Spanner
- Cloud Storage
Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût.
Avant de commencer
Avant d'utiliser le connecteur Spanner dans ce tutoriel, configurez un cluster Dataproc, une instance et une base de données Spanner.
Configurer un cluster Dataproc
Créez un cluster Dataproc ou utilisez un cluster Dataproc existant avec les paramètres suivants:
Autorisations du compte de service de VM. Le compte de service de VM du cluster doit être associé aux autorisations Spanner appropriées. Si vous utilisez Data Boost (Data Boost est activé dans l'exemple de code de la section Lire des données à partir de Spanner), le compte de service de la VM doit également disposer des autorisations IAM Data Boost requises.
Niveau d'accès. Le cluster doit être créé avec le champ d'application
cloud-platform
ou le champ d'applicationspanner
approprié activé. Le champ d'applicationcloud-platform
est activé par défaut pour les clusters créés avec la version d'image 2.1 ou une version ultérieure.Les instructions suivantes vous expliquent comment définir l'étendue
cloud-platform
dans le cadre d'une requête de création de cluster qui utilise la console Google Cloud, gcloud CLI ou l'API Dataproc. Pour obtenir des instructions supplémentaires sur la création de clusters, consultez la page Créer un cluster.Console Google Cloud
- Dans la console Google Cloud, ouvrez la page Dataproc Créer un cluster.
- Dans le panneau Gérer la sécurité, dans la section Accès au projet, cliquez sur "Active le champ d'application Cloud Platform pour ce cluster".
- Renseignez ou confirmez les autres champs de création de cluster, puis cliquez sur Créer.
CLI gcloud
Vous pouvez exécuter la commande
gcloud dataproc clusters create
suivante pour créer un cluster avec la portéecloud-platform
activée.gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
Vous pouvez spécifier GceClusterConfig.serviceAccountScopes dans le cadre d'une requête clusters.create.
"serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
Configurer une instance Spanner avec une table de base de données Singers
Créez une instance Spanner avec une base de données contenant une table Singers
. Notez l'ID de l'instance Spanner et l'ID de la base de données.
Utiliser le connecteur Spanner avec Spark
Le connecteur Spanner est disponible pour les versions Spark 3.1+
.
Vous spécifiez la version du connecteur dans la spécification du fichier JAR du connecteur Cloud Storage lorsque vous envoyez une tâche à un cluster Dataproc.
Exemple:Envoi d'une tâche Spark avec la CLI gcloud avec le connecteur Spanner.
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \ ... [other job submission flags]
Remplacez les éléments suivants :
CONNECTOR_VERSION: version du connecteur Spanner.
Choisissez la version du connecteur Spanner dans la liste des versions du dépôt GitHub GoogleCloudDataproc/spark-spanner-connector
.
Lire des données depuis Spanner
Vous pouvez utiliser Python ou Scala pour lire les données de Spanner dans un DataFrame Spark à l'aide de l'API de source de données Spark.
PySpark
Vous pouvez exécuter l'exemple de code PySpark de cette section sur votre cluster en envoyant la tâche au service Dataproc ou en l'exécutant à partir de l'REPL spark-submit
sur le nœud maître du cluster.
Tâche Dataproc
- Créez un fichier
singers.py
à l'aide d'un éditeur de texte local ou dans Cloud Shell à l'aide de l'éditeur de textevi
,vim
ounano
préinstallé. - Collez le code suivant dans le fichier
singers.py
. Notez que la fonctionnalité Data Boost de Spanner est activée, ce qui a un impact quasi nul sur l'instance Spanner principale.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Remplacez les éléments suivants :
- PROJECT_ID: ID de votre Google Cloud projet. Les ID de projet sont listés dans la section Project info (Informations sur le projet) du tableau de bord de la console Google Cloud.
- INSTANCE_ID, DATABASE_ID et TABLE_NAME : consultez la section Configurer une instance Spanner avec une table de base de données
Singers
.
- Enregistrez le fichier
singers.py
. - Envoyez la tâche au service Dataproc à l'aide de la console Google Cloud, de gcloud CLI ou de l'API Dataproc.
Exemple:Envoi d'une tâche avec la CLI gcloud avec le connecteur Spanner.
gcloud dataproc jobs submit pyspark singers.py \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION
Remplacez les éléments suivants :
- CLUSTER_NAME : nom du nouveau cluster.
- REGION: région Compute Engine disponible pour exécuter la charge de travail.
- CONNECTOR_VERSION: version du connecteur Spanner.
Choisissez la version du connecteur Spanner dans la liste des versions du dépôt GitHub
GoogleCloudDataproc/spark-spanner-connector
.
Job spark-submit
- Connectez-vous au nœud maître du cluster Dataproc à l'aide de SSH.
- Accédez à la page Clusters (Clusters) de Dataproc dans la console Google Cloud, puis cliquez sur le nom de votre cluster.
- Sur la page Détails du cluster, sélectionnez l'onglet "Instances de VM". Cliquez ensuite sur
SSH
à droite du nom du nœud maître du cluster.Une fenêtre de navigateur s'ouvre dans votre répertoire de base sur le nœud maître.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Créez un fichier
singers.py
sur le nœud maître à l'aide de l'éditeur de textevi
,vim
ounano
préinstallé.- Collez le code suivant dans le fichier
singers.py
. Notez que la fonctionnalité Data Boost de Spanner est activée, ce qui a un impact quasi nul sur l'instance Spanner principale.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Remplacez les éléments suivants :
- PROJECT_ID: ID de votre Google Cloud projet. Les ID de projet sont listés dans la section Project info (Informations sur le projet) du tableau de bord de la console Google Cloud.
- INSTANCE_ID, DATABASE_ID et TABLE_NAME : consultez la section Configurer une instance Spanner avec une table de base de données
Singers
.
- Enregistrez le fichier
singers.py
.
- Collez le code suivant dans le fichier
- Exécutez
singers.py
avecspark-submit
pour créer la table SpannerSingers
.spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
Remplacez les éléments suivants :
- CONNECTOR_VERSION: version du connecteur Spanner.
Choisissez la version du connecteur Spanner dans la liste des versions du dépôt GitHub
GoogleCloudDataproc/spark-spanner-connector
.
Voici le résultat :
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- CONNECTOR_VERSION: version du connecteur Spanner.
Choisissez la version du connecteur Spanner dans la liste des versions du dépôt GitHub
Scala
Pour exécuter l'exemple de code Scala sur votre cluster, procédez comme suit:
- Connectez-vous au nœud maître du cluster Dataproc à l'aide de SSH.
- Accédez à la page Clusters (Clusters) de Dataproc dans la console Google Cloud, puis cliquez sur le nom de votre cluster.
- Sur la page Détails du cluster, sélectionnez l'onglet "Instances de VM". Cliquez ensuite sur
SSH
à droite du nom du nœud maître du cluster.Une fenêtre de navigateur s'ouvre dans votre répertoire de base sur le nœud maître.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Créez un fichier
singers.scala
sur le nœud maître à l'aide de l'éditeur de textevi
,vim
ounano
préinstallé.- Collez le code suivant dans le fichier
singers.scala
. Notez que la fonctionnalité Data Boost de Spanner est activée, ce qui a un impact quasi nul sur l'instance Spanner principale.object singers { def main(): Unit = { /* * Uncomment (use the following code) if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .option("enableDataBoost", true) .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
Remplacez les éléments suivants :
- PROJECT_ID: ID de votre Google Cloud projet. Les ID de projet sont listés dans la section Project info (Informations sur le projet) du tableau de bord de la console Google Cloud.
- INSTANCE_ID, DATABASE_ID et TABLE_NAME : consultez la section Configurer une instance Spanner avec une table de base de données
Singers
.
- Enregistrez le fichier
singers.scala
.
- Collez le code suivant dans le fichier
- Lancez le REPL
spark-shell
.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
Remplacez les éléments suivants :
CONNECTOR_VERSION: version du connecteur Spanner. Choisissez la version du connecteur Spanner dans la liste des versions du dépôt GitHub
GoogleCloudDataproc/spark-spanner-connector
. - Exécutez
singers.scala
avec la commande:load singers.scala
pour créer la table SpannerSingers
. La liste de sortie affiche des exemples de la sortie de Singers.> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
Nettoyage
Pour éviter que des frais ne continuent d'être facturés sur votre compte Google Cloud , vous pouvez arrêter ou supprimer votre cluster Dataproc, puis supprimer votre instance Spanner.