Questa pagina fornisce informazioni per aiutarti a monitorare ed eseguire il debug dei flussi di lavoro Dataflow.
Flussi di lavoro delle schede
Un istanza di flusso di lavoro creato come entità è detto "flusso di lavoro" ed è modellato come un'operazione di"
Esegui questo comando gcloud
per elencare i flussi di lavoro del progetto:
gcloud dataproc operations list \ --region=region \ --filter="operationType = WORKFLOW"
... OPERATION_NAME DONE projects/.../operations/07282b66-2c60-4919-9154-13bd4f03a1f2 True projects/.../operations/1c0b0fd5-839a-4ad4-9a57-bbb011956690 True
Ecco una richiesta di esempio per elencare tutti i flussi di lavoro avviati da un modello "terasort":
gcloud dataproc operations list \ --region=region \ --filter="labels.goog-dataproc-workflow-template-id=terasort"
... OPERATION_NAME DONE projects/.../07282b66-2c60-4919-9154-13bd4f03a1f2 True projects/.../1c0b0fd5-839a-4ad4-9a57-bbb011956690 True
Tieni presente che solo la parte UUID di OPERATION_NAME
viene utilizzata nelle query
successive.
usando WorkflowMetadata
Il campo operation.metadata
fornisce informazioni per aiutarti a diagnosticare gli errori del flusso di lavoro.
Ecco un esempio
WorkflowMetadata
,
incluso un grafico di nodi (job), incorporati in un'operazione:
{ "name": "projects/my-project/regions/us-central1/operations/671c1d5d-9d24-4cc7-8c93-846e0f886d6e", "metadata": { "@type": "type.googleapis.com/google.cloud.dataproc.v1.WorkflowMetadata", "template": "terasort", "version": 1, "createCluster": { "operationId": "projects/my-project/regions/us-central1/operations/8d472870-4a8b-4609-9f7d-48daccb028fc", "Done": true }, "graph": { "nodes": [ { "stepId": "teragen", "jobId": "teragen-vtrprwcgepyny", "state": "COMPLETED" }, { "stepId": "terasort", "prerequisiteStepIds": [ "teragen" ], "jobId": "terasort-vtrprwcgepyny", "state": "FAILED", "error": "Job failed" }, { "stepId": "teravalidate", "prerequisiteStepIds": [ "terasort" ], "state": "FAILED", "error": "Skipped, node terasort failed" } ] }, "deleteCluster": { "operationId": "projects/my-project/regions/us-central1/operations/9654c67b-2642-4142-a145-ca908e7c81c9", "Done": true }, "state": "DONE", "clusterName": "terasort-cluster-vtrprwcgepyny" }, "done": true, "error": { "message": "Workflow failed" } } Done!
Recupero di un modello
Come mostrato nell'esempio precedente, metadata
contiene l'ID e la versione del modello.
"template": "terasort", "version": 1,
Se un modello non viene eliminato, è possibile recuperare le versioni di modelli create mediante una richiesta describe-with-version.
gcloud dataproc workflow-templates describe terasort \ --region=region \ --version=1
Elenca operazioni del cluster avviate da un modello:
gcloud dataproc operations list \ --region=region \ --filter="labels.goog-dataproc-workflow-instance-id = 07282b66-2c60-4919-9154-13bd4f03a1f2"
... OPERATION_NAME DONE projects/.../cf9ce692-d6c9-4671-a909-09fd62041024 True projects/.../1bbaefd9-7fd9-460f-9adf-ee9bc448b8b7 True
Ecco una richiesta di esempio per elencare i job inviati da un modello:
gcloud dataproc jobs list \ --region=region \ --filter="labels.goog-dataproc-workflow-template-id = terasort"
... JOB_ID TYPE STATUS terasort2-ci2ejdq2ta7l6 pyspark DONE terasort2-ci2ejdq2ta7l6 pyspark DONE terasort1-ci2ejdq2ta7l6 pyspark DONE terasort3-3xwsy6ubbs4ak pyspark DONE terasort2-3xwsy6ubbs4ak pyspark DONE terasort1-3xwsy6ubbs4ak pyspark DONE terasort3-ajov4nptsllti pyspark DONE terasort2-ajov4nptsllti pyspark DONE terasort1-ajov4nptsllti pyspark DONE terasort1-b262xachbv6c4 pyspark DONE terasort1-cryvid3kreea2 pyspark DONE terasort1-ndprn46nesbv4 pyspark DONE terasort1-yznruxam4ppxi pyspark DONE terasort1-ttjbhpqmw55t6 pyspark DONE terasort1-d7svwzloplbni pyspark DONE
Elenca i job inviati da un'istanza di flusso di lavoro:
gcloud dataproc jobs list \ --region=region \ --filter="labels.goog-dataproc-workflow-instance-id = 07282b66-2c60-4919-9154-13bd4f03a1f2"
... JOB_ID TYPE STATUS terasort3-ci2ejdq2ta7l6 pyspark DONE terasort2-ci2ejdq2ta7l6 pyspark DONE terasort1-ci2ejdq2ta7l6 pyspark DONE
Timeout del flusso di lavoro
Puoi impostare un timeout di flusso di lavoro che annullerà il flusso di lavoro se i job non terminano entro il periodo di timeout. Il periodo di timeout si applica al DAG (Direct Acyclic Graph) dei job nel flusso di lavoro (la sequenza di job nel flusso di lavoro), non all'intera operazione del flusso di lavoro. Il periodo di timeout inizia all'avvio del primo job del flusso di lavoro. Non include il tempo necessario per creare un cluster gestito. Se un job è in esecuzione alla fine del periodo di timeout, tutti i job in esecuzione vengono arrestati, il flusso di lavoro viene terminato e, se è in esecuzione su un cluster gestito, il cluster viene eliminato.
Vantaggio: utilizza questa funzionalità per evitare di dover terminare manualmente un flusso di lavoro che non viene completato a causa di job bloccati.
Impostazione del timeout di un modello di flusso di lavoro
Puoi impostare un periodo di timeout del modello di flusso di lavoro quando crei un modello di flusso di lavoro. Puoi anche aggiungere un timeout del flusso di lavoro a un modello di flusso di lavoro esistente aggiornando il modello del flusso di lavoro.
gcloud
Per impostare un timeout del flusso di lavoro su un nuovo modello, utilizza il flag --dag-timeout
con il comando
gcloud dataproc flussi-templates create. Puoi utilizzare "s", "m", "h" e "d" suffissi
per impostare rispettivamente i valori di durata secondi, minuti, ore e giorni. La durata del timeout deve essere compresa tra 10 minuti ("10m") e 24 ore ("24h" o "1d").
gcloud dataproc workflow-templates create template-id (such as "my-workflow") \ --region=region \ --dag-timeout=duration (from "10m" to "24h" or "1d"") \ ... other args ...
API
Per impostare un timeout del flusso di lavoro, completa il campo WorkflowTemplate
dagTimeout
come parte di una richiesta workflowTemplates.create.
Console
Attualmente, la console Google Cloud non supporta la creazione di un modello di flusso di lavoro.
Aggiornamento di un timeout del modello di flusso di lavoro
Puoi aggiornare un modello di flusso di lavoro esistente per modificare, aggiungere o rimuovere un timeout del flusso di lavoro.
gcloud
Aggiungere o modificare un timeout del flusso di lavoro
Per aggiungere o modificare un timeout del flusso di lavoro su un modello esistente, utilizza il flag --dag-timeout
con il comando
gcloud dataproc flussi-templates set-dag-timeout. Puoi utilizzare "s", "m", "h" e "d" suffissi per impostare rispettivamente i valori di durata secondi, minuti, ore e giorni. La durata del timeout deve essere compresa tra 10 minuti ("10m") e 24 ore ("24h").
gcloud dataproc workflow-templates set-dag-timeout template-id (such as "my-workflow") \ --region=region \ --dag-timeout=duration (from "10m" to "24h" or "1d")
Rimuovere un timeout del flusso di lavoro
Per rimuovere un timeout del flusso di lavoro da un modello esistente, utilizza il comando gcloud dataproc flussi-templates remove-dag-timeout.
gcloud dataproc workflow-templates remove-dag-timeout template-id (such as "my-workflow") \ --region=region
API
Aggiungere o modificare un timeout del flusso di lavoro
Per aggiungere o modificare un timeout del flusso di lavoro in un modello esistente, aggiorna il modello del flusso di lavoro compilando il campo dagTimeout
del modello con il valore di timeout nuovo o modificato.
Rimuovere un timeout del flusso di lavoro
Per rimuovere un timeout del flusso di lavoro da un modello esistente, aggiorna il modello del flusso di lavoro rimuovendo il campo dagTimeout
del modello.
Console
Attualmente, la console Google Cloud non supporta l'aggiornamento di un modello di flusso di lavoro.