Travailler avec des procédures stockées pour Apache Spark

Ce document est destiné aux ingénieurs de données, aux data scientists et aux analystes de données qui souhaitent créer et appeler des procédures stockées pour Spark dans BigQuery.

BigQuery vous permet de créer et d'exécuter des procédures stockées Spark et écrites en Python, Java ou Scala. Vous pouvez ensuite exécuter ces procédures stockées dans BigQuery à l'aide d'une requête GoogleSQL, de la même manière que l'exécution de procédures stockées SQL.

Avant de commencer

Pour créer une procédure stockée pour Spark, demandez à votre administrateur de créer une connexion Spark et de la partager avec vous. Votre administrateur doit également accorder au compte de service associé à la connexion les autorisations Identity and Access Management (IAM) requises.

Rôles requis

Pour obtenir les autorisations nécessaires pour effectuer les tâches décrites dans ce document, demandez à votre administrateur de vous accorder les rôles IAM suivants :

Pour en savoir plus sur l'attribution de rôles, consultez la section Gérer les accès.

Ces rôles prédéfinis contiennent les autorisations requises pour effectuer les tâches décrites dans ce document. Pour afficher les autorisations exactes requises, développez la section Autorisations requises :

Autorisations requises

Vous devez disposer des autorisations suivantes pour effectuer les tâches décrites dans ce document :

  • Créer une connexion :
    • bigquery.connections.create
    • bigquery.connections.list
  • Créer une procédure stockée pour Spark :
    • bigquery.routines.create
    • bigquery.connections.delegate
    • bigquery.jobs.create
  • Appeler une procédure stockée pour Spark :
    • bigquery.routines.get
    • bigquery.connections.use
    • bigquery.jobs.create

Vous pouvez également obtenir ces autorisations avec des rôles personnalisés ou d'autres rôles prédéfinis.

Considérations au sujet des emplacements

Vous devez créer une procédure stockée pour Spark au même emplacement que votre connexion, car la procédure s'exécute au même emplacement que la connexion. Par exemple, pour créer une procédure stockée dans l'emplacement multirégional États-Unis, vous utilisez une connexion située dans l'emplacement multirégional États-Unis.

Tarification

  • Les frais d'exécution de procédures Spark sur BigQuery sont semblables à ceux de l'exécution de procédures Spark sur Dataproc sans serveur. Pour en savoir plus, consultez la page Tarifs de Dataproc sans serveur.

  • Les procédures stockées Spark peuvent être utilisées avec le modèle de tarification à la demande ainsi qu'avec n'importe quelle édition BigQuery. Les procédures Spark sont facturées selon le modèle de paiement à l'usage de l'édition BigQuery Enterprise dans tous les cas, quel que soit le modèle de tarification utilisé dans votre projet.

  • Les procédures stockées Spark pour BigQuery ne sont pas compatibles avec les réservations ou les engagements. Les réservations et les engagements existants continuent d'être utilisés pour d'autres requêtes et procédures compatibles. Les frais d'utilisation des procédures stockées Spark sont ajoutés à votre facture dans l'édition Enterprise, avec paiement à l'usage. Les remises de votre organisation sont appliquées, le cas échéant.

  • Bien que les procédures stockées Spark utilisent un moteur d'exécution Spark, vous ne verrez pas de frais distincts pour l'exécution de Spark. Comme indiqué, les frais correspondants sont indiqués en tant que SKU de l'édition Enterprise de BigQuery avec paiement à l'usage.

  • Les procédures stockées Spark n'offrent pas de quota gratuit.

Créer une procédure stockée pour Spark

Vous devez créer la procédure stockée au même emplacement que la connexion que vous utilisez.

Si le corps de votre procédure stockée est supérieur à 1 Mo, nous vous recommandons de placer votre procédure stockée dans un fichier dans un bucket Cloud Storage plutôt que d'utiliser du code intégré. BigQuery propose deux méthodes pour créer une procédure stockée pour Spark en utilisant Python :

Utiliser l'éditeur de requête SQL

Pour créer une procédure stockée pour Spark dans l'éditeur de requête SQL, procédez comme suit :

  1. Accédez à la page BigQuery.

    Accéder à BigQuery

  2. Dans l'éditeur de requête, ajoutez l'exemple de code pour l'instruction CREATE PROCEDURE qui s'affiche.

    Sinon, dans le volet Explorateur, cliquez sur la connexion du projet que vous avez utilisé pour créer la ressource de connexion. Ensuite, pour créer une procédure stockée pour Spark, cliquez sur Créer une procédure stockée.

    Python

    Pour créer des procédures stockées pour Spark en Python, utilisez l'exemple de code suivant :

    CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_PYTHON_FILE_URI"]);
     LANGUAGE PYTHON [AS PYSPARK_CODE]
    

    Java ou Scala

    Pour créer une procédure stockée pour Spark en Java ou Scala avec l'option main_file_uri, utilisez l'exemple de code suivant :

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_JAR_URI"]);
     LANGUAGE JAVA|SCALA
    

    Pour créer une procédure stockée pour Spark en Java ou Scala avec les options main_class et jar_uris, utilisez l'exemple de code suivant :

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_class=["CLASS_NAME"],
         jar_uris=["URI"]);
     LANGUAGE JAVA|SCALA
    

    Remplacez les éléments suivants :

    • PROJECT_ID : projet dans lequel vous souhaitez créer la procédure stockée, par exemple myproject.
    • DATASET : ensemble de données dans lequel vous souhaitez créer la procédure stockée, par exemple mydataset.
    • PROCEDURE_NAME : nom de la procédure stockée que vous souhaitez exécuter dans BigQuery, par exemple, mysparkprocedure.
    • PROCEDURE_ARGUMENT : paramètre permettant de saisir les arguments d'entrée.

      Dans ce paramètre, spécifiez les champs suivants :

      • ARGUMENT_MODE : mode de l'argument.

        Les valeurs autorisées incluent IN, OUT et INOUT. La valeur par défaut est IN.

      • ARGUMENT_NAME : nom de l'argument.
      • ARGUMENT_TYPE : type de l'argument.

      Exemple : myproject.mydataset.mysparkproc(num INT64).

      Pour plus d'informations, référez-vous aux sections du présent document qui expliquent comment transmettre une valeur en tant que paramètre IN ou en tant que paramètres OUT et INOUT.

    • CONNECTION_PROJECT_ID : projet contenant la connexion permettant d'exécuter la procédure Spark.
    • CONNECTION_REGION : région contenant la connexion permettant d'exécuter la procédure Spark (par exemple, us).
    • CONNECTION_ID : ID de connexion, par exemple myconnection.

      Lorsque vous affichez les détails de la connexion dans la console Google Cloud, il s'agit de la valeur de la dernière section de l'ID de connexion complet affiché dans ID de connexion (par exemple, projects/myproject/locations/connection_location/connections/myconnection).

    • RUNTIME_VERSION : version d'exécution de Spark (par exemple, 1.1)
    • MAIN_PYTHON_FILE_URI : chemin d'accès à un deuxième fichier PySpark, par exemple gs://mybucket/mypysparkmain.py.

      Si vous souhaitez ajouter le corps de la procédure stockée dans l'instruction CREATE PROCEDURE, ajoutez le PYSPARK_CODE après LANGUAGE PYTHON AS comme indiqué dans l'exemple dans la section Utiliser du code intégré dans ce document.

    • PYSPARK_CODE : définition d'une application PySpark dans l'instruction CREATE PROCEDURE si vous souhaitez transmettre le corps de la procédure de manière intégrée.

      La valeur est un littéral de chaîne. Si le code inclut des guillemets et des barres obliques inverses, ceux-ci doivent être échappés ou représentés sous forme de chaîne brute. Par exemple, le code "\n"; peut être représenté comme suit :

      • Chaîne entre guillemets : "return \"\\n\";". Les guillemets et les barres obliques inverses sont échappés.
      • Chaîne entre guillemets triples : """return "\\n";""". Les barres obliques inverses sont échappées, mais pas les guillemets.
      • Chaîne brute : r"""return "\n";""". Aucun échappement n'est nécessaire.
      Pour savoir comment ajouter du code PySpark intégré, consultez la section Utiliser le code intégré.
    • MAIN_JAR_URI : chemin d'accès au fichier JAR contenant la classe main, par exemple gs://mybucket/my_main.jar.
    • CLASS_NAME : nom complet d'une classe dans un ensemble JAR avec l'option jar_uris (par exemple, com.example.wordcount).
    • URI : chemin d'accès au fichier JAR contenant la classe spécifiée dans la classe main, par exemple gs://mybucket/mypysparkmain.jar.

    Pour connaître les options supplémentaires que vous pouvez spécifier dans OPTIONS, consultez la liste des options de procédure.

Utiliser l'éditeur PySpark

Lorsque vous créez une procédure à l'aide de l'éditeur PySpark, vous n'avez pas besoin d'utiliser l'instruction CREATE PROCEDURE. Ajoutez plutôt votre code Python directement dans l'éditeur Pyspark, puis enregistrez ou exécutez-le.

Pour créer une procédure stockée pour Spark dans l'éditeur PySpark, procédez comme suit :

  1. Accédez à la page BigQuery.

    Accéder à BigQuery

  2. Si vous souhaitez saisir directement le code PySpark, ouvrez l'éditeur PySpark. Pour ouvrir l'éditeur PySpark, cliquez sur le menu à côté de Créer une requête SQL, puis sélectionnez Créer une procédure PySpark.

  3. Pour définir les options, cliquez sur Plus > Options PySpark, puis procédez comme suit :

    1. Spécifiez l'emplacement où vous souhaitez exécuter le code PySpark.

    2. Dans le champ Connexions, spécifiez la connexion Spark.

    3. Dans la section Appel de procédure stockée, spécifiez l'ensemble de données dans lequel vous souhaitez stocker les procédures stockées temporaires qui seront générées. Vous pouvez définir un ensemble de données spécifique ou autoriser l'utilisation d'un ensemble de données temporaire pour appeler le code PySpark.

      L'ensemble de données temporaire est généré à l'emplacement spécifié à l'étape précédente. Si un nom d'ensemble de données est spécifié, l'ensemble de données et la connexion Spark doivent se trouver au même emplacement.

    4. Dans la section Paramètres, définissez les paramètres de la procédure stockée. La valeur du paramètre n'est utilisée que lors des exécutions en session du code PySpark, mais la déclaration elle-même est stockée dans la procédure.

    5. Dans la section Options avancées, spécifiez les options de procédure. Pour obtenir une liste détaillée des options de procédure, consultez la liste des options de procédure.

    6. Dans la section Propriétés, ajoutez les paires clé/valeur pour configurer le job. Vous pouvez utiliser n'importe quelle paire clé/valeur issue des propriétés Spark de Dataproc sans serveur.

    7. Dans Paramètres du compte de service, spécifiez le compte de service personnalisé, la clé CMEK, l'ensemble de données de préproduction et le dossier Cloud Storage de préproduction à utiliser lors des exécutions en session du code PySpark.

    8. Cliquez sur Enregistrer.

Enregistrer une procédure stockée pour Spark

Après avoir créé la procédure stockée à l'aide de l'éditeur PySpark, vous pouvez l'enregistrer. Pour cela, procédez comme suit :

  1. Dans la console Google Cloud, accédez à la page BigQuery.

    Accéder à BigQuery

  2. Dans l'éditeur de requête, créez une procédure stockée pour Spark en utilisant Python avec l'éditeur PySpark.

  3. Cliquez sur Enregistrer > Enregistrer la procédure.

  4. Dans la boîte de dialogue Enregistrer la procédure stockée, spécifiez le nom de l'ensemble de données dans lequel vous souhaitez stocker la procédure et le nom à donner à la procédure stockée.

  5. Cliquez sur Enregistrer.

    Si vous souhaitez seulement exécuter le code PySpark au lieu de l'enregistrer en tant que procédure stockée, vous pouvez cliquer sur Exécuter au lieu de Enregistrer.

Utiliser des conteneurs personnalisés

Le conteneur personnalisé fournit l'environnement d'exécution pour les processus de pilote et d'exécuteur de la charge de travail. Pour utiliser des conteneurs personnalisés, utilisez l'exemple de code suivant :

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      container_image="CONTAINER_IMAGE", main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

Remplacez les éléments suivants :

  • PROJECT_ID : projet dans lequel vous souhaitez créer la procédure stockée, par exemple myproject.
  • DATASET : ensemble de données dans lequel vous souhaitez créer la procédure stockée, par exemple mydataset.
  • PROCEDURE_NAME : nom de la procédure stockée que vous souhaitez exécuter dans BigQuery, par exemple, mysparkprocedure.
  • PROCEDURE_ARGUMENT : paramètre permettant de saisir les arguments d'entrée.

    Dans ce paramètre, spécifiez les champs suivants :

    • ARGUMENT_MODE : mode de l'argument.

      Les valeurs autorisées incluent IN, OUT et INOUT. La valeur par défaut est IN.

    • ARGUMENT_NAME : nom de l'argument.
    • ARGUMENT_TYPE : type de l'argument.

    Exemple : myproject.mydataset.mysparkproc(num INT64).

    Pour plus d'informations, référez-vous aux sections du présent document qui expliquent comment transmettre une valeur en tant que paramètre IN ou en tant que paramètres OUT et INOUT.

  • CONNECTION_PROJECT_ID : projet contenant la connexion permettant d'exécuter la procédure Spark.
  • CONNECTION_REGION : région contenant la connexion permettant d'exécuter la procédure Spark (par exemple, us).
  • CONNECTION_ID : ID de connexion, par exemple myconnection.

    Lorsque vous affichez les détails de la connexion dans la console Google Cloud, il s'agit de la valeur de la dernière section de l'ID de connexion complet affiché dans ID de connexion (par exemple, projects/myproject/locations/connection_location/connections/myconnection).

  • RUNTIME_VERSION : version d'exécution de Spark (par exemple, 1.1)
  • MAIN_PYTHON_FILE_URI : chemin d'accès à un deuxième fichier PySpark, par exemple gs://mybucket/mypysparkmain.py.

    Si vous souhaitez ajouter le corps de la procédure stockée dans l'instruction CREATE PROCEDURE, ajoutez le PYSPARK_CODE après LANGUAGE PYTHON AS comme indiqué dans l'exemple dans la section Utiliser du code intégré dans ce document.

  • PYSPARK_CODE : définition d'une application PySpark dans l'instruction CREATE PROCEDURE si vous souhaitez transmettre le corps de la procédure de manière intégrée.

    La valeur est un littéral de chaîne. Si le code inclut des guillemets et des barres obliques inverses, ceux-ci doivent être échappés ou représentés sous forme de chaîne brute. Par exemple, le code "\n"; peut être représenté comme suit :

    • Chaîne entre guillemets : "return \"\\n\";". Les guillemets et les barres obliques inverses sont échappés.
    • Chaîne entre guillemets triples : """return "\\n";""". Les barres obliques inverses sont échappées, mais pas les guillemets.
    • Chaîne brute : r"""return "\n";""". Aucun échappement n'est nécessaire.
    Pour savoir comment ajouter du code PySpark intégré, consultez la section Utiliser le code intégré.
  • CONTAINER_IMAGE : chemin d'accès à l'image dans Artifact Registry. Il ne doit contenir que des bibliothèques à utiliser dans votre procédure. Si elle n'est pas spécifiée, l'image de conteneur par défaut du système associée à la version d'exécution est utilisée.

Pour en savoir plus sur la création d'une image de conteneur personnalisée avec Spark, consultez la section Créer une image de conteneur personnalisée.

Appeler une procédure stockée pour Spark

Après avoir créé une procédure stockée, vous pouvez l'appeler à l'aide de l'une des options suivantes :

Console

  1. Accédez à la page BigQuery.

    Accéder à BigQuery

  2. Dans le volet Explorateur, développez votre projet et sélectionnez la procédure stockée pour Spark que vous souhaitez exécuter.

  3. Dans la fenêtre Informations de procédure stockées, cliquez sur Appeler la procédure stockée. Vous pouvez également développer l'option Afficher les actions, puis cliquer sur Appeler.

  4. Cliquez sur Exécuter.

  5. Dans la section Tous les résultats, cliquez sur Afficher les résultats.

  6. Facultatif : dans la section Résultats de la requête, procédez comme suit :

    • Si vous souhaitez afficher les journaux du pilote Spark, cliquez sur Détails d'exécution.

    • Si vous souhaitez afficher les journaux dans Cloud Logging, cliquez sur Informations sur la tâche, puis, dans le champ Journal, cliquez sur journal

    • Si vous souhaitez obtenir le point de terminaison du serveur d'historique Spark, cliquez sur Informations sur le job, puis sur Serveur d'historique Spark.

SQL

Pour appeler une procédure stockée, utilisez l'instruction CALL PROCEDURE :

  1. Dans la console Google Cloud, accédez à la page BigQuery.

    Accéder à BigQuery

  2. Dans l'éditeur de requête, saisissez l'instruction suivante :

    CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()

  3. Cliquez sur Exécuter.

Pour en savoir plus sur l'exécution des requêtes, consultez Exécuter une requête interactive.

Utiliser un compte de service personnalisé

Au lieu d'utiliser l'identité du service de la connexion Spark pour l'accès aux données, vous pouvez utiliser un compte de service personnalisé pour accéder aux données de votre code Spark.

Pour utiliser un compte de service personnalisé, spécifiez le mode de sécurité INVOKER (à l'aide de l'instruction EXTERNAL SECURITY INVOKER) lorsque vous créez une procédure stockée Spark et spécifiez le compte de service lorsque vous appelez la procédure stockée.

Si vous souhaitez accéder au code Spark et l'utiliser à partir de Cloud Storage, vous devez accorder les autorisations nécessaires à l'identification du service de la connexion Spark. Vous devez accorder au compte de service de la connexion l'autorisation IAM storage.objects.get ou le rôle IAM storage.objectViewer.

Vous pouvez également accorder au compte de service de la connexion l'accès à Dataproc Metastore et au serveur d'historique persistant Dataproc si vous les avez spécifiés dans la connexion. Pour en savoir plus, consultez la section Accorder l'accès au compte de service.

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  EXTERNAL SECURITY INVOKER
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

Vous pouvez également ajouter les arguments suivants au code précédent :

SET @@spark_proc_properties.staging_bucket='BUCKET_NAME';
SET @@spark_proc_properties.staging_dataset_id='DATASET';

Remplacez les éléments suivants :

  • CUSTOM_SERVICE_ACCOUNT : valeur obligatoire. Compte de service personnalisé que vous avez fourni.
  • BUCKET_NAME : facultatif. Bucket Cloud Storage utilisé comme système de fichiers par défaut de l'application Spark. Si cet argument n'est pas fourni, un bucket Cloud Storage par défaut est créé dans votre projet et il est partagé par tous les jobs exécutés dans le même projet.
  • DATASET : facultatif. Ensemble de données permettant de stocker les données temporaires générées par l'appel de la procédure. Les données sont nettoyées une fois le job terminé. S'il n'est pas fourni, un ensemble de données temporaire par défaut est créé pour le job.

Votre compte de service personnalisé doit disposer des autorisations suivantes :

  • Pour lire et écrire dans le bucket de préproduction utilisé comme système de fichiers par défaut de l'application Spark, procédez comme suit :

    • Autorisations storage.objects.* ou rôle IAM roles/storage.objectAdmin sur le bucket de préproduction que vous spécifiez.
    • En outre, les autorisations storage.buckets.* ou le rôle IAM roles/storage.Admin sur le projet, si le bucket de préproduction n'est pas spécifié.
  • (Facultatif) Pour lire et écrire des données depuis et vers BigQuery, procédez comme suit :

    • bigquery.tables.* sur vos tables BigQuery.
    • bigquery.readsessions.* sur votre projet
    • Le rôle IAM roles/bigquery.admin inclut les autorisations précédentes.
  • (Facultatif) Pour lire et écrire des données depuis et vers Cloud Storage :

    • Autorisations storage.objects.* ou rôle IAM roles/storage.objectAdmin sur vos objets Cloud Storage
  • (Facultatif) Pour lire et écrire dans l'ensemble de données de préproduction utilisé pour les paramètres INOUT/OUT, procédez comme suit :

    • Rôle IAM bigquery.tables.* ou roles/bigquery.dataEditor sur l'ensemble de données de préproduction que vous spécifiez.
    • En outre, l'autorisation bigquery.datasets.create ou le rôle IAM roles/bigquery.dataEditor sur le projet, si l'ensemble de données de préproduction n'est pas spécifié.

Exemples de procédures stockées pour Spark

Cette section présente des exemples de création d'une procédure stockée pour Apache Spark.

Utiliser un fichier PySpark ou JAR dans Cloud Storage

L'exemple suivant montre comment créer une procédure stockée pour Spark en utilisant la connexion my-project-id.us.my-connection et un fichier PySpark ou JAR stocké dans un bucket Cloud Storage :

Python

CREATE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-pyspark-main.py")
LANGUAGE PYTHON

Java ou Scala

Utilisez main_file_uri pour créer une procédure stockée :

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_wtih_main_jar()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-scala-main.jar")
LANGUAGE SCALA

Utilisez main_class pour créer une procédure stockée :

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1",
main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"])
LANGUAGE SCALA

Utiliser du code intégré

L'exemple suivant montre comment créer une procédure stockée pour Spark en utilisant une connexion my-project-id.us.my-connection et du code PySpark intégré :

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Load data from BigQuery.
words = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data:samples.shakespeare") \
  .load()
words.createOrReplaceTempView("words")

# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count")
word_count.show()
word_count.printSchema()

# Saving the data to BigQuery
word_count.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("wordcount_dataset.wordcount_output")
"""

Transmettre une valeur en tant que paramètre d'entrée

Les exemples suivants illustrent les deux méthodes permettant de transmettre une valeur en tant que paramètre d'entrée dans Python :

Méthode 1 : Utiliser des variables d'environnement

Dans le code PySpark, vous pouvez obtenir les paramètres d'entrée de la procédure stockée pour Spark en utilisant des variables d'environnement dans le pilote et les exécuteurs Spark. Le nom de la variable d'environnement est au format BIGQUERY_PROC_PARAM.PARAMETER_NAME, où PARAMETER_NAME est le nom du paramètre d'entrée. Par exemple, si le nom du paramètre d'entrée est var, le nom de la variable d'environnement correspondante est BIGQUERY_PROC_PARAM.var. Les paramètres d'entrée sont encodés au format JSON. Dans votre code PySpark, vous pouvez obtenir la valeur du paramètre d'entrée dans une chaîne JSON à partir de la variable d'environnement, puis la décoder pour obtenir une variable Python.

L'exemple suivant montre comment récupérer la valeur d'un paramètre d'entrée de type INT64 dans votre code PySpark :

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
import os
import json

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
sc = spark.sparkContext

# Get the input parameter num in JSON string and convert to a Python variable
num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"]))

"""

Méthode 2 : Utiliser une bibliothèque intégrée

Dans le code PySpark, vous pouvez simplement importer une bibliothèque intégrée et l'utiliser pour renseigner tous les types de paramètres. Pour transmettre les paramètres aux exécuteurs, renseignez-les dans un pilote Spark en tant que variables Python, puis transmettez les valeurs aux exécuteurs. La bibliothèque intégrée est compatible avec la plupart des types de données BigQuery, à l'exception de INTERVAL, GEOGRAPHY, NUMERIC et BIGNUMERIC.

Type de données BigQuery Type de données Python
BOOL bool
STRING str
FLOAT64 float
INT64 int
BYTES bytes
DATE datetime.date
TIMESTAMP datetime.datetime
TIME datetime.time
DATETIME datetime.datetime
Array Array
Struct Struct
JSON Object
NUMERIC Non compatible
BIGNUMERIC Non compatible
INTERVAL Non compatible
GEOGRAPHY Non compatible

L'exemple suivant montre comment importer la bibliothèque intégrée et l'utiliser pour renseigner un paramètre d'entrée de type INT64 et un paramètre d'entrée de type ARRAY<STRUCT<a INT64, b STRING>> dans votre code PySpark :

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
from bigquery.spark.procedure import SparkProcParamContext

def check_in_param(x, num):
  return x['a'] + num

def main():
  spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
  sc=spark.sparkContext
  spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

  # Get the input parameter num of type INT64
  num = spark_proc_param_context.num

  # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>>
  info = spark_proc_param_context.info

  # Pass the parameter to executors
  df = sc.parallelize(info)
  value = df.map(lambda x : check_in_param(x, num)).sum()

main()
"""

Dans le code Java ou Scala, vous pouvez récupérer les paramètres d'entrée de la procédure stockée pour Spark via des variables d'environnement dans le pilote et les exécuteurs Spark. Le nom de la variable d'environnement est au format BIGQUERY_PROC_PARAM.PARAMETER_NAME, où PARAMETER_NAME est le nom du paramètre d'entrée. Par exemple, si le nom du paramètre d'entrée est var, le nom de la variable d'environnement correspondante est BIGQUERY_PROC_PARAM.var. Dans votre code Java ou Scala, vous pouvez récupérer la valeur du paramètre d'entrée à partir de la variable d'environnement.

L'exemple suivant montre comment récupérer la valeur d'un paramètre d'entrée à partir de variables d'environnement dans votre code Scala :

val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get

L'exemple suivant montre comment récupérer des paramètres d'entrée à partir de variables d'environnement dans votre code Java :

String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");

Transmettre des valeurs en tant que paramètres OUT et INOUT

Le paramètre de sortie renvoie la valeur de la procédure Spark, tandis que le paramètre INOUT accepte une valeur pour la procédure puis renvoie une valeur provenant de la procédure. Pour utiliser les paramètres OUT et INOUT, ajoutez le mot clé OUT ou INOUT avant le nom du paramètre lors de la création de la procédure Spark. Dans le code PySpark, vous utilisez la bibliothèque intégrée pour renvoyer une valeur en tant que paramètre OUT ou INOUT. Comme pour les paramètres d'entrée, la bibliothèque intégrée accepte la plupart des types de données BigQuery, à l'exception de INTERVAL, GEOGRAPHY, NUMERIC et BIGNUMERIC. Les valeurs de type TIME et DATETIME sont converties au fuseau horaire UTC lorsqu'elles sont retournées par l'intermédiaire des paramètres OUT ou INOUT.

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME,OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON)
WITH CONNECTION `my_bq_project.my_dataset.my_connection`
OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS
R"""
from pyspark.sql.session import SparkSession
import datetime
from bigquery.spark.procedure import SparkProcParamContext

spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate()
spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

# Reading the IN and INOUT parameter values.
int = spark_proc_param_context.int
dt = spark_proc_param_context.datetime
print("IN parameter value: ", int, ", INOUT parameter value: ", dt)

# Returning the value of the OUT and INOUT parameters.
spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.b = True
spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}]
spark_proc_param_context.time = datetime.time(23, 20, 50, 520000)
spark_proc_param_context.f = 20.23
spark_proc_param_context.bs = b"hello"
spark_proc_param_context.date = datetime.date(1985, 4, 12)
spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.js = {"name": "Alice", "age": 30}
""";

Lire à partir d'une table de métastore Hive et écrire les résultats dans BigQuery

L'exemple suivant montre comment transformer une table de métastore Hive et écrire les résultats dans BigQuery :

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession \
   .builder \
   .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \
   .enableHiveSupport() \
   .getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS records")

spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)")

spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)")

df = spark.sql("SELECT * FROM records.student")

df.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("records_dataset.student")
"""

Afficher les filtres de journaux

Après avoir appelé une procédure stockée pour Spark, vous pouvez afficher les informations des journaux. Pour obtenir les informations de filtrage Cloud Logging et le point de terminaison du cluster d'historique Spark, utilisez la commande bq show. Les informations de filtrage sont disponibles dans le champ SparkStatistics du job enfant. Pour obtenir les filtres de journaux, procédez comme suit :

  1. Accédez à la page BigQuery.

    Accéder à BigQuery

  2. Dans l'éditeur de requêtes, répertoriez les tâches enfants de la tâche de script de la procédure stockée :

    bq ls -j --parent_job_id=$parent_job_id

    Pour savoir comment obtenir l'ID du job, consultez la page Afficher les détails d'un job.

    Le résultat ressemble à ce qui suit :

                    jobId                         Job Type     State       Start Time         Duration
    ---------------------------------------------- ---------   ---------  ---------------  ----------------
    script_job_90fb26c32329679c139befcc638a7e71_0   query      SUCCESS   07 Sep 18:00:27   0:05:15.052000
  3. Identifiez le jobId de votre procédure stockée et utilisez la commande bq show pour afficher les détails de la tâche :

    bq show --format=prettyjson --job $child_job_id

    Copiez le champ sparkStatistics, car vous en aurez besoin dans une autre étape.

    Le résultat ressemble à ce qui suit :

    {
    "configuration": {...}"statistics": {
       "query": {
        "sparkStatistics": {
          "loggingInfo": {
            "projectId": "myproject",
            "resourceType": "myresource"
          },
          "sparkJobId": "script-job-90f0",
          "sparkJobLocation": "us-central1"
        },
          }
    }
    }

  4. Pour Logging, générez des filtres de journaux avec les champs SparkStatistics :

    resource.type = sparkStatistics.loggingInfo.resourceType
    resource.labels.resource_container=sparkStatistics.loggingInfo.projectId
    resource.labels.spark_job_id=sparkStatistics.sparkJobId
    resource.labels.location=sparkStatistics.sparkJobLocation

    Les journaux sont écrits dans la ressource surveillée bigquery.googleapis.com/SparkJob. Les journaux sont libellés par les composants INFO, DRIVER et EXECUTOR. Pour filtrer les journaux du pilote Spark, ajoutez le composant labels.component = "DRIVER" aux filtres de journaux. Pour filtrer les journaux de l'exécuteur Spark, ajoutez le composant labels.component = "EXECUTOR" aux filtres de journaux.

Utiliser la clé de chiffrement gérée par le client

La procédure BigQuery Spark utilise la clé de chiffrement gérée par le client (CMEK) pour protéger votre contenu, ainsi que le chiffrement par défaut fourni par BigQuery. Pour utiliser la clé CMEK dans la procédure Spark, commencez par déclencher la créaation du compte de service de chiffrement BigQuery et accordez les autorisations requises. La procédure Spark est également compatible avec les règles d'administration CMEK si elles sont appliquées à votre projet.

Si votre procédure stockée utilise le mode de sécurité INVOKER, votre clé CMEK doit être spécifiée via la variable système SQL lors de l'appel de la procédure. Sinon, votre clé CMEK peut être spécifiée via la connexion associée à la procédure stockée.

Pour spécifier la clé CMEK via la connexion lorsque vous créez une procédure stockée Spark, utilisez l'exemple de code suivant :

bq mk --connection --connection_type='SPARK' \
 --properties='{"kms_key_name"="projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME"}' \
 --project_id=PROJECT_ID \
 --location=LOCATION \
 CONNECTION_NAME

Pour spécifier la clé CMEK via la variable système SQL lors de l'appel de la procédure, utilisez l'exemple de code suivant :

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
SET @@spark_proc_properties.kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME;
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

Utiliser VPC Service Controls

VPC Service Controls vous permet de configurer un périmètre sécurisé pour vous protéger contre l'exfiltration de données. Pour utiliser VPC Service Controls avec une procédure Spark afin de renforcer la sécurité, commencez par créer un périmètre de service.

Pour protéger entièrement vos jobs de procédure Spark, ajoutez les API suivantes au périmètre de service :

  • API BigQuery (bigquery.googleapis.com)
  • API Cloud Logging (logging.googleapis.com)
  • API Cloud Storage (storage.googleapis.com), si vous utilisez Cloud Storage
  • API Artifact Registry (artifactregistry.googleapis.com) ou Container Registry (containerregistry.googleapis.com), si vous utilisez un conteneur personnalisé
  • API Dataproc Metastore (metastore.googleapis.com) et API Cloud Run Admin (run.googleapis.com), si vous utilisez Dataproc Metastore

Ajoutez le projet de requête de la procédure Spark au périmètre. Ajouter d'autres projets qui hébergent votre code ou vos données Spark dans le périmètre.

Bonnes pratiques

  • La première utilisation d'une connexion dans votre projet prend environ une minute. Pour gagner du temps, vous pouvez réutiliser une connexion Spark existante lorsque vous créez une procédure stockée pour Spark.

  • Lorsque vous créez une procédure Spark pour une utilisation en production, Google vous recommande de spécifier une version d'exécution. Pour obtenir la liste des versions d'exécution compatibles, consultez la page Versions d'exécution Dataproc sans serveur. Nous vous recommandons d'utiliser la version LTS (Long-Time-Support).

  • Lorsque vous spécifiez un conteneur personnalisé dans une procédure Spark, nous vous recommandons d'utiliser Artifact Registry et le streaming d'images.

  • Pour améliorer les performances, vous pouvez spécifier les propriétés d'allocation des ressources dans la procédure Spark. Les procédures stockées Spark sont compatibles avec une liste de propriétés d'allocation de ressources identique à celle de Dataproc sans serveur.

Limites

  • Vous ne pouvez utiliser que le protocole du point de terminaison gRPC pour vous connecter au métastore Dataproc. Les autres types de métastore Hive ne sont pas encore acceptés.
  • Les clés de chiffrement gérées par le client (CMEK) ne sont disponibles que lorsque les clients créent des procédures Spark à région unique. Les clés CMEK des régions mondiales et les clés CMEK multirégionales, par exemple EU ou US, ne sont pas acceptées.
  • La transmission de paramètres de sortie n'est possible que pour PySpark.
  • Si l'ensemble de données associé à la procédure stockée pour Spark est répliqué dans une région de destination via la réplication interrégionale d'ensembles de données, la procédure stockée ne peut être interrogée que dans la région dans laquelle elle a été créée.

Quotas et limites

Pour en savoir plus sur les quotas et les limites, consultez la page Quotas et limites applicables aux procédures stockées pour Spark.

Étapes suivantes