Utilizzare operatori differibili nei DAG Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Questa pagina spiega come attivare il supporto per gli operatori differibili nel tuo ambiente e come utilizzare gli operatori differibili nei tuoi DAG. Google Cloud

Informazioni sugli operatori differibili in Cloud Composer

Se hai almeno un'istanza di attivatore (o almeno due in ambienti a elevata resilienza), puoi utilizzare operatori e trigger differibili nei tuoi DAG.

Per gli operatori differibili, Airflow suddivide l'esecuzione delle attività nelle seguenti fasi:

  1. Avvia l'operazione. In questa fase, l'attività occupa uno slot worker di Airflow. L'attività esegue un'operazione che delega il job a un servizio diverso.

    Ad esempio, l'esecuzione di un job BigQuery può richiedere da pochi secondi a diverse ore. Dopo aver creato il job, l'operazione trasferisce l'identificatore del lavoro (ID job BigQuery) a un trigger Airflow.

  2. Il trigger monitora il job fino al completamento. In questa fase, uno slot di lavoro non è occupato. Il triggerer di Airflow ha un'architettura asincrona ed è in grado di gestire centinaia di questi job. Quando il trigger rileva che il job è terminato, invia un evento che attiva l'ultima fase.

  3. Nell'ultima fase, un worker Airflow esegue un callback. Questo callback, ad esempio, può contrassegnare l'attività come riuscita o eseguire un'altra operazione e impostare di nuovo il job in modo che venga monitorato dal trigger.

Il trigger è senza stato e quindi resiliente a interruzioni o riavvii. Per questo motivo, i job a esecuzione prolungata sono resilienti ai riavvii dei pod, a meno che il riavvio non avvenga durante l'ultima fase, che dovrebbe essere breve.

Prima di iniziare

Attiva il supporto per gli operatori differibili

Un componente dell'ambiente chiamato triggerer di Airflow monitora in modo asincrono tutte le attività differite nel tuo ambiente. Una volta completata un'operazione posticipata di questo tipo, il triggerer passa l'attività a un worker Airflow.

Per utilizzare la modalità differibile nei DAG, è necessaria almeno un'istanza di trigger nell'ambiente (o almeno due in ambienti a elevata resilienza). Puoi configurare i trigger quando crei un ambiente o modificare il numero di trigger e i parametri di rendimento per un ambiente esistente.

Google Cloud operatori che supportano la modalità differibile

Solo alcuni operatori Airflow sono stati estesi per supportare il modello differibile. Il seguente elenco è un riferimento per gli operatori nel pacchetto apache-airflow-providers-google che supportano la modalità differibile. La colonna con la versione minima richiesta del pacchetto apache-airflow-providers-google rappresenta la versione del pacchetto meno recente in cui l'operatore supporta la modalità differibile.

Operatori BigQuery

Nome operatore Versione apache-airflow-providers-google richiesta
BigQueryCheckOperator 8.4.0
BigQueryValueCheckOperator 8.4.0
BigQueryIntervalCheckOperator 8.4.0
BigQueryGetDataOperator 8.4.0
BigQueryInsertJobOperator 8.4.0

Operatori BigQuery Data Transfer Service

Nome operatore Versione apache-airflow-providers-google richiesta
BigQueryDataTransferServiceStartTransferRunsOperator 8.9.0

Operatori batch

Nome operatore Versione apache-airflow-providers-google richiesta
CloudBatchSubmitJobOperator 10.7.0

Operatori di Cloud Build

Nome operatore Versione apache-airflow-providers-google richiesta
CloudBuildCreateBuildOperator 8.7.0

Operatori di Cloud Composer

Nome operatore Versione apache-airflow-providers-google richiesta
CloudComposerCreateEnvironmentOperator 6.4.0
CloudComposerDeleteEnvironmentOperator 6.4.0
CloudComposerUpdateEnvironmentOperator 6.4.0
CloudComposerRunAirflowCLICommandOperator 11.0.0

Operatori Cloud Run

Nome operatore Versione apache-airflow-providers-google richiesta
CloudRunExecuteJobOperator 10.7.0

Operatori Cloud SQL

Nome operatore Versione apache-airflow-providers-google richiesta
CloudSQLExportInstanceOperator 10.3.0

Operatori di Storage Transfer Service

Nome operatore Versione apache-airflow-providers-google richiesta
CloudDataTransferServiceS3ToGCSOperator 14.0.0
CloudDataTransferServiceGCSToGCSOperator 14.0.0

Operatori Dataflow

Nome operatore Versione apache-airflow-providers-google richiesta
DataflowTemplatedJobStartOperator 8.9.0
DataflowStartFlexTemplateOperator 8.9.0
DataflowStartYamlJobOperator 11.0.0

Operatori Cloud Data Fusion

Nome operatore Versione apache-airflow-providers-google richiesta
CloudDataFusionStartPipelineOperator 8.9.0

Operatori del Catalogo universale Dataplex

Nome operatore Versione apache-airflow-providers-google richiesta
DataplexRunDataQualityScanOperator 10.8.0
DataplexGetDataQualityScanResultOperator 10.8.0
DataplexRunDataProfileScanOperator 11.0.0

Operatori Google Kubernetes Engine

Nome operatore Versione apache-airflow-providers-google richiesta
GKEDeleteClusterOperator 9.0.0
GKECreateClusterOperator 9.0.0
GKEStartPodOperator 12.0.0
GKEStartJobOperator 11.0.0

Operatori Pub/Sub

Nome operatore Versione apache-airflow-providers-google richiesta
PubSubPullOperator 14.0.0

Operatori AI Platform

Nome operatore Versione apache-airflow-providers-google richiesta
MLEngineStartTrainingJobOperator 8.9.0

Utilizzare operatori differibili nei DAG

Una convenzione comune per tutti gli Google Cloud operatori è quella di attivare la modalità posticipabile con il parametro booleano deferrable. Se un operatore Google Cloudnon ha questo parametro, non può essere eseguito in modalità differibile. Altri operatori potrebbero avere una convenzione diversa. Ad esempio, alcuni operatori della community hanno una classe separata con il suffisso Async nel nome.

Il seguente DAG di esempio utilizza l'operatore DataprocSubmitJobOperator in modalità differibile:

PYSPARK_JOB = {
    "reference": { "project_id": "PROJECT_ID" },
    "placement": { "cluster_name": "PYSPARK_CLUSTER_NAME" },
    "pyspark_job": {
        "main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
    },
}

DataprocSubmitJobOperator(
        task_id="dataproc-deferrable-example",
        job=PYSPARK_JOB,
        deferrable=True,
    )

Visualizza i log del trigger

Il trigger genera log disponibili insieme a quelli di altri componenti dell'ambiente. Per saperne di più sulla visualizzazione dei log dell'ambiente, consulta Visualizzare i log.

Monitorare l'attivatore

Per ulteriori informazioni sul monitoraggio del componente trigger, consulta Metriche di Airflow.

Oltre a monitorare il trigger, puoi controllare il numero di attività posticipate nelle metriche Attività non completata nella dashboard di monitoraggio del tuo ambiente.

Passaggi successivi