Monitoraggio e debug dei flussi di lavoro

Questa pagina fornisce informazioni utili per monitorare e eseguire il debug dei workflow Dataproc.

Flussi di lavoro delle schede

Un WorkflowTemplate inizializzato è chiamato "flusso di lavoro" ed è modellato come "operazione".

Esegui il seguente comando gcloud per elencare i flussi di lavoro del progetto:

gcloud dataproc operations list \
    --region=region \
    --filter="operationType = WORKFLOW"
...
OPERATION_NAME                                                DONE
projects/.../operations/07282b66-2c60-4919-9154-13bd4f03a1f2  True
projects/.../operations/1c0b0fd5-839a-4ad4-9a57-bbb011956690  True

Di seguito è riportata una richiesta di esempio per elencare tutti i flussi di lavoro avviati da un modello "terasort":

gcloud dataproc operations list \
    --region=region \
    --filter="labels.goog-dataproc-workflow-template-id=terasort"
...
OPERATION_NAME                                     DONE
projects/.../07282b66-2c60-4919-9154-13bd4f03a1f2  True
projects/.../1c0b0fd5-839a-4ad4-9a57-bbb011956690  True

Tieni presente che nelle query successive viene utilizzata solo la parte UUID di OPERATION_NAME.

Utilizzo di WorkflowMetadata

Il campo operation.metadata fornisce informazioni utili per diagnosticare i guasti del flusso di lavoro.

Ecco un WorkflowMetadata di esempio, incluso un grafo di nodi (job), incorporato in un'operazione:

{
  "name": "projects/my-project/regions/us-central1/operations/671c1d5d-9d24-4cc7-8c93-846e0f886d6e",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.dataproc.v1.WorkflowMetadata",
    "template": "terasort",
    "version": 1,
    "createCluster": {
      "operationId": "projects/my-project/regions/us-central1/operations/8d472870-4a8b-4609-9f7d-48daccb028fc",
      "Done": true
    },
    "graph": {
      "nodes": [
        {
          "stepId": "teragen",
          "jobId": "teragen-vtrprwcgepyny",
          "state": "COMPLETED"
        },
        {
          "stepId": "terasort",
          "prerequisiteStepIds": [
            "teragen"
          ],
          "jobId": "terasort-vtrprwcgepyny",
          "state": "FAILED",
          "error": "Job failed"
        },
        {
          "stepId": "teravalidate",
          "prerequisiteStepIds": [
            "terasort"
          ],
          "state": "FAILED",
          "error": "Skipped, node terasort failed"
        }
      ]
    },
    "deleteCluster": {
      "operationId": "projects/my-project/regions/us-central1/operations/9654c67b-2642-4142-a145-ca908e7c81c9",
      "Done": true
    },
    "state": "DONE",
    "clusterName": "terasort-cluster-vtrprwcgepyny"
  },
  "done": true,
  "error": {
    "message": "Workflow failed"
  }
}
Done!

Recuperare un modello

Come mostrato nell'esempio precedente, metadata contiene l'ID e la versione del modello.

"template": "terasort",
"version": 1,

Se un modello non viene eliminato, le versioni del modello istanziate possono essere recuperate tramite una richiesta describe-with-version.

gcloud dataproc workflow-templates describe terasort \
    --region=region \
    --version=1

Elenca le operazioni cluster avviate da un modello:

gcloud dataproc operations list \
    --region=region \
    --filter="labels.goog-dataproc-workflow-instance-id = 07282b66-2c60-4919-9154-13bd4f03a1f2"
...
OPERATION_NAME                                     DONE
projects/.../cf9ce692-d6c9-4671-a909-09fd62041024  True
projects/.../1bbaefd9-7fd9-460f-9adf-ee9bc448b8b7  True

Ecco una richiesta di esempio per elencare i job inviati da un modello:

gcloud dataproc jobs list \
    --region=region \
    --filter="labels.goog-dataproc-workflow-template-id = terasort"
...
JOB_ID                TYPE     STATUS
terasort2-ci2ejdq2ta7l6  pyspark  DONE
terasort2-ci2ejdq2ta7l6  pyspark  DONE
terasort1-ci2ejdq2ta7l6  pyspark  DONE
terasort3-3xwsy6ubbs4ak  pyspark  DONE
terasort2-3xwsy6ubbs4ak  pyspark  DONE
terasort1-3xwsy6ubbs4ak  pyspark  DONE
terasort3-ajov4nptsllti  pyspark  DONE
terasort2-ajov4nptsllti  pyspark  DONE
terasort1-ajov4nptsllti  pyspark  DONE
terasort1-b262xachbv6c4  pyspark  DONE
terasort1-cryvid3kreea2  pyspark  DONE
terasort1-ndprn46nesbv4  pyspark  DONE
terasort1-yznruxam4ppxi  pyspark  DONE
terasort1-ttjbhpqmw55t6  pyspark  DONE
terasort1-d7svwzloplbni  pyspark  DONE

Elenca i job inviati da un'istanza di flusso di lavoro:

gcloud dataproc jobs list \
    --region=region \
    --filter="labels.goog-dataproc-workflow-instance-id = 07282b66-2c60-4919-9154-13bd4f03a1f2"
...
JOB_ID                TYPE     STATUS
terasort3-ci2ejdq2ta7l6  pyspark  DONE
terasort2-ci2ejdq2ta7l6  pyspark  DONE
terasort1-ci2ejdq2ta7l6  pyspark  DONE

Tempo di attesa del flusso di lavoro

Puoi impostare un timeout del flusso di lavoro che lo annulli se i job del flusso di lavoro non vengono completati entro il periodo di timeout. Il periodo di timeout si applica al DAG (Directed Acyclic Graph) di job nel flusso di lavoro (la sequenza di job nel flusso di lavoro), non all' intera operazione del flusso di lavoro. Il periodo di timeout inizia quando viene avviato il primo job del flusso di lavoro e non include il tempo necessario per creare un cluster gestito. Se al termine del periodo di timeout è in esecuzione un job, tutti i job in esecuzione vengono bloccati, il flusso di lavoro viene terminato e, se il flusso di lavoro era in esecuzione su un cluster gestito, il cluster viene eliminato.

Vantaggio: utilizza questa funzionalità per evitare di dover terminare manualmente un flusso di lavoro che non viene completato a causa di job bloccati.

Impostazione di un timeout per il modello di workflow

Puoi impostare un periodo di timeout del modello di flusso di lavoro quando crei un modello di flusso di lavoro. Puoi anche aggiungere un timeout del flusso di lavoro a un modello di flusso di lavoro esistente aggiornandolo.

gcloud

Per impostare un timeout del flusso di lavoro in un nuovo modello, utilizza il flag --dag-timeout con il comando gcloud dataproc workflow-templates create. Puoi utilizzare i suffissi "s", "m", "h" e "d" per impostare rispettivamente i valori della durata in secondi, minuti, ore e giorni. La durata del timeout deve essere compresa tra 10 minuti ("10m") e 24 ore ("24h" o "1d").

gcloud dataproc workflow-templates create template-id (such as "my-workflow") \
    --region=region \
    --dag-timeout=duration (from "10m" to "24h" or "1d"") \
    ... other args ...

API

Per impostare un timeout del flusso di lavoro, compila il campo WorkflowTemplate dagTimeout nell'ambito di una richiesta workflowTemplates.create.

Console

Al momento, la console Google Cloud non supporta la creazione di un modello di flusso di lavoro.

Tempo di attesa per l'aggiornamento di un modello di workflow

Puoi aggiornare un modello di flusso di lavoro esistente per modificare, aggiungere o rimuovere un timeout del flusso di lavoro.

gcloud

Aggiunta o modifica di un timeout del flusso di lavoro

Per aggiungere o modificare un timeout del flusso di lavoro in un modello esistente, utilizza il flag --dag-timeout con il comando gcloud dataproc workflow-templates set-dag-timeout. Puoi utilizzare i suffissi "s", "m", "h" e "d" per impostare rispettivamente i valori della durata in secondi, minuti, ore e giorni. La durata del timeout deve essere compresa tra 10 minuti ("10m") e 24 ore ("24h").

gcloud dataproc workflow-templates set-dag-timeout template-id (such as "my-workflow") \
    --region=region \
    --dag-timeout=duration (from "10m" to "24h" or "1d")

Rimozione di un timeout del flusso di lavoro

Per rimuovere un timeout del flusso di lavoro da un modello esistente, utilizza il comando gcloud dataproc workflow-templates remove-dag-timeout.

gcloud dataproc workflow-templates remove-dag-timeout template-id (such as "my-workflow") \
    --region=region

API

Aggiunta o modifica di un timeout del flusso di lavoro

Per aggiungere o modificare un timeout del flusso di lavoro in un modello esistente, aggiorna il modello di flusso di lavoro preenchimento il campo dagTimeout del modello con il valore del timeout nuovo o modificato.

Rimozione di un timeout del flusso di lavoro

Per rimuovere un timeout del flusso di lavoro da un modello esistente, aggiorna il modello di flusso di lavoro rimuovendo il campo dagTimeout del modello.

Console

Al momento, la console Google Cloud non supporta l'aggiornamento di un modello di flusso di lavoro.