Accélérez les charges de travail par lot et les sessions interactives de Dataproc sans serveur avec l'exécution de requêtes natives

Lancement de l'exécution de requêtes natives, qui est compatible avec les API Spark Dataframe, les requêtes Spark SQL qui lisent des données à partir de fichiers Parquet et ORC, ainsi que les charges de travail recommandées par l'outil de qualification de l'exécution de requêtes natives. Pour toute question concernant d'autres cas d'utilisation, veuillez contacter dataproc-pms@google.com.

Ce document explique comment activer les charges de travail par lot Dataproc sans serveur et les sessions interactives exécutées dans le niveau de tarification Premium pour utiliser l'exécution de requêtes native.

Utiliser l'exécution de requêtes natives avec la tarification de niveau Premium

L'exécution de requêtes natives Dataproc sans serveur n'est disponible que pour les charges de travail par lot et les sessions interactives exécutées dans le niveau de tarification premium de Dataproc sans serveur. Les tarifs du niveau Premium sont plus élevés que ceux du niveau Standard, mais aucuns frais supplémentaires ne sont facturés pour l'exécution de requêtes natives. Pour en savoir plus, consultez la page Tarifs de Dataproc sans serveur.

Vous pouvez activer l'allocation et la tarification de ressources de niveau premium pour les ressources de session interactive et par lot en définissant les propriétés de niveau d'allocation de ressources suivantes sur premium lorsque vous envoyez une charge de travail par lot Spark ou une session interactive.

Pour configurer l'exécution de requêtes natives, définissez des propriétés d'exécution de requêtes natives sur une charge de travail par lot, une session interactive ou un modèle de session, puis envoyez la charge de travail ou exécutez la session interactive dans un notebook.

Console

  1. Dans la console Google Cloud :

    1. Accédez à "Lots" de Dataproc.
    2. Cliquez sur Créer pour ouvrir la page Créer un lot.
  2. Sélectionnez et remplissez les champs suivants pour configurer le lot pour l'exécution de requêtes natives:

    • Conteneur:
    • Configuration des niveaux pour les exécuteurs et les pilotes:
      • Sélectionnez Premium pour tous les niveaux (Niveau de calcul pour les pilotes, Niveau de calcul pour l'exécution).
    • Propriétés:saisissez les paires Key (nom de la propriété) et Value suivantes pour les propriétés d'exécution des requêtes natives suivantes:
      Clé Valeur
      spark.dataproc.runtimeEngine native
  3. Renseignez, sélectionnez ou confirmez d'autres paramètres de charges de travail par lot. Consultez la section Envoyer une charge de travail par lot Spark.

  4. Cliquez sur ENVOYER pour exécuter la charge de travail par lot Spark.

gcloud

Définissez les indicateurs de commande gcloud dataproc batches submit spark de la gcloud CLI suivants pour configurer la charge de travail par lot pour l'exécution de requêtes natives:

gcloud dataproc batches submit spark \
    --project=PROJECT_ID \
    --region=REGION \
    --version=VERSION \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.SparkPi \
    --properties=spark.dataproc.runtimeEngine=native,spark.dataproc.driver.compute.tier=premium,spark.dataproc.executor.compute.tier=premium \
    OTHER_FLAGS_AS_NEEDED

Remarques :

API

Définissez les champs d'API Dataproc suivants pour configurer la charge de travail par lot pour l'exécution de requêtes natives:

Ajuster votre charge de travail d'exécution de requêtes natives

L'exécution de requêtes natives sans serveur Dataproc peut être davantage ajustée à l'aide des propriétés suivantes:

Propriété de lot Cas d'utilisation
spark.driver.memory
spark.driver.memoryOverhead
To tune the memory provided to spark driver process
spark.executor.memory
spark.executor.memoryOverhead
spark.memory.offHeap.size
To tune the memory provided to onheap/offheap memory of executor process
spark.dataproc.driver.disk.tier
spark.dataproc.driver.disk.size
To configure premium disk tier and size for driver
spark.dataproc.executor.disk.tier
spark.dataproc.executor.disk.size
To configure premium disk tier and size for executor

Propriétés d'exécution de requêtes natives

  • spark.dataproc.runtimeEngine=native (obligatoire): le moteur d'exécution de la charge de travail doit être défini sur native pour remplacer le moteur d'exécution spark par défaut.

  • version (Obligatoire): la charge de travail doit utiliser la version d'exécution de Spark 1.2.26 ou ultérieure, 2.2.26 ou ultérieure, ou une version majeure ultérieure.

  • Niveaux de calcul Premium (obligatoire): les propriétés spark.dataproc.spark.driver.compute.tier et spark.dataproc.executor.compute.tier doivent être définies sur premium.

  • Niveaux de disque Premium (facultatif et recommandé): les niveaux de disque Premium utilisent un mélange basé sur les colonnes plutôt qu'un mélange basé sur les lignes pour améliorer les performances. Pour améliorer le débit d'E/S de mélange, utilisez les niveaux de disque premium du pilote et de l'exécuteur avec une taille de disque suffisamment importante pour accueillir les fichiers de mélange.

  • Mémoire (facultatif) : si vous avez configuré le moteur d'exécution des requêtes natives sans configurer à la fois la mémoire hors tas (spark.memory.offHeap.size) et la mémoire sur tas (spark.executor.memory), le moteur d'exécution des requêtes natives prend une quantité de mémoire 4g par défaut et la divise en fonction d'un ratio 6:1 entre la mémoire hors tas et la mémoire sur tas.

    Si vous décidez de configurer la mémoire lorsque vous utilisez l'exécution de requêtes natives, vous pouvez le faire de deux manières:

    • Configurez uniquement la mémoire hors tas (spark.memory.offHeap.size) avec une valeur spécifiée. L'exécution de requêtes natives utilise la valeur spécifiée comme mémoire hors tas et alloue 1/7th supplémentaire de la valeur de mémoire hors tas en tant que mémoire sur tas.

    • Configurez à la fois la mémoire sur tas (spark.executor.memory) et la mémoire hors tas (spark.memory.offHeap.size). La quantité que vous allouez à la mémoire hors tas doit être supérieure à celle que vous allouez à la mémoire sur tas. Recommandation: Allouez de la mémoire hors tas à la mémoire dans le tas dans un ratio de 6:1.

    Exemples de valeurs :

    Paramètres de mémoire sans exécution de requêtes natives Paramètres de mémoire recommandés avec l'exécution de requêtes natives
    spark.executor.memory spark.memory.offHeap.size spark.executor.memory
    7g 6 g 1g
    14 g 12 g 2g
    28 g 24 g 4G
    56 g 48 g 8 g

Outil de qualification de l'exécution de requêtes natives

Vous pouvez exécuter l'outil de qualification de l'exécution de requêtes natives Dataproc, run_qualification_tool.sh, pour identifier les charges de travail qui peuvent accélérer les temps d'exécution avec l'exécution de requêtes natives. L'outil analyse les fichiers d'événements Spark générés par les applications de charge de travail par lot, puis estime les économies d'exécution potentielles que chaque application de charge de travail peut obtenir avec l'exécution de requêtes natives.

Exécuter l'outil de qualification

Pour exécuter l'outil sur les fichiers d'événements de charge de travail par lot Dataproc sans serveur, procédez comme suit :

1.Copiez run_qualification_tool.sh dans un répertoire local contenant les fichiers d'événement Spark à analyser.

  1. Exécutez l'outil de qualification pour analyser un fichier d'événement ou un ensemble de fichiers d'événements contenus dans le répertoire de script.

    ./run_qualification_tool.sh -f EVENT_FILE_PATH/EVENT_FILE_NAME \
        -o CUSTOM_OUTPUT_DIRECTORY_PATH \
        -k SERVICE_ACCOUNT_KEY  \
        -x MEMORY_ALLOCATEDg  \
        -t PARALLEL_THREADS_TO_RUN
    

    Options et valeurs:

    -f (obligatoire): consultez la section Emplacements des fichiers d'événements Spark pour trouver les fichiers d'événements de la charge de travail Spark.

    • EVENT_FILE_PATH (obligatoire, sauf si EVENT_FILE_NAME est spécifié) : chemin d'accès au fichier d'événement à analyser. Si elle n'est pas fournie, le chemin d'accès au fichier d'événement est supposé être le répertoire actuel.

    • EVENT_FILE_NAME (obligatoire, sauf si EVENT_FILE_PATH est spécifié) : nom du fichier d'événement à analyser. Si elle n'est pas fournie, les fichiers d'événement trouvés de manière récursive dans EVENT_FILE_PATH sont analysés.

    -o(facultatif): si cette option n'est pas fournie, l'outil crée ou utilise un répertoire output existant dans le répertoire actuel pour placer les fichiers de sortie.

    • CUSTOM_OUTPUT_DIRECTORY_PATH: chemin d'accès au répertoire de sortie des fichiers de sortie.

    -k (facultatif):

    -x (facultatif):

    • MEMORY_ALLOCATED: mémoire en gigaoctets à allouer à l'outil. Par défaut, l'outil utilise 80% de la mémoire libre disponible dans le système et tous les cœurs de machine disponibles.

    -t(facultatif):

    • PARALLEL_THREADS_TO_RUN: N=nombre de threads parallèles que l'outil doit exécuter. Par défaut, l'outil exécute tous les cœurs.

    Exemple d'utilisation de la commande:

    ./run_qualification_tool.sh -f gs://dataproc-temp-us-east1-9779/spark-job-history \
        -o perfboost-output -k /keys/event-file-key -x 34g -t 5
    

    Dans cet exemple, l'outil de qualification parcourt le répertoire gs://dataproc-temp-us-east1-9779/spark-job-history et analyse les fichiers d'événements Spark contenus dans ce répertoire et ses sous-répertoires. L'accès au répertoire est fourni par /keys/event-file-key. L'outil utilise 34 GB memory pour l'exécution et exécute des threads parallèles 5.

Fichiers de sortie de l'outil de qualification

Une fois l'analyse terminée, l'outil de qualification place les fichiers de sortie suivants dans un répertoire perfboost-output dans le répertoire actuel:

  • AppsRecommendedForBoost.tsv : liste des applications recommandées pour l'exécution des requêtes natives, séparées par des tabulations.

  • UnsupportedOperators.tsv : liste des applications non recommandées pour l'exécution de requêtes natives, séparées par des tabulations.

Fichier de sortie AppsRecommendedForBoost.tsv

Le tableau suivant présente le contenu d'un exemple de fichier de sortie AppsRecommendedForBoost.tsv. Il contient une ligne pour chaque application analysée.

Exemple de fichier de sortie AppsRecommendedForBoost.tsv:

applicationId applicationName rddPercentage unsupportedSqlPercentage totalTaskTime supportedTaskTime supportedSqlPercentage recommendedForBoost expectedRuntimeReduction
app-2024081/batches/083f6196248043938-000 projects/example.com:dev/locations/us-central1
6b4d6cae140f883c0
11c8e
0 % 0 % 548924253 548924253 100 % TRUE 30,00%
app-2024081/batches/60381cab738021457-000 projects/example.com:dev/locations/us-central1
474113a1462b426bf
b3aeb
0 % 0 % 514401703 514401703 100 % TRUE 30,00%

Descriptions des colonnes:

  • applicationId: ApplicationID de l'application Spark. Utilisez-le pour identifier la charge de travail par lot correspondante.

  • applicationName: nom de l'application Spark.

  • rddPercentage: pourcentage d'opérations RDD dans l'application. Les opérations RDD ne sont pas compatibles avec l'exécution de requêtes natives.

  • unsupportedSqlPercentage: Pourcentage d'opérations SQL non compatibles avec l'exécution de requêtes natives.

  • totalTaskTime: temps cumulé de toutes les tâches exécutées pendant l'exécution de l'application.

  • supportedTaskTime: durée totale de la tâche prise en charge par l'exécution de requêtes natives.

Les colonnes suivantes fournissent des informations importantes pour vous aider à déterminer si l'exécution de requêtes natives peut bénéficier à votre charge de travail par lot:

  • supportedSqlPercentage:pourcentage d'opérations SQL compatibles avec l'exécution de requêtes natives. Plus le pourcentage est élevé, plus l'exécution de l'application avec l'exécution de requêtes natives peut réduire le temps d'exécution.

  • recommendedForBoost:si la valeur est TRUE, il est recommandé d'exécuter l'application avec l'exécution de requêtes native. Si recommendedForBoost est FALSE, n'utilisez pas l'exécution de requêtes natives pour la charge de travail par lot.

  • expectedRuntimeReduction:pourcentage de réduction attendu du temps d'exécution de l'application lorsque vous l'exécutez avec l'exécution de requêtes natives.

Fichier de sortie UnsupportedOperators.tsv.

Le fichier de sortie UnsupportedOperators.tsv contient une liste d'opérateurs utilisés dans les applications de charge de travail qui ne sont pas compatibles avec l'exécution de requêtes natives. Chaque ligne du fichier de sortie liste un opérateur non compatible.

Descriptions des colonnes:

  • unsupportedOperator: nom de l'opérateur non compatible avec l'exécution de requêtes natives.

  • cumulativeCpuMs: nombre de millisecondes de processeur consommées lors de l'exécution de l'opérateur. Cette valeur reflète l'importance relative de l'opérateur dans l'application.

  • count: nombre de fois où l'opérateur est utilisé dans l'application.

Exécuter l'outil de qualification sur plusieurs projets

Cette section fournit des instructions pour exécuter un script afin d'exécuter l'outil de qualification pour analyser les fichiers d'événements Spark de plusieurs projets.

Exigences et limites concernant les scripts:

  • Exécutez le script sur des machines Linux :
    • La version Java >=11 doit être installée en tant que version Java par défaut.
  • Étant donné que les journaux de Cloud Logging ont une TTL de 30 jours, les fichiers d'événements Spark des charges de travail par lot exécutées il y a plus de 30 jours ne peuvent pas être analysés.

Pour exécuter l'outil de qualification dans plusieurs projets, procédez comme suit :

  1. Téléchargez le script list-batches-and-run-qt.sh, puis copiez-le sur votre ordinateur local.

  2. Modifier les autorisations du script

    chmod +x list-batches-and-run-qt.sh
    
  3. Préparez une liste de fichiers d'entrée de projet à transmettre au script pour analyse. Créez le fichier texte en ajoutant une ligne au format suivant pour chaque projet et région avec des fichiers d'événements Spark de charge de travail par lot à analyser.

    -r REGION -s START_DATE -e END_DATE -p PROJECT_ID -l LIMIT_MAX_BATCHES -k KEY_PATH
    

    Remarques :

    -r (obligatoire):

    • REGION: région dans laquelle les lots du projet sont envoyés.

    -s (obligatoire): format: yyyy-mm-dd. Vous pouvez ajouter un segment de temps 00:00:00 facultatif.

    • START_DATE: seules les charges de travail par lot créées après la date de début sont analysées. Les lots sont analysés dans l'ordre décroissant de leur date de création. Les lots les plus récents sont analysés en premier.

    -e (facultatif): format: yyyy-mm-dd. Vous pouvez ajouter un segment de temps 00:00:00 facultatif.

    • END_DATE: si vous spécifiez cette valeur, seules les charges de travail par lot créées avant ou à la date de fin sont analysées. Si ce paramètre n'est pas spécifié, tous les lots créés après START_DATE sont analysés. Les lots sont analysés par ordre décroissant de la date de création du lot. Les lots les plus récents sont analysés en premier.

    -l (facultatif):

    • LIMIT_MAX_BATCHES: nombre maximal de lots à analyser. Vous pouvez utiliser cette option en combinaison avec START-DATE et END-DATE pour analyser un nombre limité de lots créés entre les dates spécifiées.

      Si -l n'est pas spécifié, le nombre par défaut de lots pouvant atteindre 100 est analysé.

    -k (facultatif):

    • KEY_PATH: chemin d'accès local contenant des clés d'accès Cloud Storage pour les fichiers d'événements Spark de la charge de travail.

    Exemple de fichier d'entrée:

    -r us-central1 -s 2024-08-21 -p project1 -k key1
    -r us-east1 -s 2024-08-21 -e 2024-08-23 -l 50 -p project2 -k key2
    

    Remarques :

    • Ligne 1:jusqu'à 100 fichiers d'événements Spark les plus récents (par défaut) dans project1 dans la région us-central1 avec une heure de création après 2024-08-21 00:00:00 AM seront analysés. key1 permet d'accéder aux fichiers dans Cloud Storage.

    • Ligne 2 : jusqu'à 50 fichiers d'événements Spark les plus récents dans project2 dans la région us-eastl1 avec une heure de création après 2024-08-21 00:00:00 AM et avant ou le 23/08/2024 23h59:59 seront analysés. key2 permet d'accéder aux fichiers d'événements dans Cloud Storage.

  4. Exécutez le script list-batches-and-run-qt.sh. Les analyses sont générées dans les fichiers AppsRecommendedForBoost.tsv et UnsupportedOperators.tsv.

    ./list-batches-and-run-qt.sh PROJECT_INPUT_FILE_LIST \
        -x MEMORY_ALLOCATED \
        -o CUSTOM_OUTPUT_DIRECTORY_PATH \
        -t PARALLEL_THREADS_TO_RUN
    

    Remarques :

Emplacements des fichiers d'événements Spark

Pour trouver les fichiers d'événements Spark des charges de travail par lot Dataproc sans serveur, procédez comme suit:

  1. Dans Cloud Storage, recherchez le spark.eventLog.dir de la charge de travail, puis téléchargez-le.

    1. Si vous ne trouvez pas le spark.eventLog.dir, définissez-le sur un emplacement Cloud Storage, puis exécutez à nouveau la charge de travail et téléchargez le spark.eventLog.dir.spark.eventLog.dir
  2. Si vous avez configuré le serveur d'historique Spark pour la job par lot:

    1. Accédez au serveur d'historique Spark, puis sélectionnez la charge de travail.
    2. Cliquez sur Télécharger dans la colonne Journal des événements.

Quand utiliser l'exécution de requêtes natives ?

Utilisez l'exécution de requêtes natives dans les scénarios suivants:

API DataFrame Spark et requêtes Spark SQL qui lisent des données à partir de fichiers Parquet et ORC
Charges de travail recommandées par l'outil de qualification de l'exécution de requêtes natives.

Cas pour lesquels l'exécution de requêtes natives n'est pas recommandée

N'utilisez pas l'exécution de requêtes natives dans les scénarios suivants, car cela peut ne pas réduire le temps d'exécution de la charge de travail et entraîner un échec ou une régression:

  • Charges de travail non recommandées par l'outil de qualification de l'exécution de requêtes natives.
  • Charges de travail RDD, UDF et ML Spark
  • Charges de travail d'écriture
  • Formats de fichiers autres que Parquet et ORC
  • Données d'entrée des types suivants:
    • Timestamp, TinyInt, Byte, Struct, Array, Map: ORC et Parquet
    • VarChar, Char: ORC
  • Requêtes contenant des expressions régulières
  • Charges de travail qui utilisent un bucket Paiements du demandeur
  • Paramètres de configuration Cloud Storage non par défaut L'exécution de requêtes natives utilise de nombreuses valeurs par défaut, même si elles sont remplacées.

Limites

Activer l'exécution de requêtes natives dans les scénarios suivants peut générer des exceptions, soulever des incompatibilités avec Spark ou entraîner le retour de la charge de travail au moteur Spark par défaut.

Créations de remplacement

L'exécution de requêtes natives dans l'exécution suivante peut entraîner le basculement de la charge de travail vers le moteur d'exécution Spark, ce qui peut entraîner une régression ou une défaillance.

  • ANSI:si le mode ANSI est activé, l'exécution revient à Spark.

  • Mode sensible à la casse:l'exécution de requêtes natives n'est compatible qu'avec le mode sensible à la casse par défaut de Spark. Si le mode sensible à la casse est activé, des résultats incorrects peuvent se produire.

  • Analyse de table partitionnée:l'exécution de requêtes native n'est compatible avec l'analyse de table partitionnée que lorsque le chemin d'accès contient les informations de partitionnement. Sinon, la charge de travail est redirigée vers le moteur d'exécution Spark.

Comportement incompatible

Un comportement incompatible ou des résultats incorrects peuvent se produire lors de l'exécution de requêtes natives dans les cas suivants:

  • Fonctions JSON:l'exécution de requêtes natives accepte les chaînes placées entre guillemets doubles, et non entre guillemets simples. Les résultats sont incorrects avec les guillemets simples. L'utilisation de "*" dans le chemin d'accès avec la fonction get_json_object renvoie NULL.

  • Configuration de la lecture Parquet:

    • L'exécution de requêtes natives considère spark.files.ignoreCorruptFiles comme défini sur la valeur false par défaut, même lorsqu'il est défini sur true.
    • L'exécution de requêtes natives ignore spark.sql.parquet.datetimeRebaseModeInRead et ne renvoie que le contenu du fichier Parquet. Les différences entre l'ancien calendrier hybride (grégorien julien) et le calendrier grégorien proleptique ne sont pas prises en compte. Les résultats Spark peuvent être différents.
  • NaN:non compatible. Des résultats inattendus peuvent se produire, par exemple, lorsque vous utilisez NaN dans une comparaison numérique.

  • Lecture matricielle Spark:une erreur fatale peut se produire, car le vecteur matriciel Spark est incompatible avec l'exécution de requêtes natives.

  • Déversement:lorsque le nombre de partitions de mélange est défini sur un grand nombre, la fonctionnalité de déversement sur disque peut déclencher un OutOfMemoryException. Dans ce cas, vous pouvez réduire le nombre de partitions pour éliminer cette exception.