Accélérer les charges de travail par lot et les sessions interactives avec l'exécution des requêtes natives

Ce document explique quand et comment activer l'exécution de requêtes natives pour accélérer les charges de travail par lot et les sessions interactives Serverless pour Apache Spark.

Exigences relatives à l'exécution de requêtes natives

L'exécution de requêtes natives Serverless pour Apache Spark n'est disponible qu'avec les charges de travail par lot et les sessions interactives utilisant 1.2.26+, 2.2.26+ ou une version d'exécution Spark ultérieure s'exécutant dans le niveau de tarification Premium de Serverless pour Apache Spark. Les tarifs du niveau Premium sont plus élevés que ceux du niveau Standard, mais l'exécution de requêtes natives n'entraîne aucun frais supplémentaire. Pour en savoir plus sur les tarifs, consultez la page Tarifs de Serverless pour Apache Spark.

Propriétés d'exécution des requêtes natives

Cette section répertorie les propriétés d'allocation de ressources Spark requises et facultatives que vous pouvez utiliser pour activer et personnaliser l'exécution de requêtes natives pour votre charge de travail par lot ou votre session interactive.

Paramètres de propriété obligatoires

  • spark.dataproc.runtimeEngine=native : le moteur d'exécution de la charge de travail doit être défini sur native pour remplacer le moteur d'exécution spark par défaut.

  • spark.dataproc.spark.driver.compute.tier=premium et spark.dataproc.executor.compute.tier=premium : ces propriétés de niveau tarifaire doivent être définies sur le niveau tarifaire premium.

Propriétés d'allocation des ressources facultatives

  • spark.dataproc.driver.disk.tier, spark.dataproc.driver.disk.size, spark.dataproc.executor.disk.tier et spark.dataproc.executor.disk.size : utilisez ces propriétés pour définir et configurer le niveau et la taille du disque Premium pour les processus de pilote et d'exécuteur Spark.

    Les niveaux de disque Premium utilisent un brassage basé sur les colonnes plutôt que sur les lignes pour offrir de meilleures performances. Pour améliorer le débit d'E/S de shuffle, utilisez les niveaux de disque premium du pilote et de l'exécuteur avec une taille de disque suffisamment grande pour accueillir les fichiers de shuffle.

  • spark.driver.memory, spark.driver.memoryOverhead, spark.executor.memory, spark.executor.memoryOverhead et spark.memory.offHeap.size : utilisez ces propriétés pour ajuster la mémoire fournie aux processus du pilote et de l'exécuteur Spark.

    Vous pouvez configurer la mémoire de l'une des manières suivantes :

    • Option 1 : Configurez uniquement la mémoire hors tas (spark.memory.offHeap.size) avec une valeur spécifiée. L'exécution de requêtes natives utilisera la valeur spécifiée comme mémoire hors tas et allouera 1/7th supplémentaire de la valeur de mémoire hors tas comme mémoire dans le tas (spark.executor.memory).

    • Option 2 : Configurez à la fois la mémoire sur le tas (spark.executor.memory) et la mémoire hors tas (spark.memory.offHeap.size). La quantité que vous allouez à la mémoire hors tas doit être supérieure à celle que vous allouez à la mémoire sur le tas.

    Si vous ne configurez pas à la fois la mémoire hors tas (spark.memory.offHeap.size) et la mémoire dans le tas (spark.executor.memory), le moteur d'exécution des requêtes natives divise une quantité de mémoire par défaut de 4g selon un ratio de 6:1 entre la mémoire hors tas et la mémoire dans le tas.

    Recommandation : allouez la mémoire hors tas à la mémoire dans le tas dans un rapport 6:1.

    Exemples :

    Paramètres de mémoire sans exécution de requêtes natives Paramètres de mémoire recommandés avec l'exécution de requêtes natives
    spark.executor.memory spark.memory.offHeap.size spark.executor.memory
    7g 6g 1g
    14g 12g 2g
    28g 24g 4G
    56g 48 g 8g

Exécuter l'outil de qualification

Pour identifier les charges de travail par lot qui peuvent atteindre des durées d'exécution plus rapides avec l'exécution de requêtes natives (NQE), vous pouvez utiliser l'outil de qualification. L'outil analyse les journaux d'événements Spark pour estimer les économies de temps d'exécution potentielles et identifier les opérations qui ne sont pas compatibles avec le moteur NQE.

Google Cloud propose deux méthodes pour exécuter l'analyse de qualification : le job de qualification et le script de qualification. L'approche recommandée pour la plupart des utilisateurs est le job de qualification, qui automatise la découverte et l'analyse des charges de travail par lot. Le script de qualification alternatif est disponible pour le cas d'utilisation spécifique de l'analyse d'un fichier journal d'événements connu. Choisissez la méthode qui convient le mieux à votre cas d'utilisation :

  • Tâche de qualification (recommandée) : il s'agit de la méthode principale et recommandée. Il s'agit d'un job PySpark qui découvre et analyse automatiquement les charges de travail par lot récentes dans un ou plusieurs projets et régions Google Cloud . Utilisez cette méthode lorsque vous souhaitez effectuer une analyse générale sans avoir à localiser manuellement les fichiers journaux d'événements individuels. Cette approche est idéale pour évaluer à grande échelle l'adéquation de l'évaluation de la qualité des requêtes naturelles.

  • Script de qualification (autre méthode) : il s'agit d'une autre méthode pour les cas d'utilisation avancés ou spécifiques. Il s'agit d'un script shell qui analyse un seul fichier journal d'événements Spark ou tous les journaux d'événements d'un répertoire Cloud Storage spécifique. Utilisez cette méthode si vous disposez du chemin d'accès Cloud Storage aux journaux d'événements que vous souhaitez analyser.

Tâche de qualification

Le job de qualification simplifie l'analyse à grande échelle en recherchant de manière programmatique les charges de travail par lot Serverless pour Apache Spark et en envoyant un job d'analyse distribuée. L'outil évalue les tâches dans votre organisation, ce qui élimine la nécessité de trouver et de spécifier manuellement les chemins d'accès aux journaux d'événements.

Attribuer des rôles IAM

Pour que le job de qualification puisse accéder aux métadonnées de charge de travail par lot et lire les journaux d'événements Spark dans Cloud Logging, le compte de service qui exécute la charge de travail doit disposer des rôles IAM suivants dans tous les projets à analyser :

Envoyer le job de qualification

Vous envoyez la tâche de qualification à l'aide de l'outil gcloud CLI. Le job inclut un script PySpark et un fichier JAR hébergés dans un bucket Cloud Storage public.

Vous pouvez exécuter le job dans l'un des environnements d'exécution suivants :

  • En tant que charge de travail par lot Serverless pour Apache Spark. Il s'agit d'une exécution simple et autonome des tâches.

  • En tant que job exécuté sur un cluster Dataproc sur Compute Engine. Cette approche peut être utile pour intégrer le job dans un workflow.

Arguments de la tâche

Argument Description Obligatoire ? Valeur par défaut
--project-ids ID d'un seul projet ou liste d'ID de projets Google Cloud séparés par une virgule à analyser pour les charges de travail par lot. Non Projet dans lequel la tâche de qualification est exécutée.
--regions Une seule région ou une liste de régions séparées par une virgule à analyser dans les projets spécifiés. Non Toutes les régions des projets spécifiés.
--start-time Date de début pour filtrer les lots. Seuls les lots créés à partir de cette date (au format AAAA-MM-JJ) seront analysés. Non Aucun filtre de date de début n'est appliqué.
--end-time Date de fin pour filtrer les lots. Seuls les lots créés à cette date ou avant (format : AAAA-MM-JJ) seront analysés. Non Aucun filtre de date de fin n'est appliqué.
--limit Nombre maximal de lots à analyser par région. Les lots les plus récents sont analysés en premier. Non Tous les lots correspondant aux autres critères de filtrage sont analysés.
--output-gcs-path Chemin d'accès Cloud Storage (par exemple, gs://your-bucket/output/) où les fichiers de résultats seront écrits. Oui Aucun
--input-file Chemin d'accès Cloud Storage vers un fichier texte pour l'analyse groupée. Si cet argument est fourni, il remplace tous les autres arguments définissant le champ d'application (--project-ids, --regions, --start-time, --end-time, --limit). Non Aucun

Exemples de tâches de qualification

  • Un job par lot Serverless pour Apache Spark permettant d'effectuer une analyse simple et ponctuelle. Les arguments du job sont listés après le séparateur --.

    gcloud dataproc batches submit pyspark gs://qualification-tool/performance-boost-qualification.py \
        --project=PROJECT_ID \
        --region=REGION \
        --jars=gs://qualification-tool/dataproc-perfboost-qualification-1.2.jar \
        -- \
        --project-ids=COMMA_SEPARATED_PROJECT_IDS \
        --regions=COMMA_SEPARATED_REGIONS \
        --limit=MAX_BATCHES \
        --output-gcs-path=gs://BUCKET
    
  • Un job par lot Serverless pour Apache Spark permettant d'analyser jusqu'à 50 des lots les plus récents trouvés dans sample_project dans la région us-central1. Les résultats sont écrits dans un bucket Cloud Storage. Les arguments du job sont listés après le séparateur --.

    gcloud dataproc batches submit pyspark gs://qualification-tool/performance-boost-qualification.py \
        --project=PROJECT_ID \
        --region=US-CENTRAL1 \
        --jars=gs://qualification-tool/dataproc-perfboost-qualification-1.2.jar \
        -- \
        --project-ids=PROJECT_ID \
        --regions=US-CENTRAL1 \
        --limit=50 \
        --output-gcs-path=gs://BUCKET/
    
  • Job Dataproc sur Compute Engine envoyé à un cluster Dataproc pour une analyse groupée dans un workflow d'analyse à grande échelle, répétable ou automatisé. Les arguments de la tâche sont placés dans un fichier INPUT_FILE qui est importé dans un BUCKET dans Cloud Storage. Cette méthode est idéale pour analyser différentes plages de dates ou limites de lots dans différents projets et régions en une seule exécution.

    gcloud dataproc jobs submit pyspark gs://qualification-tool/performance-boost-qualification.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://qualification-tool/dataproc-perfboost-qualification-1.2.jar \
        -- \
        --input-file=gs://INPUT_FILE \
        --output-gcs-path=gs://BUCKET
    

    Remarques :

    INPUT_FILE : chaque ligne du fichier représente une demande d'analyse distincte et utilise un format de drapeaux d'une seule lettre suivis de leurs valeurs, par exemple -p PROJECT-ID -r REGION -s START_DATE -e END_DATE -l LIMITS.

    Exemple de contenu du fichier d'entrée :

    -p project1 -r us-central1 -s 2024-12-01 -e 2024-12-15 -l 100
    -p project2 -r europe-west1 -s 2024-11-15 -l 50
    

    Ces arguments indiquent à l'outil d'analyser les deux portées suivantes :

    • Jusqu'à 100 lots dans le projet 1 de la région us-central1 créés entre le 1er et le 15 décembre 2025.
    • Jusqu'à 50 lots dans le projet 2 dans la région europe-west1 créés à partir du 15 novembre 2025.

Script de qualification

Utilisez cette méthode si vous disposez du chemin d'accès Cloud Storage direct vers un journal d'événements Spark spécifique que vous souhaitez analyser. Cette approche nécessite de télécharger et d'exécuter un script shell, run_qualification_tool.sh, sur une machine locale ou une VM Compute Engine configurée avec un accès au fichier journal des événements dans Cloud Storage.

Procédez comme suit pour exécuter le script sur les fichiers d'événements de charge de travail par lot Serverless pour Apache Spark.

1.Copiez run_qualification_tool.sh dans un répertoire local contenant les fichiers d'événements Spark à analyser.

  1. Exécutez le script de qualification pour analyser un fichier d'événements ou un ensemble de fichiers d'événements contenus dans le répertoire du script.

    ./run_qualification_tool.sh -f EVENT_FILE_PATH/EVENT_FILE_NAME \
        -o CUSTOM_OUTPUT_DIRECTORY_PATH \
        -k SERVICE_ACCOUNT_KEY  \
        -x MEMORY_ALLOCATEDg  \
        -t PARALLEL_THREADS_TO_RUN
    

    Options et valeurs :

    -f (obligatoire) : consultez Emplacements des fichiers d'événements Spark pour localiser les fichiers d'événements de charge de travail Spark.

    • EVENT_FILE_PATH (obligatoire, sauf si EVENT_FILE_NAME est spécifié) : chemin d'accès au fichier d'événements à analyser. Si aucun chemin n'est fourni, le chemin d'accès au fichier d'événements est supposé être le répertoire actuel.

    • EVENT_FILE_NAME (obligatoire, sauf si EVENT_FILE_PATH est spécifié) : Nom du fichier d'événements à analyser. Si aucune valeur n'est fournie, les fichiers d'événements trouvés de manière récursive dans EVENT_FILE_PATH sont analysés.

    -o(facultatif) : si aucune valeur n'est fournie, l'outil crée ou utilise un répertoire output existant sous le répertoire actuel pour placer les fichiers de sortie.

    • CUSTOM_OUTPUT_DIRECTORY_PATH : chemin d'accès au répertoire de sortie des fichiers de sortie.

    -k (facultatif) :

    -x (facultatif) :

    • MEMORY_ALLOCATED : mémoire en gigaoctets à allouer à l'outil. Par défaut, l'outil utilise 80 % de la mémoire libre disponible dans le système et tous les cœurs de machine disponibles.

    -t(facultatif) :

    • PARALLEL_THREADS_TO_RUN : nombre de threads parallèles pour l'exécution de l'outil. Par défaut, l'outil exécute tous les cœurs.

    Exemple d'utilisation de la commande :

    ./run_qualification_tool.sh -f gs://dataproc-temp-us-east1-9779/spark-job-history \
        -o perfboost-output -k /keys/event-file-key -x 34g -t 5
    

    Dans cet exemple, l'outil de qualification parcourt le répertoire gs://dataproc-temp-us-east1-9779/spark-job-history et analyse les fichiers d'événements Spark contenus dans ce répertoire et ses sous-répertoires. L'accès au répertoire est fourni par /keys/event-file-key. L'outil utilise 34 GB memory pour l'exécution et exécute 5 threads parallèles.

    Emplacements des fichiers d'événements Spark

Pour trouver les fichiers d'événements Spark pour les charges de travail par lot Serverless pour Apache Spark, procédez comme suit :

  1. Dans Cloud Storage, recherchez le fichier spark.eventLog.dir de la charge de travail, puis téléchargez-le.

    1. Si vous ne trouvez pas le spark.eventLog.dir, définissez le spark.eventLog.dir sur un emplacement Cloud Storage, puis réexécutez la charge de travail et téléchargez le spark.eventLog.dir.
  2. Si vous avez configuré le serveur d'historique Spark pour le job par lot :

    1. Accédez au serveur d'historique Spark, puis sélectionnez la charge de travail.
    2. Cliquez sur Télécharger dans la colonne Journal des événements.

Fichiers de sortie de l'outil de qualification

Une fois le script d'analyse ou le job de qualification terminé, l'outil de qualification place les fichiers de sortie suivants dans un répertoire perfboost-output du répertoire actuel :

  • AppsRecommendedForBoost.tsv : liste des applications recommandées pour l'exécution de requêtes natives, séparées par des tabulations.

  • UnsupportedOperators.tsv : liste des applications non recommandées pour l'exécution de requêtes natives, séparées par une tabulation.

Fichier de sortie AppsRecommendedForBoost.tsv

Le tableau suivant présente le contenu d'un exemple de fichier de sortie AppsRecommendedForBoost.tsv. Il contient une ligne pour chaque application analysée.

Exemple de fichier de sortie AppsRecommendedForBoost.tsv :

applicationId applicationName rddPercentage unsupportedSqlPercentage totalTaskTime supportedTaskTime supportedSqlPercentage recommendedForBoost expectedRuntimeReduction
app-2024081/batches/083f6196248043938-000 projects/example.com:dev/locations/us-central1
6b4d6cae140f883c0
11c8e
0 % 0 % 548924253 548924253 100 % TRUE 30,00 %
app-2024081/batches/60381cab738021457-000 projects/example.com:dev/locations/us-central1
474113a1462b426bf
b3aeb
0 % 0 % 514401703 514401703 100 % TRUE 30,00 %

Descriptions des colonnes :

  • applicationId : ApplicationID de l'application Spark. Utilisez-le pour identifier la charge de travail par lot correspondante.

  • applicationName : nom de l'application Spark.

  • rddPercentage : pourcentage d'opérations RDD dans l'application. Les opérations RDD ne sont pas compatibles avec l'exécution de requêtes natives.

  • unsupportedSqlPercentage: Pourcentage d'opérations SQL non compatibles avec l'exécution de requêtes natives.

  • totalTaskTime : temps de tâche cumulé de toutes les tâches exécutées lors de l'exécution de l'application.

  • supportedTaskTime : durée totale de la tâche prise en charge par l'exécution de requêtes natives.

Les colonnes suivantes fournissent des informations importantes pour vous aider à déterminer si l'exécution de requêtes natives peut être bénéfique pour votre charge de travail par lot :

  • supportedSqlPercentage : pourcentage d'opérations SQL compatibles avec l'exécution de requêtes natives. Plus le pourcentage est élevé, plus la réduction du temps d'exécution peut être importante en exécutant l'application avec l'exécution de requêtes natives.

  • recommendedForBoost : si la valeur est TRUE, il est recommandé d'exécuter l'application avec l'exécution de requêtes natives. Si recommendedForBoost est défini sur FALSE, n'utilisez pas l'exécution de requêtes natives sur la charge de travail par lot.

  • expectedRuntimeReduction : pourcentage de réduction attendu du temps d'exécution de l'application lorsque vous l'exécutez avec l'exécution de requêtes natives.

Fichier de sortie UnsupportedOperators.tsv.

Le fichier de sortie UnsupportedOperators.tsv contient une liste des opérateurs utilisés dans les applications de charge de travail qui ne sont pas compatibles avec l'exécution de requêtes natives. Chaque ligne du fichier de sortie liste un opérateur non compatible.

Descriptions des colonnes :

  • unsupportedOperator : nom de l'opérateur non compatible avec l'exécution de requêtes natives.

  • cumulativeCpuMs : nombre de millisecondes de processeur consommées lors de l'exécution de l'opérateur. Cette valeur reflète l'importance relative de l'opérateur dans l'application.

  • count : nombre de fois où l'opérateur est utilisé dans l'application.

Utiliser l'exécution de requêtes natives

Vous pouvez utiliser l'exécution de requêtes natives avec votre application en définissant les propriétés d'exécution de requêtes natives lorsque vous créez la charge de travail par lot, la session interactive ou le modèle de session qui exécute votre application.

Utiliser l'exécution de requêtes natives avec des charges de travail par lot

Vous pouvez utiliser la console Google Cloud , Google Cloud CLI ou l'API Dataproc pour activer l'exécution de requêtes natives sur une charge de travail par lot.

Console

Utilisez la console Google Cloud pour activer l'exécution de requêtes natives sur une charge de travail par lot.

  1. Dans la console Google Cloud  :

    1. Accédez à la page "Lots" de Dataproc.
    2. Cliquez sur Créer pour ouvrir la page Créer un lot.
  2. Sélectionnez et renseignez les champs suivants pour configurer le lot pour l'exécution de requêtes natives :

    • Conteneur :
    • Configuration des niveaux pour les exécuteurs et les pilotes :
      • Sélectionnez Premium pour tous les niveaux (Niveau de calcul pour les pilotes, Niveau de calcul pour les exécuteurs).
    • Propriétés : saisissez des paires Key (nom de propriété) et Value pour spécifier les propriétés d'exécution des requêtes natives :
      Clé Valeur
      spark.dataproc.runtimeEngine native
  3. Renseignez, sélectionnez ou confirmez les autres paramètres des charges de travail par lot. Consultez Envoyer une charge de travail par lot Spark.

  4. Cliquez sur Envoyer pour exécuter la charge de travail par lot Spark.

gcloud

Définissez les indicateurs de commande gcloud CLI gcloud dataproc batches submit spark suivants pour configurer une charge de travail par lot pour l'exécution de requêtes natives :

gcloud dataproc batches submit spark \
    --project=PROJECT_ID \
    --region=REGION \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.SparkPi \
    --properties=spark.dataproc.runtimeEngine=native,spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium \
    OTHER_FLAGS_AS_NEEDED

Remarques :

  • PROJECT_ID : ID de votre projet Google Cloud . Les ID de projet sont listés dans la section Informations sur le projet du tableau de bord de la console Google Cloud .
  • REGION : Région Compute Engine disponible pour exécuter la charge de travail.
  • OTHER_FLAGS_AS_NEEDED : consultez Envoyer une charge de travail par lot Spark.

API

Définissez les champs de l'API Dataproc suivants pour configurer une charge de travail par lot pour l'exécution de requêtes natives :

Quand utiliser l'exécution de requêtes natives ?

Utilisez l'exécution de requêtes natives dans les scénarios suivants :

  • API Spark DataFrame, API Spark Dataset et requêtes Spark SQL qui lisent des données à partir de fichiers Parquet et ORC. Le format du fichier de sortie n'a aucune incidence sur les performances d'exécution des requêtes natives.

  • Charges de travail recommandées par l'outil de qualification de l'exécution des requêtes natives.

Quand ne pas utiliser l'exécution de requêtes natives

Entrées des types de données suivants :

  • Octet : ORC et Parquet
  • Code temporel : ORC
  • Struct, Array, Map : Parquet

Limites

L'activation de l'exécution de requêtes natives dans les scénarios suivants peut entraîner des exceptions, des incompatibilités Spark ou le retour de la charge de travail au moteur Spark par défaut.

Créations de remplacement

L'exécution de requêtes natives peut entraîner un retour à l'exécution de charges de travail sur le moteur d'exécution Spark, ce qui peut entraîner une régression ou un échec.

  • ANSI : si le mode ANSI est activé, l'exécution revient à Spark.

  • Mode sensible à la casse : l'exécution de requêtes natives n'est compatible qu'avec le mode par défaut de Spark, qui n'est pas sensible à la casse. Si le mode sensible à la casse est activé, des résultats incorrects peuvent s'afficher.

  • Analyse de table partitionnée : l'exécution de requêtes natives n'est compatible avec l'analyse de table partitionnée que lorsque le chemin d'accès contient les informations de partition. Sinon, la charge de travail revient au moteur d'exécution Spark.

Comportement incompatible

Un comportement incompatible ou des résultats incorrects peuvent se produire lors de l'exécution de requêtes natives dans les cas suivants :

  • Fonctions JSON : l'exécution de requêtes natives accepte les chaînes entourées de guillemets doubles, et non de guillemets simples. Des résultats incorrects s'affichent avec les guillemets simples. L'utilisation de "*" dans le chemin d'accès avec la fonction get_json_object renvoie NULL.

  • Configuration de lecture Parquet :

    • L'exécution de requêtes natives traite spark.files.ignoreCorruptFiles comme étant défini sur la valeur par défaut false, même lorsqu'il est défini sur true.
    • L'exécution de requêtes natives ignore spark.sql.parquet.datetimeRebaseModeInRead et ne renvoie que le contenu du fichier Parquet. Les différences entre l'ancien calendrier hybride (julien-grégorien) et le calendrier grégorien proleptique ne sont pas prises en compte. Les résultats Spark peuvent varier.
  • NaN : non compatible. Des résultats inattendus peuvent se produire, par exemple, lorsque vous utilisez NaN dans une comparaison numérique.

  • Lecture en colonnes Spark : une erreur fatale peut se produire, car le vecteur en colonnes Spark est incompatible avec l'exécution de requêtes natives.

  • Débordement : lorsque les partitions de répartition aléatoire sont définies sur un grand nombre, la fonctionnalité de débordement sur disque peut déclencher un OutOfMemoryException. Si cela se produit, vous pouvez éliminer cette exception en réduisant le nombre de partitions.