Surveillance et débogage des workflows

Cette page fournit des informations pour vous aider à surveiller et à déboguer les workflows Dataproc.

Répertorier des workflows

Un objet WorkflowTemplate instancié est appelé un "workflow", calqué sur la structure d'une "opération".

Exécutez la commande gcloud suivante pour répertorier les flux de travail de votre projet :

gcloud dataproc operations list \
    --region=region \
    --filter="operationType = WORKFLOW"
...
OPERATION_NAME                                                DONE
projects/.../operations/07282b66-2c60-4919-9154-13bd4f03a1f2  True
projects/.../operations/1c0b0fd5-839a-4ad4-9a57-bbb011956690  True

Voici un exemple de requête permettant de répertorier tous les workflows lancés à partir d'un modèle nommé "terasort" :

gcloud dataproc operations list \
    --region=region \
    --filter="labels.goog-dataproc-workflow-template-id=terasort"
...
OPERATION_NAME                                     DONE
projects/.../07282b66-2c60-4919-9154-13bd4f03a1f2  True
projects/.../1c0b0fd5-839a-4ad4-9a57-bbb011956690  True

Sachez que seule la partie UUID de OPERATION_NAME est utilisée dans les requêtes suivantes.

Utiliser WorkflowMetadata

Le champ operation.metadata fournit des informations pour vous aider à diagnostiquer les échecs de workflow.

Voici un exemple WorkflowMetadata, comprenant un graphique de nœuds (tâches), intégré à une opération :

{
  "name": "projects/my-project/regions/us-central1/operations/671c1d5d-9d24-4cc7-8c93-846e0f886d6e",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.dataproc.v1.WorkflowMetadata",
    "template": "terasort",
    "version": 1,
    "createCluster": {
      "operationId": "projects/my-project/regions/us-central1/operations/8d472870-4a8b-4609-9f7d-48daccb028fc",
      "Done": true
    },
    "graph": {
      "nodes": [
        {
          "stepId": "teragen",
          "jobId": "teragen-vtrprwcgepyny",
          "state": "COMPLETED"
        },
        {
          "stepId": "terasort",
          "prerequisiteStepIds": [
            "teragen"
          ],
          "jobId": "terasort-vtrprwcgepyny",
          "state": "FAILED",
          "error": "Job failed"
        },
        {
          "stepId": "teravalidate",
          "prerequisiteStepIds": [
            "terasort"
          ],
          "state": "FAILED",
          "error": "Skipped, node terasort failed"
        }
      ]
    },
    "deleteCluster": {
      "operationId": "projects/my-project/regions/us-central1/operations/9654c67b-2642-4142-a145-ca908e7c81c9",
      "Done": true
    },
    "state": "DONE",
    "clusterName": "terasort-cluster-vtrprwcgepyny"
  },
  "done": true,
  "error": {
    "message": "Workflow failed"
  }
}
Done!

Récupérer un modèle

Comme indiqué dans l'exemple précédent, metadata contient l'identifiant et la version du modèle.

"template": "terasort",
"version": 1,

Si un modèle n'est pas supprimé, les versions de modèle instanciées peuvent être récupérées par une requête describe avec -version.

gcloud dataproc workflow-templates describe terasort \
    --region=region \
    --version=1

Voici comment répertorier les opérations de cluster lancées par un modèle :

gcloud dataproc operations list \
    --region=region \
    --filter="labels.goog-dataproc-workflow-instance-id = 07282b66-2c60-4919-9154-13bd4f03a1f2"
...
OPERATION_NAME                                     DONE
projects/.../cf9ce692-d6c9-4671-a909-09fd62041024  True
projects/.../1bbaefd9-7fd9-460f-9adf-ee9bc448b8b7  True

Voici un exemple de requête permettant de répertorier les tâches soumises à partir d'un modèle :

gcloud dataproc jobs list \
    --region=region \
    --filter="labels.goog-dataproc-workflow-template-id = terasort"
...
JOB_ID                TYPE     STATUS
terasort2-ci2ejdq2ta7l6  pyspark  DONE
terasort2-ci2ejdq2ta7l6  pyspark  DONE
terasort1-ci2ejdq2ta7l6  pyspark  DONE
terasort3-3xwsy6ubbs4ak  pyspark  DONE
terasort2-3xwsy6ubbs4ak  pyspark  DONE
terasort1-3xwsy6ubbs4ak  pyspark  DONE
terasort3-ajov4nptsllti  pyspark  DONE
terasort2-ajov4nptsllti  pyspark  DONE
terasort1-ajov4nptsllti  pyspark  DONE
terasort1-b262xachbv6c4  pyspark  DONE
terasort1-cryvid3kreea2  pyspark  DONE
terasort1-ndprn46nesbv4  pyspark  DONE
terasort1-yznruxam4ppxi  pyspark  DONE
terasort1-ttjbhpqmw55t6  pyspark  DONE
terasort1-d7svwzloplbni  pyspark  DONE

Voici comment répertorier les tâches soumises à partir d'une instance de workflow :

gcloud dataproc jobs list \
    --region=region \
    --filter="labels.goog-dataproc-workflow-instance-id = 07282b66-2c60-4919-9154-13bd4f03a1f2"
...
JOB_ID                TYPE     STATUS
terasort3-ci2ejdq2ta7l6  pyspark  DONE
terasort2-ci2ejdq2ta7l6  pyspark  DONE
terasort1-ci2ejdq2ta7l6  pyspark  DONE

Délais avant expiration des workflows

Vous pouvez définir un délai avant expiration du workflow qui va annuler le workflow si les tâches du workflow ne se terminent pas dans le délai imparti. Le délai avant expiration s'applique DAG (graphe orienté acyclique) du workflow (séquence de jobs dans le workflow), et non à la l'ensemble du processus. Le délai avant expiration commence au démarrage de la première tâche de workflow. Il n'inclut pas le temps nécessaire à la création d'un cluster géré. Si une tâche est en cours d'exécution à la fin du délai avant expiration, toutes les tâches en cours d'exécution sont arrêtées, le workflow est interrompu et, s'il était exécuté sur un cluster géré, le cluster est supprimé.

Avantage : Utilisez cette fonctionnalité pour éviter d'avoir à arrêter manuellement un workflow qui ne se termine pas en raison de tâches bloquées.

Définir un délai avant expiration pour un modèle de workflow

Lorsque vous créez un modèle de workflow, vous pouvez définir un délai avant expiration. Lorsque vous mettez à jour un modèle de workflow existant, vous pouvez également le doter d'un délai avant expiration.

gcloud

Pour définir un délai avant expiration de workflow sur un nouveau modèle, utilisez la commande gcloud beta dataproc workflow-template create avec l’option --dag-timeout. Les suffixes "s", "m", "h" et "d" définissent les unités de temps : secondes, minutes, heures et jours respectivement. Le délai avant expiration doit durer entre 10 minutes ("10m") et 24 heures ("24h" ou "1d").

gcloud dataproc workflow-templates create template-id (such as "my-workflow") \
    --region=region \
    --dag-timeout=duration (from "10m" to "24h" or "1d"") \
    ... other args ...

API

Pour définir un délai avant expiration de workflow, remplissez le champ WorkflowTemplate dagTimeout dans la requête workflowTemplates.create.

Console

Actuellement, la console Google Cloud ne permet pas de créer modèle de workflow.

Mettre à jour le délai avant expiration d'un modèle de workflow

Pour modifier, ajouter ou supprimer un délai avant expiration sur un modèle de workflow existant, vous devez mettre à jour ce modèle.

gcloud

Ajouter ou modifier un délai avant expiration de workflow

Pour ajouter ou modifier un délai avant expiration de workflow sur un modèle existant, utilisez la commande gcloud beta dataproc workflow-templates set-dag-timeout avec l'option --dag-timeout. Les suffixes "s", "m", "h" et "d" définissent les unités de temps : secondes, minutes, heures et jours respectivement. Le délai avant expiration doit durer entre 10 minutes ("10m") et 24 heures ("24h").

gcloud dataproc workflow-templates set-dag-timeout template-id (such as "my-workflow") \
    --region=region \
    --dag-timeout=duration (from "10m" to "24h" or "1d")

Supprimer un délai avant expiration de workflow

Pour supprimer le délai avant expiration d'un modèle existant, utilisez la commande gcloud beta dataproc workflow-templates remove-dag-timeout.

gcloud dataproc workflow-templates remove-dag-timeout template-id (such as "my-workflow") \
    --region=region

API

Ajouter ou modifier un délai avant expiration de workflow

Pour ajouter ou modifier un délai avant expiration de workflow sur un modèle existant, mettez à jour le modèle en remplissant le champ dagTimeout du modèle avec la nouvelle valeur du délai.

Supprimer un délai avant expiration de workflow

Pour supprimer un délai avant expiration de workflow d'un modèle existant, mettez à jour le modèle de workflow en supprimant le champ dagTimeout.

Console

Actuellement, la console Google Cloud ne permet pas de mettre à jour modèle de workflow.