Monitoraggio e debug dei flussi di lavoro

Questa pagina fornisce informazioni per aiutarti a monitorare ed eseguire il debug dei flussi di lavoro Dataflow.

Flussi di lavoro delle schede

Un istanza di flusso di lavoro creato come entità è detto "flusso di lavoro" ed è modellato come un'operazione di"

Esegui questo comando gcloud per elencare i flussi di lavoro del progetto:

gcloud dataproc operations list \
    --region=region \
    --filter="operationType = WORKFLOW"
...
OPERATION_NAME                                                DONE
projects/.../operations/07282b66-2c60-4919-9154-13bd4f03a1f2  True
projects/.../operations/1c0b0fd5-839a-4ad4-9a57-bbb011956690  True

Ecco una richiesta di esempio per elencare tutti i flussi di lavoro avviati da un modello "terasort":

gcloud dataproc operations list \
    --region=region \
    --filter="labels.goog-dataproc-workflow-template-id=terasort"
...
OPERATION_NAME                                     DONE
projects/.../07282b66-2c60-4919-9154-13bd4f03a1f2  True
projects/.../1c0b0fd5-839a-4ad4-9a57-bbb011956690  True

Tieni presente che solo la parte UUID di OPERATION_NAME viene utilizzata nelle query successive.

usando WorkflowMetadata

Il campo operation.metadata fornisce informazioni per aiutarti a diagnosticare gli errori del flusso di lavoro.

Ecco un esempio WorkflowMetadata, incluso un grafico di nodi (job), incorporati in un'operazione:

{
  "name": "projects/my-project/regions/us-central1/operations/671c1d5d-9d24-4cc7-8c93-846e0f886d6e",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.dataproc.v1.WorkflowMetadata",
    "template": "terasort",
    "version": 1,
    "createCluster": {
      "operationId": "projects/my-project/regions/us-central1/operations/8d472870-4a8b-4609-9f7d-48daccb028fc",
      "Done": true
    },
    "graph": {
      "nodes": [
        {
          "stepId": "teragen",
          "jobId": "teragen-vtrprwcgepyny",
          "state": "COMPLETED"
        },
        {
          "stepId": "terasort",
          "prerequisiteStepIds": [
            "teragen"
          ],
          "jobId": "terasort-vtrprwcgepyny",
          "state": "FAILED",
          "error": "Job failed"
        },
        {
          "stepId": "teravalidate",
          "prerequisiteStepIds": [
            "terasort"
          ],
          "state": "FAILED",
          "error": "Skipped, node terasort failed"
        }
      ]
    },
    "deleteCluster": {
      "operationId": "projects/my-project/regions/us-central1/operations/9654c67b-2642-4142-a145-ca908e7c81c9",
      "Done": true
    },
    "state": "DONE",
    "clusterName": "terasort-cluster-vtrprwcgepyny"
  },
  "done": true,
  "error": {
    "message": "Workflow failed"
  }
}
Done!

Recupero di un modello

Come mostrato nell'esempio precedente, metadata contiene l'ID e la versione del modello.

"template": "terasort",
"version": 1,

Se un modello non viene eliminato, è possibile recuperare le versioni di modelli create mediante una richiesta describe-with-version.

gcloud dataproc workflow-templates describe terasort \
    --region=region \
    --version=1

Elenca operazioni del cluster avviate da un modello:

gcloud dataproc operations list \
    --region=region \
    --filter="labels.goog-dataproc-workflow-instance-id = 07282b66-2c60-4919-9154-13bd4f03a1f2"
...
OPERATION_NAME                                     DONE
projects/.../cf9ce692-d6c9-4671-a909-09fd62041024  True
projects/.../1bbaefd9-7fd9-460f-9adf-ee9bc448b8b7  True

Ecco una richiesta di esempio per elencare i job inviati da un modello:

gcloud dataproc jobs list \
    --region=region \
    --filter="labels.goog-dataproc-workflow-template-id = terasort"
...
JOB_ID                TYPE     STATUS
terasort2-ci2ejdq2ta7l6  pyspark  DONE
terasort2-ci2ejdq2ta7l6  pyspark  DONE
terasort1-ci2ejdq2ta7l6  pyspark  DONE
terasort3-3xwsy6ubbs4ak  pyspark  DONE
terasort2-3xwsy6ubbs4ak  pyspark  DONE
terasort1-3xwsy6ubbs4ak  pyspark  DONE
terasort3-ajov4nptsllti  pyspark  DONE
terasort2-ajov4nptsllti  pyspark  DONE
terasort1-ajov4nptsllti  pyspark  DONE
terasort1-b262xachbv6c4  pyspark  DONE
terasort1-cryvid3kreea2  pyspark  DONE
terasort1-ndprn46nesbv4  pyspark  DONE
terasort1-yznruxam4ppxi  pyspark  DONE
terasort1-ttjbhpqmw55t6  pyspark  DONE
terasort1-d7svwzloplbni  pyspark  DONE

Elenca i job inviati da un'istanza di flusso di lavoro:

gcloud dataproc jobs list \
    --region=region \
    --filter="labels.goog-dataproc-workflow-instance-id = 07282b66-2c60-4919-9154-13bd4f03a1f2"
...
JOB_ID                TYPE     STATUS
terasort3-ci2ejdq2ta7l6  pyspark  DONE
terasort2-ci2ejdq2ta7l6  pyspark  DONE
terasort1-ci2ejdq2ta7l6  pyspark  DONE

Timeout del flusso di lavoro

Puoi impostare un timeout di flusso di lavoro che annullerà il flusso di lavoro se i job non terminano entro il periodo di timeout. Il periodo di timeout si applica al DAG (Direct Acyclic Graph) dei job nel flusso di lavoro (la sequenza di job nel flusso di lavoro), non all'intera operazione del flusso di lavoro. Il periodo di timeout inizia all'avvio del primo job del flusso di lavoro. Non include il tempo necessario per creare un cluster gestito. Se un job è in esecuzione alla fine del periodo di timeout, tutti i job in esecuzione vengono arrestati, il flusso di lavoro viene terminato e, se è in esecuzione su un cluster gestito, il cluster viene eliminato.

Vantaggio: utilizza questa funzionalità per evitare di dover terminare manualmente un flusso di lavoro che non viene completato a causa di job bloccati.

Impostazione del timeout di un modello di flusso di lavoro

Puoi impostare un periodo di timeout del modello di flusso di lavoro quando crei un modello di flusso di lavoro. Puoi anche aggiungere un timeout del flusso di lavoro a un modello di flusso di lavoro esistente aggiornando il modello del flusso di lavoro.

gcloud

Per impostare un timeout del flusso di lavoro su un nuovo modello, utilizza il flag --dag-timeout con il comando gcloud dataproc flussi-templates create. Puoi utilizzare "s", "m", "h" e "d" suffissi per impostare rispettivamente i valori di durata secondi, minuti, ore e giorni. La durata del timeout deve essere compresa tra 10 minuti ("10m") e 24 ore ("24h" o "1d").

gcloud dataproc workflow-templates create template-id (such as "my-workflow") \
    --region=region \
    --dag-timeout=duration (from "10m" to "24h" or "1d"") \
    ... other args ...

API

Per impostare un timeout del flusso di lavoro, completa il campo WorkflowTemplate dagTimeout come parte di una richiesta workflowTemplates.create.

Console

Attualmente, la console Google Cloud non supporta la creazione di un modello di flusso di lavoro.

Aggiornamento di un timeout del modello di flusso di lavoro

Puoi aggiornare un modello di flusso di lavoro esistente per modificare, aggiungere o rimuovere un timeout del flusso di lavoro.

gcloud

Aggiungere o modificare un timeout del flusso di lavoro

Per aggiungere o modificare un timeout del flusso di lavoro su un modello esistente, utilizza il flag --dag-timeout con il comando gcloud dataproc flussi-templates set-dag-timeout. Puoi utilizzare "s", "m", "h" e "d" suffissi per impostare rispettivamente i valori di durata secondi, minuti, ore e giorni. La durata del timeout deve essere compresa tra 10 minuti ("10m") e 24 ore ("24h").

gcloud dataproc workflow-templates set-dag-timeout template-id (such as "my-workflow") \
    --region=region \
    --dag-timeout=duration (from "10m" to "24h" or "1d")

Rimuovere un timeout del flusso di lavoro

Per rimuovere un timeout del flusso di lavoro da un modello esistente, utilizza il comando gcloud dataproc flussi-templates remove-dag-timeout.

gcloud dataproc workflow-templates remove-dag-timeout template-id (such as "my-workflow") \
    --region=region

API

Aggiungere o modificare un timeout del flusso di lavoro

Per aggiungere o modificare un timeout del flusso di lavoro in un modello esistente, aggiorna il modello del flusso di lavoro compilando il campo dagTimeout del modello con il valore di timeout nuovo o modificato.

Rimuovere un timeout del flusso di lavoro

Per rimuovere un timeout del flusso di lavoro da un modello esistente, aggiorna il modello del flusso di lavoro rimuovendo il campo dagTimeout del modello.

Console

Attualmente, la console Google Cloud non supporta l'aggiornamento di un modello di flusso di lavoro.