Programmer des tâches Spark et Spark SQL personnalisées

Dataplex Universal Catalog permet de planifier l'exécution de code personnalisé, que ce soit pour une exécution unique, régulière ou à la demande. La fonctionnalité à la demande est en version Preview et n'est disponible que via l'API. Vous pouvez planifier des transformations de données client à l'aide de Spark (Java), PySpark (limité à la version 3.2 de Spark) ou Spark SQL. Dataplex Universal Catalog exécute le code à l'aide du traitement Spark sans serveur et d'un planificateur sans serveur intégré.

Terminologie

Tâche
 Une tâche Dataplex Universal Catalog représente le travail que vous souhaitez que Dataplex Universal Catalog effectue selon une planification. Il encapsule votre code, vos paramètres et le calendrier.
Job

Un job représente une seule exécution d'une tâche Dataplex Universal Catalog. Par exemple, si une tâche est planifiée pour s'exécuter quotidiennement, Dataplex Universal Catalog créera un job chaque jour.

Pour les jobs créés le 10 mai 2023 ou après, le champ Déclencheur indique le type de déclencheur d'exécution du job.

Voici les types de déclencheurs d'exécution des tâches :

  • RUN_REQUEST : indique que le job a été exécuté suite à l'appel de l'API RunTask.

  • TASK_CONFIG : indique que le job a été exécuté en raison de la configuration TriggerSpec de la tâche.

Modes de planification

Dataplex Universal Catalog est compatible avec les modes de planification suivants :

Exécuter une fois
 Utilisez ce mode pour exécuter votre tâche une seule fois. Vous pouvez choisir de l'exécuter immédiatement ou à une heure définie ultérieurement. Si vous exécutez la tâche immédiatement, l'exécution peut quand même prendre jusqu'à deux minutes à démarrer.
Exécuter selon un calendrier
 Utilisez ce mode pour exécuter la tâche à une fréquence répétée. Les répétitions acceptées sont quotidiennes, hebdomadaires, mensuelles ou personnalisées.
Exécuter à la demande

Utilisez ce mode pour exécuter une tâche créée précédemment à la demande. Le mode d'exécution à la demande n'est compatible qu'avec l'API RunTask. Lorsque votre job s'exécute à la demande, Dataplex Universal Catalog utilise les paramètres existants pour créer un job. Vous pouvez spécifier les arguments ExecutionSpec et les libellés pour exécuter le job.

Avant de commencer

  1. activer l'API Dataproc ;

    Activer l'API Dataproc

  2. Activez l'accès privé à Google pour votre réseau ou votre sous-réseau. Activez l'accès privé à Google sur le réseau que vous utilisez avec les tâches Dataplex Universal Catalog. Si vous ne spécifiez pas de réseau ni de sous-réseau lorsque vous créez la tâche Dataplex Universal Catalog, Dataplex Universal Catalog utilise le sous-réseau par défaut. Vous devez alors activer l'accès privé à Google pour ce sous-réseau.

  3. Créer un compte de service Un compte de service est requis pour planifier des tâches Dataplex Universal Catalog. Le compte de service doit appartenir au projet dans lequel vous exécutez les tâches. Le compte de service doit disposer des autorisations suivantes :

    • Accès aux données BigQuery et Cloud Storage en cours de traitement.

    • Autorisation Rôle Nœud de calcul Dataproc sur le projet dans lequel vous exécutez la tâche.

    • Si la tâche doit lire ou mettre à jour l'instance Dataproc Metastore associée au lac, le compte de service doit disposer du rôle Lecteur ou Éditeur Dataproc Metastore. Ce rôle doit être attribué dans le projet où le lac Dataplex Universal Catalog est configuré.

    • Si la tâche est un job SparkSQL, vous devez accorder au compte de service le rôle Développeur Dataplex Universal Catalog. Ce rôle doit être attribué dans le projet où le lac Dataplex Universal Catalog est configuré.

    • Si la tâche est un job SparkSQL, vous devez disposer des autorisations d'administrateur Cloud Storage sur le bucket dans lequel les résultats sont écrits.

    • Pour planifier et exécuter des tâches Spark SQL et Spark personnalisées, vous devez disposer des rôles IAM Lecteur de métadonnées Dataplex Universal Catalog (roles/dataplex.metadataReader), Lecteur Dataplex Universal Catalog (roles/dataplex.viewer) et Utilisateur de métadonnées Dataproc Metastore (roles/metastore.metadataUser) sur votre compte de service.

  4. Attribuez le rôle Utilisateur du compte de service (roles/iam.serviceAccountUser) au compte de service de l'utilisateur qui envoie le job. Pour obtenir des instructions, consultez Gérer l'accès aux comptes de service.

  5. Accordez au compte de service du lac Dataplex Universal Catalog les autorisations d'utiliser le compte de service. Vous trouverez le compte de service du lac Dataplex Universal Catalog sur la page Détails du lac de la consoleGoogle Cloud .

  6. Si le projet contenant votre lac Dataplex Universal Catalog est différent de celui dans lequel la tâche doit être exécutée, accordez au compte de service du lac Dataplex Universal Catalog le rôle Éditeur Dataproc dans le projet dans lequel vous exécutez la tâche.

  7. Placez les artefacts de code requis (fichiers JAR, Python ou script SQL) ou les fichiers archivés (.jar, .tar, .tar.gz, .tgz, .zip) dans un chemin Cloud Storage.

  8. Assurez-vous que le compte de service dispose de l'autorisation storage.objects.get requise pour le bucket Cloud Storage qui stocke ces artefacts de code.

Planifier une tâche Spark (Java ou Python)

Console

  1. Dans la console Google Cloud , accédez à la page Traiter de Dataplex Universal Catalog.

    Accéder à la page "Processus"

  2. Cliquez sur Créer une tâche.

  3. Pour Créer une tâche Spark personnalisée, cliquez sur Créer une tâche.

  4. Choisissez un lac Dataplex Universal Catalog.

  5. Indiquez un nom pour la tâche.

  6. Créez un ID pour votre tâche.

  7. Dans la section Configuration de la tâche, sélectionnez Spark ou PySpark pour Type.

  8. Saisissez les arguments pertinents.

  9. Dans le champ Compte de service, saisissez un compte de service utilisateur avec lequel votre tâche Spark personnalisée peut s'exécuter.

  10. Cliquez sur Continuer.

  11. Facultatif : Définir une programmation : sélectionnez Exécuter une fois ou Répéter. Renseignez les champs obligatoires.

  12. Cliquez sur Continuer.

  13. Facultatif : Personnalisez les ressources et ajoutez des paramètres supplémentaires.

  14. Cliquez sur Créer.

gcloud

Vous pouvez planifier une tâche Spark (Java / Python) à l'aide de la commande gcloud CLI. Le tableau suivant liste les paramètres obligatoires et facultatifs à utiliser :

Paramètre Description
--lake ID du lac pour la ressource de lac du service Dataplex Universal Catalog.
--location Emplacement du service Dataplex Universal Catalog.
--spark-main-class Classe principale du pilote. Le fichier jar contenant la classe doit se trouver dans le CLASSPATH par défaut.
--spark-main-jar-file-uri URI Cloud Storage du fichier jar contenant la classe principale.
--spark-archive-uris Facultatif : URI Cloud Storage des archives à extraire dans le répertoire de travail de chaque exécuteur. Types de fichiers acceptés : .jar, .tar, .tar.gz, .tgz et .zip.
--spark-file-uris Facultatif : URI Cloud Storage des fichiers à placer dans le répertoire de travail de chaque exécuteur.
--batch-executors-count Facultatif : nombre total d'exécuteurs de tâches. La valeur par défaut est "2".
--batch-max-executors-count Facultatif : Nombre maximal d'exécuteurs configurables. La valeur par défaut est 1 000. Si batch-max-executors-count est supérieur à batch-executors-count, Dataplex Universal Catalog active l'autoscaling.
--container-image-java-jars Facultatif : Liste des fichiers JAR Java à ajouter au chemin de classe. Les entrées valides incluent les URI Cloud Storage pointant vers des binaires Jar.
Par exemple, gs://bucket-name/my/path/to/file.jar.
--container-image-properties Facultatif : Clés de propriété, spécifiées au format prefix:property.
Par exemple, core:hadoop.tmp.dir.
Pour en savoir plus, consultez Propriétés du cluster.
--vpc-network-tags (Facultatif) Liste des tags réseau à appliquer au job.
--vpc-network-name Facultatif : réseau cloud privé virtuel dans lequel le job est exécuté. Par défaut, Dataplex Universal Catalog utilise le réseau VPC nommé Default dans le projet.
Vous ne devez utiliser qu'un seul des éléments suivants : --vpc-network-name ou --vpc-sub-network-name.
--vpc-sub-network-name (Facultatif) Sous-réseau VPC dans lequel le job s'exécute.
Vous ne devez utiliser qu'un seul des éléments suivants : --vpc-sub-network-name ou --vpc-network-name.
--trigger-type Type de déclencheur de la tâche spécifiée par l'utilisateur. Les valeurs doivent être l'une des suivantes :
ON_DEMAND : la tâche s'exécute une fois peu après sa création.
RECURRING : la tâche est exécutée périodiquement selon une planification.
--trigger-start-time Facultatif : heure de la première exécution de la tâche. Le format est `{year}-{month}-{day}T{hour}:{min}:{sec}Z`, où le fuseau horaire est UTC. Par exemple, "2017-01-15T01:30:00Z" correspond à 01h30 UTC le 15 janvier 2017. Si cette valeur n'est pas spécifiée, la tâche s'exécute après son envoi si le type de déclencheur est ON_DEMAND, ou selon la planification spécifiée si le type de déclencheur est RECURRING.
--trigger-disabled Facultatif : empêche l'exécution de la tâche. Ce paramètre n'annule pas les tâches déjà en cours d'exécution, mais désactive temporairement les tâches RECURRING.
--trigger-max-retires Facultatif : nombre de tentatives avant d'annuler. Définissez la valeur sur zéro pour ne jamais tenter de relancer une tâche ayant échoué.
--trigger-schedule Planification Cron pour exécuter des tâches périodiquement.
--description Facultatif : description de la tâche.
--display-name (Facultatif) Nom à afficher de la tâche.
--labels Facultatif : liste des paires de libellés KEY=VALUE à ajouter.
--execution-args Facultatif : arguments à transmettre à la tâche. Les arguments peuvent être un mélange de paires clé/valeur. Vous pouvez transmettre une liste de paires clé-valeur séparées par une virgule en tant qu'arguments d'exécution. Pour transmettre des arguments positionnels, définissez la clé sur TASK_ARGS et la valeur sur une chaîne contenant tous les arguments positionnels séparés par une virgule. Pour utiliser un délimiteur autre qu'une virgule, consultez la section sur l'échappement.
Si key-value et les arguments positionnels sont transmis ensemble, TASK_ARGS sera transmis en tant que dernier argument.
--execution-service-account Compte de service à utiliser pour exécuter une tâche.
--max-job-execution-lifetime Facultatif : Durée maximale avant l'expiration de l'exécution du job.
--container-image Facultatif : image de conteneur personnalisé pour l'environnement d'exécution du job. Si aucune valeur n'est spécifiée, une image de conteneur par défaut sera utilisée.
--kms-key Facultatif : clé Cloud KMS à utiliser pour le chiffrement, au format suivant :
projects/{project_number}/locations/{location_id}/keyRings/{key-ring-name}/cryptoKeys/{key-name}

Exemple Java :

glcoud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --trigger-type=ON_DEMAND spark-main-jar-file-uri=<gcs location to java file> --execution-service-account=<service-account-email> --trigger-start-time=<timestamp after which job starts ex. 2099-01-01T00:00:00Z> --labels=key1=value1,key2=value3,key3=value3 --execution-args=arg1=value1,arg2=value3,arg3=value3 <task-id>

Exemple PySpark :

gcloud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --trigger-type=RECURRING --trigger-schedule=<Cron schedule https://en.wikipedia.org/wiki/Cron> --spark-python-script-file=<gcs location to python script> --execution-service-account=<service-account-email> --execution-args=^::^arg1=value1::arg2=value2::TASK_ARGS="pos-arg1, pos-arg2" <task-id>

REST

Pour créer une tâche, utilisez APIs Explorer.

Programmer une tâche Spark SQL

gcloud

Pour planifier une tâche SparkSQL, exécutez la même commande gcloud CLI que dans Planifier une tâche Spark (Java ou Python), avec les paramètres supplémentaires suivants :

Paramètre Description
--spark-sql-script Texte de la requête SQL. spark-sql-script ou spark-sql-script-file est obligatoire.
--spark-sql-script-file Référence à un fichier de requête. Cette valeur peut être l'URI Cloud Storage du fichier de requête ou le chemin d'accès au contenu du script SQL. spark-sql-script ou spark-sql-script-file est obligatoire.
--execution-args Pour les tâches Spark SQL, les arguments suivants sont obligatoires et doivent être transmis en tant qu'arguments positionnels :
, --output_location, <GCS uri of the output directory>
et --output_format, <output file format>.
Les formats acceptés sont les fichiers CSV, JSON, Parquet et ORC.
gcloud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --execution-service-account=<service-account-email> --trigger-type=ON_DEMAND --spark-sql-script=<sql-script> --execution-args=^::^TASK_ARGS="--output_location, <gcs folder location>, --output_format, json" <sql-task-id>

REST

Pour créer une tâche, utilisez APIs Explorer.

Surveiller votre tâche

Console

  1. Dans la console Google Cloud , accédez à la page Traiter de Dataplex Universal Catalog.

    Accéder à la page "Processus"

  2. Dans l'onglet Tasks (Tâches), une liste de tâches est filtrée par type de modèle de tâche.

  3. Dans la colonne Nom, cliquez sur la tâche que vous souhaitez afficher.

  4. Cliquez sur l'ID de tâche de la tâche que vous souhaitez afficher.

    La page Dataproc s'ouvre dans la consoleGoogle Cloud , ce qui vous permet d'afficher les détails de la surveillance et de la sortie.

gcloud

Le tableau suivant répertorie les commandes de la gcloud CLI permettant de surveiller vos tâches.

Action Commande gcloud CLI
Lister les tâches gcloud dataplex tasks list --project=<project-name> --location=<location> --lake=<lake-id>
Afficher les détails d'une tâche gcloud dataplex tasks describe --project=<project-name> --location=<location> --lake=<lake-id> <task-id>
Lister les jobs d'une tâche gcloud dataplex tasks jobs list --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id>
Afficher les détails d'un job gcloud dataplex tasks jobs describe --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id>

Dataplex Universal Catalog exécute des jobs sur Dataproc sans serveur (batches). Pour afficher les journaux d'exécution d'un job Dataplex Universal Catalog, procédez comme suit :

  1. Obtenez l'ID du job Dataproc sans serveur (lots). Exécutez la commande suivante :

    gcloud dataplex tasks jobs describe --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id>
    
  2. Affichez les journaux. Exécutez la commande suivante en utilisant l'ID de job que vous avez obtenu en exécutant la commande précédente :

    gcloud beta dataproc batches wait --project=<project-name> --region=<location> <job-id>
    

REST

Pour get ou list une tâche ou un job, utilisez APIs Explorer.

Gérer la programmation

Dans la console Google Cloud , dans Dataplex Universal Catalog, vous pouvez modifier le calendrier d'une tâche, la supprimer ou annuler un job en cours. Le tableau suivant répertorie les commandes gcloud CLI pour ces actions.

Action Commande gcloud CLI
Modifier la programmation d'une tâche gcloud dataplex tasks update --project=<project-name> --location=<location> --lake=<lake-id> --trigger-schedule=<updated-schedule> <task-id>
Supprimer une tâche gcloud dataplex tasks delete --project=<project-name> --location=<location> --lake=<lake-id> <task-id>
Annuler une mission gcloud dataplex tasks jobs cancel --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id>

Étapes suivantes