Schedule executions with Cloud Composer

This document shows you how to run scheduled executions of Dataform SQL workflows using Cloud Composer 2.

You can use Cloud Composer 2 to schedule Dataform executions. Dataform does not support Cloud Composer 1.

To schedule Dataform executions with Cloud Composer 2, you use Dataform operators in Airflow Directed Acyclic Graphs (DAGs): you write an Airflow DAG that schedules Dataform workflow invocations.

Dataform provides various Airflow operators. These include operators for getting a compilation result, getting a workflow invocation, and canceling a workflow invocation. To view the complete list of available Dataform Airflow operators, see Google Dataform Operators.
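
For example, the following minimal sketch retrieves the state of an existing workflow invocation and defines a cancel task for the same invocation. The task IDs and the WORKFLOW_INVOCATION_ID value are illustrative, and the placeholder constants follow the conventions used in the samples later in this document:

from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformGetWorkflowInvocationOperator,
)

# Placeholder values; see the samples later in this document.
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"

# Fetch the current state of an existing workflow invocation.
get_workflow_invocation = DataformGetWorkflowInvocationOperator(
    task_id="get_workflow_invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id="WORKFLOW_INVOCATION_ID",
)

# Cancel the same invocation, for example from a failure-handling branch.
cancel_workflow_invocation = DataformCancelWorkflowInvocationOperator(
    task_id="cancel_workflow_invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id="WORKFLOW_INVOCATION_ID",
)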

Before you begin

  1. Select or create a Dataform repository.
  2. Grant Dataform access to BigQuery.
  3. Select or create a Dataform workspace.
  4. Create at least one table.
  5. Create a Cloud Composer 2 environment.
    1. Grant the roles/composer.worker and roles/dataform.editor roles to your Cloud Composer environment's service account.

Install the google-cloud-dataform PyPI package

If you use Cloud Composer 2 version 2.0.25 or later, this package is preinstalled in your environment and you don't need to install it.

If you use an earlier version of Cloud Composer 2, install the google-cloud-dataform PyPI package in your environment.

In the PyPI packages section of your environment configuration, specify version ==0.2.0.
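
To verify the installation, you can check the installed package version from any environment where your DAGs run; this check uses only the Python standard library:

from importlib.metadata import PackageNotFoundError, version

try:
    # The Dataform operators require this package to be importable.
    print(version("google-cloud-dataform"))
except PackageNotFoundError:
    print("google-cloud-dataform is not installed")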

Create an Airflow DAG that schedules Dataform workflow invocations

To manage scheduled executions of Dataform SQL workflows with Cloud Composer 2, write the DAG using Dataform Airflow operators, then upload it to your environment's bucket.

The following code sample shows an Airflow DAG that creates a Dataform compilation result and starts a Dataform workflow invocation:

from datetime import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )

    create_compilation_result >> create_workflow_invocation

Replace the following:

  • PROJECT_ID: your Dataform Google Cloud Project ID
  • REPOSITORY_ID: the name of your Dataform repository
  • REGION: the region in which the Dataform repository is located
  • GIT_COMMITISH: the Git commitish, for example, a branch or a Git SHA, in your remote Git repository of the version of your code that you want to use
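
The sample sets schedule_interval='@once', so the DAG runs only once after you upload it. To run the workflow on a recurring schedule, replace the value with an Airflow preset or cron expression; for example, the following illustrative settings trigger the DAG daily at 04:00 UTC without backfilling past intervals:

with models.DAG(
    DAG_ID,
    schedule_interval='0 4 * * *',  # daily at 04:00 UTC
    start_date=datetime(2022, 1, 1),
    catchup=False,  # don't backfill runs for past intervals
    tags=['dataform'],
) as dag:
    # Define the same tasks as in the sample above.
    ...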

The following code sample shows an Airflow DAG that:

  1. Creates a Dataform compilation result.
  2. Starts an asynchronous Dataform workflow invocation.
  3. Uses DataformWorkflowInvocationStateSensor to poll the status of the workflow invocation until it reaches the expected state.

from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        asynchronous=True,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )

    is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
        task_id="is_workflow_invocation_done",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
        expected_statuses={WorkflowInvocation.State.SUCCEEDED},
    )

    create_compilation_result >> create_workflow_invocation >> is_workflow_invocation_done

Replace the following:

  • PROJECT_ID: your Dataform Google Cloud Project ID
  • REPOSITORY_ID: the name of your Dataform repository
  • REGION: the region in which the Dataform repository is located
  • GIT_COMMITISH: the Git commitish, for example, a branch or a Git SHA, in your remote Git repository of the version of your code that you want to use
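
DataformWorkflowInvocationStateSensor accepts the standard Airflow sensor arguments it inherits from BaseSensorOperator, so you can tune how often it polls and how long it waits. For example, the following illustrative settings poll every 60 seconds and fail the task after one hour:

    is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
        task_id="is_workflow_invocation_done",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
        expected_statuses={WorkflowInvocation.State.SUCCEEDED},
        poke_interval=60,  # seconds between state checks
        timeout=60 * 60,   # give up after one hour
    )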

Add compilation configuration parameters

You can add additional compilation configuration parameters to the create_compilation_result Airflow DAG object. For more information about available parameters, see the CodeCompilationConfig Dataform API reference.

  • To add compilation configuration parameters to the create_compilation_result Airflow DAG object, add your selected parameters to code_compilation_config in the following format:
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
            "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
        },
    )

Replace the following:

  • PROJECT_ID: your Dataform Google Cloud Project ID
  • REPOSITORY_ID: the name of your Dataform repository
  • REGION: the region in which the Dataform repository is located
  • GIT_COMMITISH: the Git commitish, for example, a branch or a Git SHA, in your remote Git repository of the version of your code that you want to use
  • PARAMETER: selected CodeCompilationConfig parameter. You can add multiple parameters.
  • PARAMETER_VALUE: value of the selected parameter

The following code sample shows the defaultDatabase parameter added to the create_compilation_result Airflow DAG object:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )
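
You can also combine several parameters in code_compilation_config. The following sketch sets a default database and a schema suffix; the values are illustrative, and the schema_suffix parameter assumes that your version of CodeCompilationConfig supports it:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
            "code_compilation_config": {
                "default_database": "my-custom-gcp-project",  # illustrative value
                "schema_suffix": "staging",  # illustrative value
            },
        },
    )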

Add workflow invocation configuration parameters

You can add additional workflow invocation configuration parameters to the create_workflow_invocation Airflow DAG object. For more information about available parameters, see the InvocationConfig Dataform API reference.

  • To add workflow invocation configuration parameters to the create_workflow_invocation Airflow DAG object, add your selected parameters to invocation_config in the following format:
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "PARAMETER": PARAMETER_VALUE }
        },
    )

Replace the following:

  • PROJECT_ID: your Dataform Google Cloud Project ID
  • REPOSITORY_ID: the name of your Dataform repository
  • REGION: the region in which the Dataform repository is located
  • PARAMETER: selected InvocationConfig parameter. You can add multiple parameters.
  • PARAMETER_VALUE: value of the selected parameter

The following code sample shows the includedTags[] and transitiveDependenciesIncluded parameters added to the create_workflow_invocation Airflow DAG object:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
        },
    )
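
You can also limit an invocation to specific actions with the includedTargets parameter. The following sketch assumes a table named my_table in a dataform dataset; each target follows the Dataform API Target message, with database, schema, and name fields:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": {
                # Run only this action; add transitive_dependencies_included
                # if its upstream dependencies should run too.
                "included_targets": [
                    {"database": PROJECT_ID, "schema": "dataform", "name": "my_table"}
                ],
            },
        },
    )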

What's next