Ausführungen mit Cloud Composer planen

In diesem Dokument erfahren Sie, wie Sie geplante Ausführungen von Dataform-SQL-Workflows mit Cloud Composer 2 ausführen.

Dataform-Ausführungen können mit Cloud Composer 2 geplant werden. Cloud Composer 1 wird von Dataform nicht unterstützt.

Zum Verwalten von Zeitplänen für Dataform-Ausführungen mit Cloud Composer 2 können Sie Dataform-Operatoren in Airflow-Directed Acyclic Graphs (DAGs) verwenden. Sie können einen Airflow-DAG erstellen, der Dataform-Workflowaufrufe plant.

Dataform bietet verschiedene Airflow-Operatoren. Dazu gehören Operatoren zum Abrufen eines Kompilierungsergebnisses, zum Abrufen eines Workflowaufrufs und zum Abbrechen eines Workflowaufrufs. Eine vollständige Liste der verfügbaren Dataform Airflow-Operatoren finden Sie unter Google Dataform-Operatoren.

Hinweise

  1. Wählen oder erstellen Sie ein Dataform-Repository.
  2. Dataform Zugriff auf BigQuery gewähren
  3. Wählen Sie einen Dataform-Arbeitsbereich aus oder erstellen Sie einen.
  4. Erstellen Sie mindestens eine Tabelle.
  5. Erstellen Sie eine Cloud Composer 2-Umgebung.
    1. Weisen Sie dem Dienstkonto Ihrer Cloud Composer-Umgebung die Rollen roles/composer.worker und roles/dataform.editor zu.

PyPi-Paket google-cloud-dataform installieren

Wenn Sie die Cloud Composer 2-Version 2.0.25 und höher verwenden, ist dieses Paket in Ihrer Umgebung vorinstalliert. Sie müssen es nicht installieren.

Wenn Sie frühere Versionen von Cloud Composer 2 verwenden, installieren Sie das PyPi-Paket google-cloud-dataform.

Geben Sie im Abschnitt „PyPI-Pakete“ die Version ==0.2.0 an.

Airflow-DAG zum Planen von Dataform-Workflowaufrufen erstellen

Wenn Sie geplante Ausführungen von Dataform SQL-Workflows mit Cloud Composer 2 verwalten möchten, schreiben Sie den DAG mit Dataform Airflow-Operatoren und laden Sie ihn dann in den Bucket Ihrer Umgebung hoch.

Das folgende Codebeispiel zeigt einen Airflow-DAG, der ein Dataform-Kompilierungsergebnis erstellt und einen Dataform-Workflowaufruf startet:

from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )

create_compilation_result >> create_workflow_invocation

Ersetzen Sie Folgendes:

  • PROJECT_ID: Ihre Dataform-Google Cloud-Projekt-ID
  • REPOSITORY_ID: der Name Ihres Dataform-Repositorys
  • REGION: Region, in der sich das Dataform-Repository befindet
  • COMPILATION_RESULT: der Name des Kompilierungsergebnisses, das für diesen Workflowaufruf verwendet werden soll
  • GIT_COMMITISH: das Git-Commitish, z. B. ein Zweig oder ein Git-SHA in Ihrem Remote-Git-Repository der Version Ihres Codes, die Sie verwenden möchten

Das folgende Codebeispiel zeigt einen Airflow-DAG, der:

  1. Erstellt ein Dataform-Kompilierungsergebnis.
  2. Startet einen asynchronen Dataform-Workflowaufruf.
  3. Fragt den Status Ihres Workflows ab, bis er mit DataformWorkflowInvocationStateSensor in den erwarteten Status wechselt.
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

Ersetzen Sie Folgendes:

  • PROJECT_ID: Ihre Dataform-Google Cloud-Projekt-ID
  • REPOSITORY_ID: der Name Ihres Dataform-Repositorys
  • REGION: Region, in der sich das Dataform-Repository befindet
  • COMPILATION_RESULT: der Name des Kompilierungsergebnisses, das für diesen Workflowaufruf verwendet werden soll
  • GIT_COMMITISH: das Git-Commitish, z. B. ein Zweig oder ein Git-SHA in Ihrem Remote-Git-Repository der Version Ihres Codes, die Sie verwenden möchten
  • COMPILATION_RESULT: der Name des Kompilierungsergebnisses, das für diesen Workflowaufruf verwendet werden soll

Kompilierungskonfigurationsparameter hinzufügen

Sie können dem Airflow-DAG-Objekt create_compilation_result zusätzliche Kompilierungskonfigurationsparameter hinzufügen. Weitere Informationen zu verfügbaren Parametern finden Sie in der Referenz zur CodeCompilationConfig Dataform API.

  • Wenn Sie dem Airflow-DAG-Objekt create_compilation_result Kompilierungskonfigurationsparameter hinzufügen möchten, fügen Sie die ausgewählten Parameter im folgenden Format zu code_compilation_config hinzu:
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
            "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
        },
    )

Ersetzen Sie Folgendes:

  • PROJECT_ID: Ihre Dataform-Google Cloud-Projekt-ID
  • REPOSITORY_ID: der Name Ihres Dataform-Repositorys
  • REGION durch die Region, in der sich das Dataform-Repository befindet
  • GIT_COMMITISH: das Git-Commitish, z. B. ein Zweig oder ein Git-SHA in Ihrem Remote-Git-Repository der Version Ihres Codes, die Sie verwenden möchten
  • PARAMETER: ausgewählter Parameter CodeCompilationConfig. Sie können mehrere Parameter hinzufügen.
  • PARAMETER_VALUE: Wert des ausgewählten Parameters

Im folgenden Codebeispiel wird der Parameter defaultDatabase dem Airflow-DAG-Objekt create_compilation_result hinzugefügt:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

Konfigurationsparameter für Workflowaufruf hinzufügen

Sie können dem Airflow-DAG-Objekt create_workflow_invocation zusätzliche Konfigurationsparameter für Workflowaufrufe hinzufügen. Weitere Informationen zu verfügbaren Parametern finden Sie in der Referenz zur InvocationConfig Dataform API.

  • Wenn Sie dem Airflow-DAG-Objekt create_workflow_invocation Workflowaufrufkonfigurationsparameter hinzufügen möchten, fügen Sie die ausgewählten Parameter im folgenden Format zu invocation_config hinzu:
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            
            "invocation_config": { "PARAMETER": PARAMETER_VALUE }
        },

    )

Ersetzen Sie Folgendes:

  • PROJECT_ID: Ihre Dataform-Google Cloud-Projekt-ID
  • REPOSITORY_ID: der Name Ihres Dataform-Repositorys
  • REGION: Region, in der sich das Dataform-Repository befindet
  • PARAMETER: ausgewählter Parameter InvocationConfig. Sie können mehrere Parameter hinzufügen.
  • PARAMETER_VALUE: Wert des ausgewählten Parameters

Im folgenden Codebeispiel werden die Parameter includedTags[] und transitiveDependenciesIncluded dem Airflow-DAG-Objekt create_workflow_invocation hinzugefügt:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

Nächste Schritte