Zurückstellbare Operatoren in Airflow-DAGs verwenden

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Auf dieser Seite wird erläutert, wie Sie die Unterstützung für zurückstellbare Operatoren in Ihrer Umgebung aktivieren und zurückstellbare Google Cloud Operatoren in Ihren DAGs verwenden.

Deferrable Operators in Cloud Composer

Wenn Sie mindestens eine Triggerer-Instanz (oder mindestens zwei in hochgradig resilienten Umgebungen) haben, können Sie zurückstellbare Operatoren und Trigger in Ihren DAGs verwenden.

Bei aufschiebbaren Operatoren unterteilt Airflow die Ausführung von Aufgaben in die folgenden Phasen:

  1. Starten Sie den Vorgang. In dieser Phase belegt die Aufgabe einen Airflow-Worker-Slot. Die Aufgabe führt einen Vorgang aus, der den Job an einen anderen Dienst delegiert.

    Die Ausführung eines BigQuery-Jobs kann beispielsweise einige Sekunden bis mehrere Stunden dauern. Nachdem der Job erstellt wurde, übergibt der Vorgang die Arbeits-ID (BigQuery-Job-ID) an einen Airflow-Trigger.

  2. Der Trigger überwacht den Job, bis er abgeschlossen ist. In dieser Phase ist kein Worker-Slot belegt. Der Airflow-Triggerer hat eine asynchrone Architektur und kann Hunderte solcher Jobs verarbeiten. Wenn der Trigger erkennt, dass der Job abgeschlossen ist, wird ein Ereignis gesendet, das die letzte Phase auslöst.

  3. In der letzten Phase führt ein Airflow-Worker einen Callback aus. Mit diesem Callback kann die Aufgabe beispielsweise als erfolgreich markiert oder ein anderer Vorgang ausgeführt und der Job wieder vom Trigger überwacht werden.

Der Triggerer ist zustandslos und daher unempfindlich gegenüber Unterbrechungen oder Neustarts. Daher sind lang andauernde Jobs unempfindlich gegenüber Pod-Neustarts, es sei denn, der Neustart erfolgt in der letzten Phase, die kurz sein sollte.

Hinweise

Unterstützung für aufschiebbare Operatoren aktivieren

Eine Umgebungskomponente namens Airflow-Triggerer überwacht asynchron alle ausgesetzten Tasks in Ihrer Umgebung. Nachdem ein verzögerter Vorgang aus einer solchen Aufgabe abgeschlossen ist, übergibt der Triggerer die Aufgabe an einen Airflow-Worker.

Sie benötigen mindestens eine Triggerer-Instanz in Ihrer Umgebung (oder mindestens zwei in hochverfügbaren Umgebungen), um den aufschiebbaren Modus in Ihren DAGs zu verwenden. Sie können die Triggerer beim Erstellen einer Umgebung konfigurieren oder die Anzahl der Triggerer und Leistungsparameter für eine vorhandene Umgebung anpassen.

Google Cloud -Operatoren, die den verzögerbaren Modus unterstützen

Nur einige Airflow-Operatoren wurden erweitert, um das aufschiebbare Modell zu unterstützen. Die folgende Liste enthält die Operatoren im Paket apache-airflow-providers-google, die den aufschiebbaren Modus unterstützen. Die Spalte mit der erforderlichen Mindestversion des apache-airflow-providers-google-Pakets gibt die früheste Paketversion an, in der der Operator den aufschiebbaren Modus unterstützt.

BigQuery-Operatoren

Name des Betreibers Erforderliche apache-airflow-providers-google-Version
BigQueryCheckOperator 8.4.0
BigQueryValueCheckOperator 8.4.0
BigQueryIntervalCheckOperator 8.4.0
BigQueryGetDataOperator 8.4.0
BigQueryInsertJobOperator 8.4.0

Operatoren für BigQuery Data Transfer Service

Name des Betreibers Erforderliche apache-airflow-providers-google-Version
BigQueryDataTransferServiceStartTransferRunsOperator 8.9.0

Batch-Operatoren

Name des Betreibers Erforderliche apache-airflow-providers-google-Version
CloudBatchSubmitJobOperator 10.7.0

Cloud Build-Operatoren

Name des Betreibers Erforderliche apache-airflow-providers-google-Version
CloudBuildCreateBuildOperator 8.7.0

Cloud Composer-Operatoren

Name des Betreibers Erforderliche apache-airflow-providers-google-Version
CloudComposerCreateEnvironmentOperator 6.4.0
CloudComposerDeleteEnvironmentOperator 6.4.0
CloudComposerUpdateEnvironmentOperator 6.4.0
CloudComposerRunAirflowCLICommandOperator 11.0.0

Cloud Run-Operatoren

Name des Betreibers Erforderliche apache-airflow-providers-google-Version
CloudRunExecuteJobOperator 10.7.0

Cloud SQL-Operatoren

Name des Betreibers Erforderliche apache-airflow-providers-google-Version
CloudSQLExportInstanceOperator 10.3.0

Storage Transfer Service-Operatoren

Name des Betreibers Erforderliche apache-airflow-providers-google-Version
CloudDataTransferServiceS3ToGCSOperator 14.0.0
CloudDataTransferServiceGCSToGCSOperator 14.0.0

Dataflow-Operatoren

Name des Betreibers Erforderliche apache-airflow-providers-google-Version
DataflowTemplatedJobStartOperator 8.9.0
DataflowStartFlexTemplateOperator 8.9.0
DataflowStartYamlJobOperator 11.0.0

Cloud Data Fusion-Operatoren

Name des Betreibers Erforderliche apache-airflow-providers-google-Version
CloudDataFusionStartPipelineOperator 8.9.0

Dataplex Universal Catalog-Operatoren

Name des Betreibers Erforderliche apache-airflow-providers-google-Version
DataplexRunDataQualityScanOperator 10.8.0
DataplexGetDataQualityScanResultOperator 10.8.0
DataplexRunDataProfileScanOperator 11.0.0

Google Kubernetes Engine-Operatoren

Name des Betreibers Erforderliche apache-airflow-providers-google-Version
GKEDeleteClusterOperator 9.0.0
GKECreateClusterOperator 9.0.0
GKEStartPodOperator 12.0.0
GKEStartJobOperator 11.0.0

Pub/Sub-Operatoren

Name des Betreibers Erforderliche apache-airflow-providers-google-Version
PubSubPullOperator 14.0.0

AI Platform-Betreiber

Name des Betreibers Erforderliche apache-airflow-providers-google-Version
MLEngineStartTrainingJobOperator 8.9.0

Zurückstellbare Operatoren in Ihren DAGs verwenden

Eine gängige Konvention für alle Google Cloud Operatoren ist es, den aufschiebbaren Modus mit dem booleschen Parameter deferrable zu aktivieren. Wenn ein Google Cloud-Operator diesen Parameter nicht hat, kann er nicht im aufschiebbaren Modus ausgeführt werden. Bei anderen Operatoren kann eine andere Konvention gelten. Einige Community-Betreiber haben beispielsweise eine separate Klasse mit dem Suffix Async im Namen.

Im folgenden Beispiel-DAG wird der Operator DataprocSubmitJobOperator im aufschiebbaren Modus verwendet:

PYSPARK_JOB = {
    "reference": { "project_id": "PROJECT_ID" },
    "placement": { "cluster_name": "PYSPARK_CLUSTER_NAME" },
    "pyspark_job": {
        "main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
    },
}

DataprocSubmitJobOperator(
        task_id="dataproc-deferrable-example",
        job=PYSPARK_JOB,
        deferrable=True,
    )

Triggerer-Logs ansehen

Der Triggerer generiert Logs, die zusammen mit Logs anderer Umgebungskomponenten verfügbar sind. Weitere Informationen zum Aufrufen von Umgebungs-Logs finden Sie unter Logs ansehen.

Trigger überwachen

Weitere Informationen zum Monitoring der Triggerer-Komponente finden Sie unter Airflow-Messwerte.

Neben der Überwachung des Auslösers können Sie die Anzahl der verzögerten Aufgaben in den Messwerten Unfinished Task (Nicht abgeschlossene Aufgabe) im Monitoring-Dashboard Ihrer Umgebung prüfen.

Nächste Schritte