Schedule runs

This document shows you how to do the following in Dataform:

  • Schedule runs with workflow configurations.
  • Schedule runs with Workflows and Cloud Scheduler.
  • Schedule runs with Cloud Composer.

Before you begin

To schedule runs with workflow configurations or schedule runs with Workflows and Cloud Scheduler, ensure that you do the following:

  1. In the Google Cloud console, go to the Dataform page.

    Go to Dataform

  2. Select or create a repository.

  3. Create a release configuration.

To schedule runs with Cloud Composer, ensure you do the following:

  1. Select or create a Dataform repository.
  2. Grant Dataform access to BigQuery.
  3. Select or create a Dataform workspace.
  4. Create at least one table.
  5. Create a Cloud Composer 2 environment.

Required roles

To get the permissions that you need to complete the tasks in this document, ask your administrator to grant you the required IAM roles, such as Dataform Editor (roles/dataform.editor), on your project.

For more information about granting roles, see Manage access to projects, folders, and organizations.

You might also be able to get the required permissions through custom roles or other predefined roles.

To use a service account other than the default Dataform service account, grant access to the custom service account.

Schedule runs with workflow configurations

This section shows you how to create a workflow configuration in Dataform to schedule and configure workflow runs. You can use workflow configurations to execute Dataform workflows on a schedule.

About workflow configurations

To schedule Dataform runs of all or selected workflow actions in BigQuery, you can create workflow configurations. In a workflow configuration, you select a compilation release configuration, select workflow actions for execution, and set the run schedule.

Then, during each scheduled run of your workflow configuration, Dataform deploys your selection of actions from the latest compilation result in your release configuration to BigQuery. You can also manually trigger a run of a workflow configuration by using the workflowConfigs resource of the Dataform API.
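A manual trigger amounts to creating a workflow invocation that references the workflow configuration. The following sketch builds that REST request in Python; the project, location, and configuration IDs are hypothetical, and the workflowConfig body field is assumed from the workflowInvocations resource reference, so verify it against the API documentation before use.

```python
# Minimal sketch: manually trigger a workflow configuration run through the
# Dataform REST API. All IDs below are placeholders.
repository = "projects/my-project/locations/us-central1/repositories/my-repo"
workflow_config_id = "production-hourly"

url = f"https://dataform.googleapis.com/v1beta1/{repository}/workflowInvocations"
body = {"workflowConfig": f"{repository}/workflowConfigs/{workflow_config_id}"}

# Sending the request requires an OAuth 2.0 access token, for example:
#   token = subprocess.check_output(
#       ["gcloud", "auth", "print-access-token"], text=True).strip()
#   requests.post(url, json=body,
#                 headers={"Authorization": f"Bearer {token}"})
print(url)
print(body["workflowConfig"])
```

The request body is intentionally minimal: when a workflowConfig is supplied, the invocation uses the action selection and service account already defined in that configuration.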

A Dataform workflow configuration contains the following run settings:

  • ID of the workflow configuration.
  • Release configuration.
  • Service account.

    This is the service account that's associated with the workflow configuration. You can select the default Dataform service account or a service account that's associated with your Google Cloud project, or you can manually enter a different service account. By default, workflow configurations use the same service accounts as their repositories.

  • Workflow actions to be executed:

    • All actions.
    • Selection of actions.
    • Selection of tags.
  • Run schedule and time zone.

Create a workflow configuration

To create a Dataform workflow configuration, follow these steps:

  1. In your repository, go to Releases & Scheduling.
  2. In the Workflow configurations section, click Create.
  3. In the Create workflow configuration pane, in the Configuration ID field, enter a unique ID for the workflow configuration.

    IDs can only include numbers, letters, hyphens, and underscores.

  4. In the Release configuration menu, select a compilation release configuration.

  5. Optional: In the Frequency field, enter the frequency of runs in the unix-cron format.

    To ensure that Dataform executes the latest compilation result from the corresponding release configuration, allow at least one hour between the time of compilation result creation and the scheduled run time.

  6. In the Service account menu, select a service account for the workflow configuration.

    In the menu, you can select the default Dataform service account or any service account associated with your Google Cloud project that you have access to. If you don't select a service account, the workflow configuration uses the service account of the repository.

  7. Optional: In the Timezone menu, select the time zone for runs.

    The default time zone is UTC.

  8. Select the workflow actions to be executed:

    • To execute the entire workflow, click All actions.
    • To execute selected actions in the workflow, click Selection of actions, and then select actions.
    • To execute actions with selected tags, click Selection of tags, and then select tags.
    • Optional: To execute selected actions or tags and their dependencies, select the Include dependencies option.
    • Optional: To execute selected actions or tags and their dependents, select the Include dependents option.
    • Optional: To rebuild all tables from scratch, select the Run with full refresh option.

    Without this option, Dataform updates incremental tables without rebuilding them from scratch.

  9. Click Create.

For example, the following workflow configuration executes actions with the hourly tag every hour in the CEST time zone:

  • Configuration ID: production-hourly
  • Release configuration: -
  • Frequency: 0 * * * *
  • Timezone: Central European Summer Time (CEST)
  • Selection of workflow actions: selection of tags, hourly tag
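The Frequency value above uses standard unix-cron syntax with five fields. A plain-Python sketch, with no Dataform dependency, of how `0 * * * *` breaks down:

```python
# Break a unix-cron expression into its five named fields.
# "0 * * * *" runs at minute 0 of every hour, every day.
def cron_fields(expression):
    names = ["minute", "hour", "day_of_month", "month", "day_of_week"]
    values = expression.split()
    if len(values) != len(names):
        raise ValueError("unix-cron expressions have exactly five fields")
    return dict(zip(names, values))

fields = cron_fields("0 * * * *")
print(fields)  # minute is pinned to 0; all other fields are wildcards
```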

Edit a workflow configuration

To edit a workflow configuration, follow these steps:

  1. In your repository, go to Releases & Scheduling.
  2. Next to the workflow configuration that you want to edit, click the More menu, and then click Edit.
  3. In the Edit workflow configuration pane, edit the workflow configuration settings, and then click Save.

Delete a workflow configuration

To delete a workflow configuration, follow these steps:

  1. In your repository, go to Releases & Scheduling.
  2. Next to the workflow configuration that you want to delete, click the More menu, and then click Delete.
  3. In the Delete workflow configuration dialog, click Delete.

Schedule runs with Workflows and Cloud Scheduler

This section shows you how to schedule runs of Dataform workflows using Workflows and Cloud Scheduler.

About scheduled workflow runs

You can set the frequency of your Dataform workflow runs by creating a Cloud Scheduler job that triggers a Workflows workflow. Workflows executes services in an orchestration workflow that you define.

Workflows executes your Dataform workflow in a two-step process. First, it pulls your Dataform repository code from your Git provider and compiles it into a compilation result. Then, it uses the compilation result to create a Dataform workflow and executes it at the frequency that you set.

Create a scheduled orchestration workflow

To schedule runs of your Dataform workflow, use Workflows to create an orchestration workflow and add a Cloud Scheduler job as a trigger.

  1. Workflows uses service accounts to give workflows access to Google Cloud resources. Create a service account and grant it the Dataform Editor (roles/dataform.editor) Identity and Access Management role as well as the minimum permissions required to manage your orchestration workflow. For more information, see Grant a workflow permission to access Google Cloud resources.

  2. Create an orchestration workflow and use the following YAML source code as your workflow definition:

    main:
        steps:
        - init:
            assign:
            - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID
        - createCompilationResult:
            call: http.post
            args:
                url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
                auth:
                    type: OAuth2
                body:
                    gitCommitish: GIT_COMMITISH
            result: compilationResult
        - createWorkflowInvocation:
            call: http.post
            args:
                url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
                auth:
                    type: OAuth2
                body:
                    compilationResult: ${compilationResult.body.name}
            result: workflowInvocation
        - complete:
            return: ${workflowInvocation.body.name}
    

    Replace the following:

    • PROJECT_ID: the ID of your Google Cloud project.
    • REPOSITORY_LOCATION: the location of your Dataform repository.
    • REPOSITORY_ID: the name of your Dataform repository.
    • GIT_COMMITISH: the Git branch, tag, or commit SHA that you want to execute Dataform code from. For a newly created repository, use main.
  3. Schedule the orchestration workflow using Cloud Scheduler.

Customize the Dataform workflow create compilation result request

You can update the existing orchestration workflow and define the Dataform workflow create compilation result request settings in the YAML format. For more information about the settings, see the projects.locations.repositories.compilationResults REST resource reference.

For example, to add a _dev schemaSuffix setting to all actions during compilation, replace the createCompilationResult step body with the following code snippet:

    - createCompilationResult:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
            auth:
                type: OAuth2
            body:
                gitCommitish: GIT_COMMITISH
                codeCompilationConfig:
                    schemaSuffix: dev

You can also pass additional settings as runtime arguments in a Workflows run request and access those arguments using variables. For more information, see Pass runtime arguments in an execution request.

Customize the Dataform workflow invocation request

You can update the existing orchestration workflow and define the Dataform workflow invocation request settings in the YAML format. For more information about the invocation request settings, see the projects.locations.repositories.workflowInvocations REST resource reference.

For example, to only execute actions with the hourly tag with all the transitive dependencies included, replace the createWorkflowInvocation body with the following code snippet:

    - createWorkflowInvocation:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
            auth:
                type: OAuth2
            body:
                compilationResult: ${compilationResult.body.name}
                invocationConfig:
                    includedTags:
                    - hourly
                    transitiveDependenciesIncluded: true
                

You can also pass additional settings as runtime arguments in a Workflows run request and access those arguments using variables. For more information, see Pass runtime arguments in an execution request.

Schedule runs with Cloud Composer

You can use Cloud Composer 2 to schedule Dataform runs. Dataform does not support Cloud Composer 1.

To manage schedules for Dataform runs with Cloud Composer 2, you can use Dataform operators in Airflow Directed Acyclic Graphs (DAGs). You can create an Airflow DAG that schedules Dataform workflow invocations.

Dataform provides various Airflow operators. These include operators for getting a compilation result, getting a workflow invocation, and canceling a workflow invocation. To view the complete list of available Dataform Airflow operators, see Google Dataform Operators.

Install the google-cloud-dataform PyPI package

If you use Cloud Composer 2 versions 2.0.25 and later, this package is preinstalled in your environment. You don't need to install it.

If you use an earlier version of Cloud Composer 2, install the google-cloud-dataform PyPI package.

When you install the package, in the PyPI packages section of your environment configuration, specify version ==0.2.0.

Create an Airflow DAG that schedules Dataform workflow invocations

To manage scheduled runs of Dataform workflows with Cloud Composer 2, write the DAG using Dataform Airflow operators, then upload it to your environment's bucket.

The following code sample shows an Airflow DAG that creates a Dataform compilation result and starts a Dataform workflow invocation:

from datetime import datetime

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )


create_compilation_result >> create_workflow_invocation

Replace the following:

  • PROJECT_ID: your Dataform Google Cloud project ID.
  • REPOSITORY_ID: the name of your Dataform repository.
  • REGION: the region in which the Dataform repository is located.
  • GIT_COMMITISH: the Git commitish in the remote Git repository of the version of your code that you want to use—for example, a branch or a Git SHA.

The following code sample shows an Airflow DAG that performs the following:

  1. Creates a Dataform compilation result.
  2. Starts an asynchronous Dataform workflow invocation.
  3. Polls the status of your workflow invocation until it reaches the expected state by using DataformWorkflowInvocationStateSensor.

from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        asynchronous=True,
        workflow_invocation={
            "compilation_result": COMPILATION_RESULT
        }
    )

    is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
        task_id="is_workflow_invocation_done",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
        expected_statuses={WorkflowInvocation.State.SUCCEEDED},
    )

create_compilation_result >> create_workflow_invocation >> is_workflow_invocation_done

Replace the following:

  • PROJECT_ID: your Dataform Google Cloud project ID.
  • REPOSITORY_ID: the name of your Dataform repository.
  • REGION: the region in which the Dataform repository is located.
  • COMPILATION_RESULT: the name of the compilation result you want to use for this workflow invocation.
  • GIT_COMMITISH: the Git commitish in the remote Git repository of the version of your code that you want to use—for example, a branch or a Git SHA.
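The sensor derives workflow_invocation_id by splitting the full resource name that the create_workflow_invocation task pushes to XCom. The string handling, shown with a hypothetical resource name:

```python
# The create step returns a full resource name; the sensor only needs the
# trailing invocation ID. The name below is hypothetical.
name = (
    "projects/my-project/locations/us-central1/"
    "repositories/my-repo/workflowInvocations/1234567890"
)
workflow_invocation_id = name.split("/")[-1]
print(workflow_invocation_id)  # -> 1234567890
```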

Add compilation configuration parameters

You can add additional compilation configuration parameters to the create_compilation_result Airflow DAG object. For more information about available parameters, see the CodeCompilationConfig Dataform API reference.

  • To add compilation configuration parameters to the create_compilation_result Airflow DAG object, add your selected parameters to the code_compilation_config field in the following format:

        create_compilation_result = DataformCreateCompilationResultOperator(
            task_id="create_compilation_result",
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            compilation_result={
                "git_commitish": GIT_COMMITISH,
                "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
            },
        )
    

    Replace the following:

    • PROJECT_ID: your Dataform Google Cloud project ID.
    • REPOSITORY_ID: the name of your Dataform repository.
    • REGION: the region in which the Dataform repository is located.
    • GIT_COMMITISH: the Git commitish in the remote Git repository of the version of your code that you want to use—for example, a branch or a Git SHA.
    • PARAMETER: the selected CodeCompilationConfig parameter. You can add multiple parameters.
    • PARAMETER_VALUE: the value of the selected parameter.

The following code sample shows the defaultDatabase parameter added to the create_compilation_result Airflow DAG object:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

Add workflow invocation configuration parameters

You can add additional workflow invocation configuration parameters to the create_workflow_invocation Airflow DAG object. For more information about available parameters, see the InvocationConfig Dataform API reference.

  • To add workflow invocation configuration parameters to the create_workflow_invocation Airflow DAG object, add your selected parameters to the invocation_config field in the following format:

        create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
            task_id='create_workflow_invocation',
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            workflow_invocation={
                "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
                "invocation_config": { "PARAMETER": PARAMETER_VALUE }
            },
        )
    
    

    Replace the following:

    • PROJECT_ID: your Dataform Google Cloud project ID.
    • REPOSITORY_ID: the name of your Dataform repository.
    • REGION: the region in which the Dataform repository is located.
    • PARAMETER: the selected InvocationConfig parameter. You can add multiple parameters.
    • PARAMETER_VALUE: the value of the selected parameter.

The following code sample shows the includedTags[] and transitiveDependenciesIncluded parameters added to the create_workflow_invocation Airflow DAG object:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
        },
    )
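Note that the REST reference documents these fields in camelCase (includedTags, transitiveDependenciesIncluded), while the Airflow operators take the corresponding snake_case names in their dict arguments. A small helper, purely illustrative and not part of the provider package, for translating between the two spellings:

```python
import re

# Illustrative only: convert a camelCase REST field name to the snake_case
# form used in the dicts passed to the Dataform Airflow operators.
def to_snake_case(field):
    return re.sub(r"(?<!^)(?=[A-Z])", "_", field).lower()

print(to_snake_case("includedTags"))                    # included_tags
print(to_snake_case("transitiveDependenciesIncluded"))  # transitive_dependencies_included
```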

What's next