Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Questa pagina spiega come attivare il supporto per gli operatori differibili nel tuo ambiente e come utilizzare gli operatori differibili nei tuoi DAG. Google Cloud
Informazioni sugli operatori differibili in Cloud Composer
Se hai almeno un'istanza di attivatore (o almeno due in ambienti a elevata resilienza), puoi utilizzare operatori e trigger differibili nei tuoi DAG.
Per gli operatori differibili, Airflow suddivide l'esecuzione delle attività nelle seguenti fasi:
Avvia l'operazione. In questa fase, l'attività occupa uno slot worker di Airflow. L'attività esegue un'operazione che delega il job a un servizio diverso.
Ad esempio, l'esecuzione di un job BigQuery può richiedere da pochi secondi a diverse ore. Dopo aver creato il job, l'operazione trasferisce l'identificatore del lavoro (ID job BigQuery) a un trigger Airflow.
Il trigger monitora il job fino al completamento. In questa fase, uno slot di lavoro non è occupato. Il triggerer di Airflow ha un'architettura asincrona ed è in grado di gestire centinaia di questi job. Quando il trigger rileva che il job è terminato, invia un evento che attiva l'ultima fase.
Nell'ultima fase, un worker Airflow esegue un callback. Questo callback, ad esempio, può contrassegnare l'attività come riuscita o eseguire un'altra operazione e impostare di nuovo il job in modo che venga monitorato dal trigger.
Il trigger è senza stato e quindi resiliente a interruzioni o riavvii. Per questo motivo, i job a esecuzione prolungata sono resilienti ai riavvii dei pod, a meno che il riavvio non avvenga durante l'ultima fase, che dovrebbe essere breve.
Prima di iniziare
Attiva il supporto per gli operatori differibili
Un componente dell'ambiente chiamato triggerer di Airflow monitora in modo asincrono tutte le attività differite nel tuo ambiente. Una volta completata un'operazione posticipata di questo tipo, il triggerer passa l'attività a un worker Airflow.
Per utilizzare la modalità differibile nei DAG, è necessaria almeno un'istanza di trigger nell'ambiente (o almeno due in ambienti a elevata resilienza). Puoi configurare i trigger quando crei un ambiente o modificare il numero di trigger e i parametri di rendimento per un ambiente esistente.
Google Cloud operatori che supportano la modalità differibile
Solo alcuni operatori Airflow sono stati estesi per supportare il modello differibile.
Il seguente elenco è un riferimento per gli operatori nel pacchetto
apache-airflow-providers-google
che supportano la modalità differibile.
La colonna con la versione minima richiesta del pacchetto apache-airflow-providers-google
rappresenta la versione del pacchetto meno recente in cui l'operatore
supporta la modalità differibile.
Operatori BigQuery
Nome operatore | Versione apache-airflow-providers-google richiesta |
---|---|
BigQueryCheckOperator | 8.4.0 |
BigQueryValueCheckOperator | 8.4.0 |
BigQueryIntervalCheckOperator | 8.4.0 |
BigQueryGetDataOperator | 8.4.0 |
BigQueryInsertJobOperator | 8.4.0 |
Operatori BigQuery Data Transfer Service
Nome operatore | Versione apache-airflow-providers-google richiesta |
---|---|
BigQueryDataTransferServiceStartTransferRunsOperator | 8.9.0 |
Operatori batch
Nome operatore | Versione apache-airflow-providers-google richiesta |
---|---|
CloudBatchSubmitJobOperator | 10.7.0 |
Operatori di Cloud Build
Nome operatore | Versione apache-airflow-providers-google richiesta |
---|---|
CloudBuildCreateBuildOperator | 8.7.0 |
Operatori di Cloud Composer
Nome operatore | Versione apache-airflow-providers-google richiesta |
---|---|
CloudComposerCreateEnvironmentOperator | 6.4.0 |
CloudComposerDeleteEnvironmentOperator | 6.4.0 |
CloudComposerUpdateEnvironmentOperator | 6.4.0 |
CloudComposerRunAirflowCLICommandOperator | 11.0.0 |
Operatori Cloud Run
Nome operatore | Versione apache-airflow-providers-google richiesta |
---|---|
CloudRunExecuteJobOperator | 10.7.0 |
Operatori Cloud SQL
Nome operatore | Versione apache-airflow-providers-google richiesta |
---|---|
CloudSQLExportInstanceOperator | 10.3.0 |
Operatori di Storage Transfer Service
Nome operatore | Versione apache-airflow-providers-google richiesta |
---|---|
CloudDataTransferServiceS3ToGCSOperator | 14.0.0 |
CloudDataTransferServiceGCSToGCSOperator | 14.0.0 |
Operatori Dataflow
Nome operatore | Versione apache-airflow-providers-google richiesta |
---|---|
DataflowTemplatedJobStartOperator | 8.9.0 |
DataflowStartFlexTemplateOperator | 8.9.0 |
DataflowStartYamlJobOperator | 11.0.0 |
Operatori Cloud Data Fusion
Nome operatore | Versione apache-airflow-providers-google richiesta |
---|---|
CloudDataFusionStartPipelineOperator | 8.9.0 |
Operatori del Catalogo universale Dataplex
Nome operatore | Versione apache-airflow-providers-google richiesta |
---|---|
DataplexRunDataQualityScanOperator | 10.8.0 |
DataplexGetDataQualityScanResultOperator | 10.8.0 |
DataplexRunDataProfileScanOperator | 11.0.0 |
Operatori Dataproc
Nome operatore | Versione apache-airflow-providers-google richiesta |
---|---|
DataprocCreateClusterOperator | 8.9.0 |
DataprocDeleteClusterOperator | 8.9.0 |
DataprocJobBaseOperator | 8.4.0 |
DataprocInstantiateWorkflowTemplateOperator | 9.0.0 |
DataprocInstantiateInlineWorkflowTemplateOperator | 10.1.0 |
DataprocSubmitJobOperator | 8.4.0 |
DataprocUpdateClusterOperator | 8.9.0 |
DataprocDiagnoseClusterOperator | 11.0.0 |
DataprocCreateBatchOperator | 8.9.0 |
Operatori Google Kubernetes Engine
Nome operatore | Versione apache-airflow-providers-google richiesta |
---|---|
GKEDeleteClusterOperator | 9.0.0 |
GKECreateClusterOperator | 9.0.0 |
GKEStartPodOperator | 12.0.0 |
GKEStartJobOperator | 11.0.0 |
Operatori Pub/Sub
Nome operatore | Versione apache-airflow-providers-google richiesta |
---|---|
PubSubPullOperator | 14.0.0 |
Operatori AI Platform
Nome operatore | Versione apache-airflow-providers-google richiesta |
---|---|
MLEngineStartTrainingJobOperator | 8.9.0 |
Utilizzare operatori differibili nei DAG
Una convenzione comune per tutti gli Google Cloud operatori è quella di attivare la
modalità posticipabile con il parametro booleano deferrable
. Se un operatore Google Cloudnon ha questo parametro, non può essere eseguito in modalità differibile. Altri operatori potrebbero avere una convenzione diversa. Ad esempio, alcuni
operatori della community hanno una classe separata con il suffisso Async
nel
nome.
Il seguente DAG di esempio utilizza l'operatore DataprocSubmitJobOperator
in modalità
differibile:
PYSPARK_JOB = {
"reference": { "project_id": "PROJECT_ID" },
"placement": { "cluster_name": "PYSPARK_CLUSTER_NAME" },
"pyspark_job": {
"main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
},
}
DataprocSubmitJobOperator(
task_id="dataproc-deferrable-example",
job=PYSPARK_JOB,
deferrable=True,
)
Visualizza i log del trigger
Il trigger genera log disponibili insieme a quelli di altri componenti dell'ambiente. Per saperne di più sulla visualizzazione dei log dell'ambiente, consulta Visualizzare i log.
Monitorare l'attivatore
Per ulteriori informazioni sul monitoraggio del componente trigger, consulta Metriche di Airflow.
Oltre a monitorare il trigger, puoi controllare il numero di attività posticipate nelle metriche Attività non completata nella dashboard di monitoraggio del tuo ambiente.
Passaggi successivi
- Risoluzione dei problemi dell'attivatore Airflow
- Metriche del trigger di Airflow
- Log del trigger di Airflow