Travailler avec des procédures stockées pour Apache Spark
Ce document est destiné aux ingénieurs de données, aux data scientists et aux analystes de données qui souhaitent créer et appeler des procédures stockées pour Spark dans BigQuery.
BigQuery vous permet de créer et d'exécuter des procédures stockées Spark et écrites en Python, Java ou Scala. Vous pouvez ensuite exécuter ces procédures stockées dans BigQuery à l'aide d'une requête GoogleSQL, de la même manière que l'exécution de procédures stockées SQL.
Avant de commencer
Pour créer une procédure stockée pour Spark, demandez à votre administrateur de créer une connexion Spark et de la partager avec vous. Votre administrateur doit également accorder au compte de service associé à la connexion les autorisations Identity and Access Management (IAM) requises.
Rôles requis
Pour obtenir les autorisations nécessaires pour effectuer les tâches décrites dans ce document, demandez à votre administrateur de vous accorder les rôles IAM suivants:
-
Créer une procédure stockée pour Spark :
-
Éditeur de données BigQuery (
roles/bigquery.dataEditor
) sur l'ensemble de données dans lequel vous créez la procédure stockée -
Administrateur de connexion BigQuery (
roles/bigquery.connectionAdmin
) sur la connexion utilisée par la procédure stockée -
Utilisateur de job BigQuery (
roles/bigquery.jobUser
) sur votre projet
-
Éditeur de données BigQuery (
-
Appeler une procédure stockée pour Spark :
-
Lecteur de métadonnées BigQuery (
roles/bigquery.metadataViewer
) sur l'ensemble de données où la procédure stockée est stockée -
Utilisateur de connexion BigQuery (
roles/bigquery.connectionUser
) sur la connexion -
Utilisateur de job BigQuery (
roles/bigquery.jobUser
) sur votre projet
-
Lecteur de métadonnées BigQuery (
Pour en savoir plus sur l'attribution de rôles, consultez la page Gérer l'accès aux projets, aux dossiers et aux organisations.
Ces rôles prédéfinis contiennent les autorisations requises pour effectuer les tâches décrites dans ce document. Pour connaître les autorisations exactes requises, développez la section Autorisations requises :
Autorisations requises
Vous devez disposer des autorisations suivantes pour effectuer les tâches décrites dans ce document :
-
Créez une connexion :
-
bigquery.connections.create
-
bigquery.connections.list
-
-
Créer une procédure stockée pour Spark :
-
bigquery.routines.create
-
bigquery.connections.delegate
-
bigquery.jobs.create
-
-
Appeler une procédure stockée pour Spark :
-
bigquery.routines.get
-
bigquery.connections.use
-
bigquery.jobs.create
-
Vous pouvez également obtenir ces autorisations avec des rôles personnalisés ou d'autres rôles prédéfinis.
Considérations au sujet des emplacements
Vous devez créer une procédure stockée pour Spark au même emplacement que votre connexion, car la procédure s'exécute au même emplacement que la connexion. Par exemple, pour créer une procédure stockée dans l'emplacement multirégional États-Unis, vous utilisez une connexion située dans l'emplacement multirégional États-Unis.
Tarification
Les frais d'exécution de procédures Spark sur BigQuery sont semblables à ceux de l'exécution de procédures Spark sur Dataproc sans serveur. Pour en savoir plus, consultez la page Tarifs de Dataproc sans serveur.
Les procédures stockées Spark peuvent être utilisées avec le modèle de tarification à la demande ainsi qu'avec n'importe quelle édition BigQuery. Les procédures Spark sont facturées selon le modèle de paiement à l'usage de l'édition BigQuery Enterprise dans tous les cas, quel que soit le modèle de tarification utilisé dans votre projet.
Les procédures stockées Spark pour BigQuery ne sont pas compatibles avec l'utilisation de réservations ni d'engagements. Les réservations et engagements existants continuent d'être utilisés pour d'autres requêtes et procédures compatibles. Les frais d'utilisation des procédures stockées Spark sont ajoutés à votre facture à tarif Enterprise Edition avec paiement à l'usage. Les remises de votre organisation sont appliquées, le cas échéant.
Bien que les procédures stockées Spark utilisent un moteur d'exécution Spark, vous ne verrez pas de frais distincts pour l'exécution Spark. Comme indiqué, les frais correspondants sont indiqués en tant que SKU de l'édition Enterprise de BigQuery avec paiement à l'usage.
Les procédures stockées Spark n'offrent pas de quota gratuit.
Créer une procédure stockée pour Spark
Vous devez créer la procédure stockée au même emplacement que la connexion que vous utilisez.
Si le corps de votre procédure stockée est supérieur à 1 Mo, nous vous recommandons de placer votre procédure stockée dans un fichier dans un bucket Cloud Storage plutôt que d'utiliser du code intégré. BigQuery propose deux méthodes pour créer une procédure stockée pour Spark en utilisant Python :
- Pour utiliser l'instruction
CREATE PROCEDURE
, utilisez l'éditeur de requête SQL. - Si vous souhaitez saisir directement du code Python, utilisez l'éditeur PySpark. Vous pouvez enregistrer le code en tant que procédure stockée.
Utiliser l'éditeur de requête SQL
Pour créer une procédure stockée pour Spark dans l'éditeur de requête SQL, procédez comme suit :
Accédez à la page BigQuery.
Dans l'éditeur de requête, ajoutez l'exemple de code pour l'instruction
CREATE PROCEDURE
qui s'affiche.Sinon, dans le volet Explorateur, cliquez sur la connexion du projet que vous avez utilisé pour créer la ressource de connexion. Ensuite, pour créer une procédure stockée pour Spark, cliquez sur
Créer une procédure stockée.Python
Pour créer des procédures stockées pour Spark en Python, utilisez l'exemple de code suivant :
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE]
Java ou Scala
Pour créer une procédure stockée pour Spark en Java ou Scala avec l'option
main_file_uri
, utilisez l'exemple de code suivant :CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_JAR_URI"]); LANGUAGE JAVA|SCALA
Pour créer une procédure stockée pour Spark en Java ou Scala avec les options
main_class
etjar_uris
, utilisez l'exemple de code suivant :CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_class=["CLASS_NAME"], jar_uris=["URI"]); LANGUAGE JAVA|SCALA
Remplacez les éléments suivants :
PROJECT_ID
: projet dans lequel vous souhaitez créer la procédure stockée, par exemplemyproject
.DATASET
: ensemble de données dans lequel vous souhaitez créer la procédure stockée, par exemplemydataset
.PROCEDURE_NAME
: nom de la procédure stockée que vous souhaitez exécuter dans BigQuery, par exemple,mysparkprocedure
.PROCEDURE_ARGUMENT
: paramètre permettant de saisir les arguments d'entrée.Dans ce paramètre, spécifiez les champs suivants :
ARGUMENT_MODE
: mode de l'argument.Les valeurs autorisées incluent
IN
,OUT
etINOUT
. La valeur par défaut estIN
.ARGUMENT_NAME
: nom de l'argument.ARGUMENT_TYPE
: type de l'argument.
Exemple :
myproject.mydataset.mysparkproc(num INT64)
.Pour plus d'informations, référez-vous aux sections du présent document qui expliquent comment transmettre une valeur en tant que paramètre
IN
ou en tant que paramètresOUT
etINOUT
.CONNECTION_PROJECT_ID
: projet contenant la connexion permettant d'exécuter la procédure Spark.CONNECTION_REGION
: région contenant la connexion permettant d'exécuter la procédure Spark (par exemple,us
).CONNECTION_ID
: ID de connexion, par exemplemyconnection
.Lorsque vous affichez les détails de la connexion dans la console Google Cloud, il s'agit de la valeur de la dernière section de l'ID de connexion complet affiché dans ID de connexion (par exemple,
projects/myproject/locations/connection_location/connections/myconnection
).RUNTIME_VERSION
: version d'exécution de Spark (par exemple,1.1
)MAIN_PYTHON_FILE_URI
: chemin d'accès à un deuxième fichier PySpark, par exemplegs://mybucket/mypysparkmain.py
.Si vous souhaitez ajouter le corps de la procédure stockée dans l'instruction
CREATE PROCEDURE
, ajoutez lePYSPARK_CODE
aprèsLANGUAGE PYTHON AS
comme indiqué dans l'exemple dans la section Utiliser du code intégré dans ce document.PYSPARK_CODE
: définition d'une application PySpark dans l'instructionCREATE PROCEDURE
si vous souhaitez transmettre le corps de la procédure de manière intégrée.La valeur est un littéral de chaîne. Si le code inclut des guillemets et des barres obliques inverses, ceux-ci doivent être échappés ou représentés sous forme de chaîne brute. Par exemple, le code
"\n";
peut être représenté comme suit :- Chaîne entre guillemets :
"return \"\\n\";"
. Les guillemets et les barres obliques inverses sont échappés. - Chaîne entre guillemets triples :
"""return "\\n";"""
. Les barres obliques inverses sont échappées, mais pas les guillemets. - Chaîne brute :
r"""return "\n";"""
. Aucun échappement n'est nécessaire.
- Chaîne entre guillemets :
MAIN_JAR_URI
: chemin d'accès au fichier JAR contenant la classemain
, par exemplegs://mybucket/my_main.jar
.CLASS_NAME
: nom complet d'une classe dans un ensemble JAR avec l'optionjar_uris
(par exemple,com.example.wordcount
).URI
: chemin d'accès au fichier JAR contenant la classe spécifiée dans la classemain
, par exemplegs://mybucket/mypysparkmain.jar
.
Pour connaître les options supplémentaires que vous pouvez spécifier dans
OPTIONS
, consultez la liste des options de procédure.
Utiliser l'éditeur PySpark
Lorsque vous créez une procédure à l'aide de l'éditeur PySpark, vous n'avez pas besoin d'utiliser l'instruction CREATE PROCEDURE
. Ajoutez plutôt votre code Python directement dans l'éditeur Pyspark, puis enregistrez-le ou exécutez-le.
Pour créer une procédure stockée pour Spark dans l'éditeur PySpark, procédez comme suit :
Accédez à la page BigQuery.
Si vous souhaitez saisir directement le code PySpark, ouvrez l'éditeur PySpark. Pour ouvrir l'éditeur PySpark, cliquez sur le menu
à côté de Créer une requête SQL, puis sélectionnez Créer une procédure PySpark.Pour définir les options, cliquez sur Plus > Options PySpark, puis procédez comme suit :
Spécifiez l'emplacement où vous souhaitez exécuter le code PySpark.
Dans le champ Connexions, spécifiez la connexion Spark.
Dans la section Appel de procédure stockée, spécifiez l'ensemble de données dans lequel vous souhaitez stocker les procédures stockées temporaires qui seront générées. Vous pouvez définir un ensemble de données spécifique ou autoriser l'utilisation d'un ensemble de données temporaire pour appeler le code PySpark.
L'ensemble de données temporaire est généré à l'emplacement spécifié à l'étape précédente. Si un nom d'ensemble de données est spécifié, l'ensemble de données et la connexion Spark doivent se trouver au même emplacement.
Dans la section Parameters (Paramètres), définissez les paramètres de la procédure stockée. La valeur du paramètre n'est utilisée que lors des exécutions en session du code PySpark, mais la déclaration elle-même est stockée dans la procédure.
Dans la section Options avancées, spécifiez les options de procédure. Pour obtenir une liste détaillée des options de procédure, consultez la liste des options de procédure.
Dans la section Propriétés, ajoutez les paires clé/valeur pour configurer le job. Vous pouvez utiliser n'importe quelle paire clé/valeur issue des propriétés Spark de Dataproc sans serveur.
Dans Paramètres du compte de service, spécifiez le compte de service personnalisé, la clé CMEK, l'ensemble de données de préproduction et le dossier Cloud Storage de préproduction à utiliser lors des exécutions en session du code PySpark.
Cliquez sur Enregistrer.
Enregistrer une procédure stockée pour Spark
Après avoir créé la procédure stockée à l'aide de l'éditeur PySpark, vous pouvez l'enregistrer. Pour cela, procédez comme suit :
Dans la console Google Cloud, accédez à la page BigQuery.
Dans l'éditeur de requête, créez une procédure stockée pour Spark en utilisant Python avec l'éditeur PySpark.
Cliquez sur Enregistrer > Enregistrer la procédure.
Dans la boîte de dialogue Enregistrer la procédure stockée, spécifiez le nom de l'ensemble de données dans lequel vous souhaitez stocker la procédure et le nom à donner à la procédure stockée.
Cliquez sur Enregistrer.
Si vous souhaitez uniquement exécuter le code PySpark au lieu de l'enregistrer en tant que procédure stockée, vous pouvez cliquer sur Exécuter au lieu de Enregistrer.
Utiliser des conteneurs personnalisés
Le conteneur personnalisé fournit l'environnement d'exécution pour les processus de pilote et d'exécuteur de la charge de travail. Pour utiliser des conteneurs personnalisés, utilisez l'exemple de code suivant :
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", container_image="CONTAINER_IMAGE", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE]
Remplacez les éléments suivants :
PROJECT_ID
: projet dans lequel vous souhaitez créer la procédure stockée, par exemplemyproject
.DATASET
: ensemble de données dans lequel vous souhaitez créer la procédure stockée, par exemplemydataset
.PROCEDURE_NAME
: nom de la procédure stockée que vous souhaitez exécuter dans BigQuery, par exemple,mysparkprocedure
.PROCEDURE_ARGUMENT
: paramètre permettant de saisir les arguments d'entrée.Dans ce paramètre, spécifiez les champs suivants :
ARGUMENT_MODE
: mode de l'argument.Les valeurs autorisées incluent
IN
,OUT
etINOUT
. La valeur par défaut estIN
.ARGUMENT_NAME
: nom de l'argument.ARGUMENT_TYPE
: type de l'argument.
Exemple :
myproject.mydataset.mysparkproc(num INT64)
.Pour plus d'informations, référez-vous aux sections du présent document qui expliquent comment transmettre une valeur en tant que paramètre
IN
ou en tant que paramètresOUT
etINOUT
.CONNECTION_PROJECT_ID
: projet contenant la connexion permettant d'exécuter la procédure Spark.CONNECTION_REGION
: région contenant la connexion permettant d'exécuter la procédure Spark (par exemple,us
).CONNECTION_ID
: ID de connexion, par exemplemyconnection
.Lorsque vous affichez les détails de la connexion dans la console Google Cloud, il s'agit de la valeur de la dernière section de l'ID de connexion complet affiché dans ID de connexion (par exemple,
projects/myproject/locations/connection_location/connections/myconnection
).RUNTIME_VERSION
: version d'exécution de Spark (par exemple,1.1
)MAIN_PYTHON_FILE_URI
: chemin d'accès à un deuxième fichier PySpark, par exemplegs://mybucket/mypysparkmain.py
.Si vous souhaitez ajouter le corps de la procédure stockée dans l'instruction
CREATE PROCEDURE
, ajoutez lePYSPARK_CODE
aprèsLANGUAGE PYTHON AS
comme indiqué dans l'exemple dans la section Utiliser du code intégré dans ce document.PYSPARK_CODE
: définition d'une application PySpark dans l'instructionCREATE PROCEDURE
si vous souhaitez transmettre le corps de la procédure de manière intégrée.La valeur est un littéral de chaîne. Si le code inclut des guillemets et des barres obliques inverses, ceux-ci doivent être échappés ou représentés sous forme de chaîne brute. Par exemple, le code
"\n";
peut être représenté comme suit :- Chaîne entre guillemets :
"return \"\\n\";"
. Les guillemets et les barres obliques inverses sont échappés. - Chaîne entre guillemets triples :
"""return "\\n";"""
. Les barres obliques inverses sont échappées, mais pas les guillemets. - Chaîne brute :
r"""return "\n";"""
. Aucun échappement n'est nécessaire.
- Chaîne entre guillemets :
CONTAINER_IMAGE
: chemin d'accès à l'image dans Artifact Registry. Il ne doit contenir que les bibliothèques à utiliser dans votre procédure. Si elle n'est pas spécifiée, l'image de conteneur par défaut du système associée à la version d'exécution est utilisée.
Pour en savoir plus sur la création d'une image de conteneur personnalisée avec Spark, consultez la section Créer une image de conteneur personnalisée.
Appeler une procédure stockée pour Spark
Après avoir créé une procédure stockée, vous pouvez l'appeler à l'aide de l'une des options suivantes :
Console
Accédez à la page BigQuery.
Dans le volet Explorateur, développez votre projet et sélectionnez la procédure stockée pour Spark que vous souhaitez exécuter.
Dans la fenêtre Informations de procédure stockées, cliquez sur Appeler la procédure stockée. Vous pouvez également développer l'option Afficher les actions, puis cliquer sur Appeler.
Cliquez sur Exécuter.
Dans la section Tous les résultats, cliquez sur Afficher les résultats.
Facultatif : dans la section Résultats de la requête, procédez comme suit :
Si vous souhaitez afficher les journaux du pilote Spark, cliquez sur Détails d'exécution.
Si vous souhaitez afficher les journaux dans Cloud Logging, cliquez sur Informations sur la tâche, puis, dans le champ Journal, cliquez sur journal
Si vous souhaitez obtenir le point de terminaison du serveur d'historique Spark, cliquez sur Informations sur le job, puis sur Serveur d'historique Spark.
SQL
Pour appeler une procédure stockée, utilisez l'instruction CALL PROCEDURE
:
Dans la console Google Cloud, accédez à la page BigQuery.
Dans l'éditeur de requête, saisissez l'instruction suivante :
CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()
Cliquez sur
Exécuter.
Pour en savoir plus sur l'exécution des requêtes, consultez Exécuter une requête interactive.
Utiliser un compte de service personnalisé
Au lieu d'utiliser l'identité du service de la connexion Spark pour l'accès aux données, vous pouvez utiliser un compte de service personnalisé pour accéder aux données de votre code Spark.
Pour utiliser un compte de service personnalisé, spécifiez le mode de sécurité INVOKER
(à l'aide de l'instruction EXTERNAL SECURITY INVOKER
) lorsque vous créez une procédure stockée Spark et spécifiez le compte de service lorsque vous appelez la procédure stockée.
Si vous souhaitez accéder au code Spark et l'utiliser à partir de Cloud Storage, vous devez accorder les autorisations nécessaires à l'identité de service de la connexion Spark. Vous devez accorder au compte de service de la connexion l'autorisation IAM storage.objects.get
ou le rôle IAM storage.objectViewer
.
Vous pouvez également accorder au compte de service de la connexion l'accès à Dataproc Metastore et au serveur d'historique persistant Dataproc si vous les avez spécifiés dans la connexion. Pour en savoir plus, consultez la section Accorder l'accès au compte de service.
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) EXTERNAL SECURITY INVOKER WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE] SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT'; CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();
Vous pouvez également ajouter les arguments suivants au code précédent :
SET @@spark_proc_properties.staging_bucket='BUCKET_NAME'; SET @@spark_proc_properties.staging_dataset_id='DATASET';
Remplacez les éléments suivants :
CUSTOM_SERVICE_ACCOUNT
: valeur obligatoire. Un compte de service personnalisé que vous avez fourni.BUCKET_NAME
: facultatif. Bucket Cloud Storage utilisé comme système de fichiers par défaut de l'application Spark. Si cet argument n'est pas fourni, un bucket Cloud Storage par défaut est créé dans votre projet et il est partagé par tous les jobs exécutés dans le même projet.DATASET
: facultatif. Ensemble de données dans lequel stocker les données temporaires produites en appelant la procédure. Les données sont nettoyées une fois le job terminé. S'il n'est pas fourni, un ensemble de données temporaire par défaut est créé pour le job.
Votre compte de service personnalisé doit disposer des autorisations suivantes :
Pour lire et écrire dans le bucket de préproduction utilisé comme système de fichiers par défaut de l'application Spark, procédez comme suit :
- Autorisations
storage.objects.*
ou rôle IAMroles/storage.objectAdmin
sur le bucket de préproduction que vous spécifiez. - En outre, les autorisations
storage.buckets.*
ou le rôle IAMroles/storage.Admin
sur le projet si le bucket de préproduction n'est pas spécifié.
- Autorisations
(Facultatif) Pour lire et écrire des données depuis et vers BigQuery, procédez comme suit :
bigquery.tables.*
sur vos tables BigQuery.bigquery.readsessions.*
sur votre projet- Le rôle IAM
roles/bigquery.admin
inclut les autorisations précédentes.
(Facultatif) Pour lire et écrire des données depuis et vers Cloud Storage :
- Autorisations
storage.objects.*
ou rôle IAMroles/storage.objectAdmin
sur vos objets Cloud Storage.
- Autorisations
(Facultatif) Pour lire et écrire dans l'ensemble de données de préproduction utilisé pour les paramètres
INOUT/OUT
, procédez comme suit :- Rôle IAM
bigquery.tables.*
ouroles/bigquery.dataEditor
sur l'ensemble de données de préproduction que vous spécifiez. - En outre, l'autorisation
bigquery.datasets.create
ou le rôle IAMroles/bigquery.dataEditor
sur le projet si l'ensemble de données de préproduction n'est pas spécifié.
- Rôle IAM
Exemples de procédures stockées pour Spark
Cette section présente des exemples de création d'une procédure stockée pour Apache Spark.
Utiliser un fichier PySpark ou JAR dans Cloud Storage
L'exemple suivant montre comment créer une procédure stockée pour Spark en utilisant la connexion my-project-id.us.my-connection
et un fichier PySpark ou JAR stocké dans un bucket Cloud Storage :
Python
CREATE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-pyspark-main.py") LANGUAGE PYTHON
Java ou Scala
Utilisez main_file_uri
pour créer une procédure stockée :
CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_wtih_main_jar() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-scala-main.jar") LANGUAGE SCALA
Utilisez main_class
pour créer une procédure stockée :
CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1", main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"]) LANGUAGE SCALA
Utiliser du code intégré
L'exemple suivant montre comment créer une procédure stockée pour Spark en utilisant une connexion my-project-id.us.my-connection
et du code PySpark intégré :
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() # Load data from BigQuery. words = spark.read.format("bigquery") \ .option("table", "bigquery-public-data:samples.shakespeare") \ .load() words.createOrReplaceTempView("words") # Perform word count. word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count") word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("wordcount_dataset.wordcount_output") """
Transmettre une valeur en tant que paramètre d'entrée
Les exemples suivants illustrent les deux méthodes permettant de transmettre une valeur en tant que paramètre d'entrée dans Python :
Méthode 1 : Utiliser des variables d'environnement
Dans le code PySpark, vous pouvez obtenir les paramètres d'entrée de la procédure stockée pour Spark en utilisant des variables d'environnement dans le pilote et les exécuteurs Spark. Le nom de la variable d'environnement est au format BIGQUERY_PROC_PARAM.PARAMETER_NAME
, où PARAMETER_NAME
est le nom du paramètre d'entrée. Par exemple, si le nom du paramètre d'entrée est var
, le nom de la variable d'environnement correspondante est BIGQUERY_PROC_PARAM.var
. Les paramètres d'entrée sont encodés au format JSON.
Dans votre code PySpark, vous pouvez obtenir la valeur du paramètre d'entrée dans une chaîne JSON à partir de la variable d'environnement, puis la décoder pour obtenir une variable Python.
L'exemple suivant montre comment récupérer la valeur d'un paramètre d'entrée de type INT64
dans votre code PySpark :
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64) WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession import os import json spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() sc = spark.sparkContext # Get the input parameter num in JSON string and convert to a Python variable num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"])) """
Méthode 2 : Utiliser une bibliothèque intégrée
Dans le code PySpark, vous pouvez simplement importer une bibliothèque intégrée et l'utiliser pour renseigner tous les types de paramètres. Pour transmettre les paramètres aux exécuteurs, renseignez-les dans un pilote Spark en tant que variables Python, puis transmettez les valeurs aux exécuteurs. La bibliothèque intégrée est compatible avec la plupart des types de données BigQuery, à l'exception de INTERVAL
, GEOGRAPHY
, NUMERIC
et BIGNUMERIC
.
Type de données BigQuery | Type de données Python |
---|---|
BOOL
|
bool
|
STRING
|
str
|
FLOAT64
|
float
|
INT64
|
int
|
BYTES
|
bytes
|
DATE
|
datetime.date
|
TIMESTAMP
|
datetime.datetime
|
TIME
|
datetime.time
|
DATETIME
|
datetime.datetime
|
Array
|
Array
|
Struct
|
Struct
|
JSON
|
Object
|
NUMERIC
|
Non compatible |
BIGNUMERIC
|
Non compatible |
INTERVAL
|
Non compatible |
GEOGRAPHY
|
Non compatible |
L'exemple suivant montre comment importer la bibliothèque intégrée et l'utiliser pour renseigner un paramètre d'entrée de type INT64 et un paramètre d'entrée de type ARRAY<STRUCT<a INT64, b STRING>> dans votre code PySpark :
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>) WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession from bigquery.spark.procedure import SparkProcParamContext def check_in_param(x, num): return x['a'] + num def main(): spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() sc=spark.sparkContext spark_proc_param_context = SparkProcParamContext.getOrCreate(spark) # Get the input parameter num of type INT64 num = spark_proc_param_context.num # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>> info = spark_proc_param_context.info # Pass the parameter to executors df = sc.parallelize(info) value = df.map(lambda x : check_in_param(x, num)).sum() main() """
Dans le code Java ou Scala, vous pouvez récupérer les paramètres d'entrée de la procédure stockée pour Spark via des variables d'environnement dans le pilote et les exécuteurs Spark. Le nom de la variable d'environnement est au format BIGQUERY_PROC_PARAM.PARAMETER_NAME
, où PARAMETER_NAME
est le nom du paramètre d'entrée. Par exemple, si le nom du paramètre d'entrée est var, le nom de la variable d'environnement correspondante est BIGQUERY_PROC_PARAM.var
.
Dans votre code Java ou Scala, vous pouvez récupérer la valeur du paramètre d'entrée à partir de la variable d'environnement.
L'exemple suivant montre comment récupérer la valeur d'un paramètre d'entrée à partir de variables d'environnement dans votre code Scala :
val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get
L'exemple suivant montre comment récupérer des paramètres d'entrée à partir de variables d'environnement dans votre code Java :
String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");
Transmettre des valeurs en tant que paramètres OUT
et INOUT
Le paramètre de sortie renvoie la valeur de la procédure Spark, tandis que le paramètre INOUT
accepte une valeur pour la procédure puis renvoie une valeur provenant de la procédure.
Pour utiliser les paramètres OUT
et INOUT
, ajoutez le mot clé OUT
ou INOUT
avant le nom du paramètre lors de la création de la procédure Spark. Dans le code PySpark, vous utilisez la bibliothèque intégrée pour renvoyer une valeur en tant que paramètre OUT
ou INOUT
. Comme pour les paramètres d'entrée, la bibliothèque intégrée accepte la plupart des types de données BigQuery, à l'exception de INTERVAL
, GEOGRAPHY
, NUMERIC
et BIGNUMERIC
. Les valeurs de type TIME
et DATETIME
sont converties au fuseau horaire UTC lorsqu'elles sont retournées par l'intermédiaire des paramètres OUT
ou INOUT
.
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME,OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON) WITH CONNECTION `my_bq_project.my_dataset.my_connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql.session import SparkSession import datetime from bigquery.spark.procedure import SparkProcParamContext spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate() spark_proc_param_context = SparkProcParamContext.getOrCreate(spark) # Reading the IN and INOUT parameter values. int = spark_proc_param_context.int dt = spark_proc_param_context.datetime print("IN parameter value: ", int, ", INOUT parameter value: ", dt) # Returning the value of the OUT and INOUT parameters. spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc) spark_proc_param_context.b = True spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}] spark_proc_param_context.time = datetime.time(23, 20, 50, 520000) spark_proc_param_context.f = 20.23 spark_proc_param_context.bs = b"hello" spark_proc_param_context.date = datetime.date(1985, 4, 12) spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc) spark_proc_param_context.js = {"name": "Alice", "age": 30} """;
Lire à partir d'une table de métastore Hive et écrire les résultats dans BigQuery
L'exemple suivant montre comment transformer une table de métastore Hive et écrire les résultats dans BigQuery :
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \ .enableHiveSupport() \ .getOrCreate() spark.sql("CREATE DATABASE IF NOT EXISTS records") spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)") spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)") df = spark.sql("SELECT * FROM records.student") df.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("records_dataset.student") """
Afficher les filtres de journaux
Après avoir appelé une procédure stockée pour Spark, vous pouvez afficher les informations des journaux. Pour obtenir les informations de filtrage Cloud Logging et le point de terminaison du cluster d'historique Spark, utilisez la commande bq
show
.
Les informations de filtrage sont disponibles dans le champ SparkStatistics
du job enfant. Pour obtenir les filtres de journaux, procédez comme suit :
Accédez à la page BigQuery.
Dans l'éditeur de requêtes, répertoriez les tâches enfants de la tâche de script de la procédure stockée :
bq ls -j --parent_job_id=$parent_job_id
Pour savoir comment obtenir l'ID du job, consultez la page Afficher les détails d'un job.
Le résultat ressemble à ce qui suit :
jobId Job Type State Start Time Duration ---------------------------------------------- --------- --------- --------------- ---------------- script_job_90fb26c32329679c139befcc638a7e71_0 query SUCCESS 07 Sep 18:00:27 0:05:15.052000
Identifiez le
jobId
de votre procédure stockée et utilisez la commandebq show
pour afficher les détails de la tâche :bq show --format=prettyjson --job $child_job_id
Copiez le champ
sparkStatistics
, car vous en aurez besoin dans une autre étape.Le résultat ressemble à ce qui suit :
{ "configuration": {...} … "statistics": { … "query": { "sparkStatistics": { "loggingInfo": { "projectId": "myproject", "resourceType": "myresource" }, "sparkJobId": "script-job-90f0", "sparkJobLocation": "us-central1" }, … } } }
Pour Logging, générez des filtres de journaux avec les champs
SparkStatistics
:resource.type = sparkStatistics.loggingInfo.resourceType resource.labels.resource_container=sparkStatistics.loggingInfo.projectId resource.labels.spark_job_id=sparkStatistics.sparkJobId resource.labels.location=sparkStatistics.sparkJobLocation
Les journaux sont écrits dans la ressource surveillée
bigquery.googleapis.com/SparkJob
. Les journaux sont libellés par les composantsINFO
,DRIVER
etEXECUTOR
. Pour filtrer les journaux du pilote Spark, ajoutez le composantlabels.component = "DRIVER"
aux filtres de journaux. Pour filtrer les journaux de l'exécuteur Spark, ajoutez le composantlabels.component = "EXECUTOR"
aux filtres de journaux.
Utiliser la clé de chiffrement gérée par le client
La procédure BigQuery Spark utilise la clé de chiffrement gérée par le client (CMEK) pour protéger votre contenu, ainsi que le chiffrement par défaut fourni par BigQuery. Pour utiliser la clé CMEK dans la procédure Spark, commencez par déclencher la créaation du compte de service de chiffrement BigQuery et accordez les autorisations requises. La procédure Spark est également compatible avec les règles d'administration CMEK si elles sont appliquées à votre projet.
Si votre procédure stockée utilise le mode de sécurité INVOKER
, votre clé CMEK doit être spécifiée via la variable système SQL lors de l'appel de la procédure. Sinon, votre CMEK peut être spécifié via la connexion associée à la procédure stockée.
Pour spécifier la clé CMEK via la connexion lorsque vous créez une procédure stockée Spark, utilisez l'exemple de code suivant :
bq mk --connection --connection_type='SPARK' \ --properties='{"kms_key_name"="projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME"}' \ --project_id=PROJECT_ID \ --location=LOCATION \ CONNECTION_NAME
Pour spécifier la clé CMEK via la variable système SQL lors de l'appel de la procédure, utilisez l'exemple de code suivant :
SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT'; SET @@spark_proc_properties.kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME; CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();
Utiliser VPC Service Controls
VPC Service Controls vous permet de configurer un périmètre sécurisé pour vous protéger contre l'exfiltration de données. Pour utiliser VPC Service Controls avec une procédure Spark afin de renforcer la sécurité, commencez par créer un périmètre de service.
Pour protéger entièrement vos jobs de procédure Spark, ajoutez les API suivantes au périmètre de service :
- API BigQuery (
bigquery.googleapis.com
) - API Cloud Logging (
logging.googleapis.com
) - API Cloud Storage (
storage.googleapis.com
), si vous utilisez Cloud Storage - API Artifact Registry (
artifactregistry.googleapis.com
) ou Container Registry (containerregistry.googleapis.com
), si vous utilisez un conteneur personnalisé - API Dataproc Metastore (
metastore.googleapis.com
) et API Cloud Run Admin (run.googleapis.com
), si vous utilisez Dataproc Metastore
Ajoutez le projet de requête de la procédure Spark au périmètre. Ajoutez d'autres projets qui hébergent votre code ou vos données Spark au périmètre.
Bonnes pratiques
La première utilisation d'une connexion dans votre projet prend environ une minute. Pour gagner du temps, vous pouvez réutiliser une connexion Spark existante lorsque vous créez une procédure stockée pour Spark.
Lorsque vous créez une procédure Spark pour un usage en production, Google vous recommande de spécifier une version d'exécution. Pour obtenir la liste des versions d'exécution compatibles, consultez la page Versions d'exécution Dataproc sans serveur. Nous vous recommandons d'utiliser la version LTS (Long-Time-Support).
Lorsque vous spécifiez un conteneur personnalisé dans une procédure Spark, nous vous recommandons d'utiliser Artifact Registry et le streaming d'images.
Pour améliorer les performances, vous pouvez spécifier des propriétés d'allocation de ressources dans la procédure Spark. Les procédures stockées Spark sont compatibles avec une liste de propriétés d'allocation de ressources identique à celle de Dataproc sans serveur.
Limites
- Vous ne pouvez utiliser que le protocole du point de terminaison gRPC pour vous connecter au métastore Dataproc. Les autres types de métastore Hive ne sont pas encore acceptés.
- Les clés de chiffrement gérées par le client (CMEK) ne sont disponibles que lorsque les clients créent des procédures Spark à région unique. Les clés CMEK des régions mondiales et les clés CMEK multirégionales, par exemple
EU
ouUS
, ne sont pas acceptées. - La transmission de paramètres de sortie n'est possible que pour PySpark.
- Si l'ensemble de données associé à la procédure stockée pour Spark est répliqué dans une région de destination via la réplication interrégionale d'ensembles de données, la procédure stockée ne peut être interrogée que dans la région dans laquelle elle a été créée.
- Spark n'est pas compatible avec l'accès aux points de terminaison HTTP de votre réseau VPC Service Controls privé.
Quotas et limites
Pour en savoir plus sur les quotas et les limites, consultez la page Quotas et limites applicables aux procédures stockées pour Spark.
Étapes suivantes
- Apprenez à afficher une procédure stockée.
- Découvrez comment supprimer une procédure stockée.
- Découvrez comment utiliser une procédure stockée SQL.