You can use Cloud Composer 2 to schedule Dataform executions. Dataform does not support Cloud Composer 1.
To manage schedules for Dataform executions with Cloud Composer 2, use Dataform operators in Airflow Directed Acyclic Graphs (DAGs): you create an Airflow DAG that schedules Dataform workflow invocations.
Dataform provides various Airflow operators. These include operators for getting a compilation result, getting a workflow invocation, and canceling a workflow invocation. To view the complete list of available Dataform Airflow operators, see Google Dataform Operators.
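For illustration, the following is a minimal sketch of the get and cancel operators, wired the same way as the samples later on this page. The task IDs, the upstream task name, and the placeholder constants are assumptions, and the tasks would live inside a DAG definition:
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformGetWorkflowInvocationOperator,
)

# Placeholder values, matching the constants used in the samples below.
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"

# Both operators identify an invocation by its ID. Here the ID is pulled
# from the XCom output of an assumed upstream task named
# 'create_workflow_invocation', as in the DAGs later on this page.
WORKFLOW_INVOCATION_ID = (
    "{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"
)

# Fetch the current state and metadata of a workflow invocation.
get_workflow_invocation = DataformGetWorkflowInvocationOperator(
    task_id="get_workflow_invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=WORKFLOW_INVOCATION_ID,
)

# Cancel a running workflow invocation.
cancel_workflow_invocation = DataformCancelWorkflowInvocationOperator(
    task_id="cancel_workflow_invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=WORKFLOW_INVOCATION_ID,
)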
Before you begin
- Select or create a Dataform repository.
- Grant Dataform access to BigQuery.
- Select or create a Dataform workspace.
- Create at least one table.
- Create a Cloud Composer 2 environment.
- Grant the roles/composer.worker and roles/dataform.editor roles to your Cloud Composer environment's service account.
Install the google-cloud-dataform PyPI package
If you use Cloud Composer 2 version 2.0.25 or later, this package is preinstalled in your environment, and you don't need to install it.
If you use an earlier version of Cloud Composer 2, install the google-cloud-dataform PyPI package in your environment. In the PyPI packages section, specify version ==0.2.0.
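Optionally, to confirm that the package is available, you can check the installed version from a Python console or a throwaway Airflow task. This is a sketch of a sanity check, not part of the official setup:
from importlib import metadata

# Prints the installed version of google-cloud-dataform,
# or raises PackageNotFoundError if the package is absent.
print(metadata.version("google-cloud-dataform"))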
Create an Airflow DAG that schedules Dataform workflow invocations
To manage scheduled executions of Dataform SQL workflows with Cloud Composer 2, write the DAG using Dataform Airflow operators, then upload it to your environment's bucket.
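For the upload step, one option is the google-cloud-storage Python client, as in the following sketch. The bucket name and file paths are placeholders, and uploading through the Google Cloud console or gcloud works equally well:
from google.cloud import storage

# Placeholder names; replace with your environment's bucket and DAG file.
COMPOSER_BUCKET = "your-composer-environment-bucket"

client = storage.Client()
bucket = client.bucket(COMPOSER_BUCKET)

# Cloud Composer reads DAG files from the dags/ folder of its bucket.
bucket.blob("dags/dataform_dag.py").upload_from_filename("dataform_dag.py")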
The following code sample shows an Airflow DAG that creates a Dataform compilation result and starts a Dataform workflow invocation:
from datetime import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:
    # Compile the code in the repository at the given Git commitish.
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

    # Invoke the workflow using the compilation result created above,
    # pulled from XCom.
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id="create_workflow_invocation",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )

    create_compilation_result >> create_workflow_invocation
Replace the following:
- PROJECT_ID: your Dataform Google Cloud project ID
- REPOSITORY_ID: the name of your Dataform repository
- REGION: the region in which the Dataform repository is located
- GIT_COMMITISH: the Git commitish, for example, a branch name or a Git SHA, of the version of your code in your remote Git repository that you want to use
The following code sample shows an Airflow DAG that:
- Creates a Dataform compilation result.
- Starts an asynchronous Dataform workflow invocation.
- Polls the status of your workflow by using DataformWorkflowInvocationStateSensor until it enters the expected state.
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:
    # Compile the code in the repository at the given Git commitish.
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

    # Start the workflow invocation asynchronously; the task returns
    # as soon as the invocation is created.
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id="create_workflow_invocation",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        asynchronous=True,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )

    # Poll the invocation until it reaches the SUCCEEDED state.
    is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
        task_id="is_workflow_invocation_done",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
        expected_statuses={WorkflowInvocation.State.SUCCEEDED},
    )

    create_compilation_result >> create_workflow_invocation >> is_workflow_invocation_done
Replace the following:
- PROJECT_ID: your Dataform Google Cloud project ID
- REPOSITORY_ID: the name of your Dataform repository
- REGION: the region in which the Dataform repository is located
- GIT_COMMITISH: the Git commitish, for example, a branch name or a Git SHA, of the version of your code in your remote Git repository that you want to use
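Because the sensor polls a long-running invocation, you might want to tune its polling behavior. The following sketch reuses the sensor from the sample above; poke_interval, timeout, and mode are generic Airflow BaseSensorOperator arguments, not Dataform-specific options, and the values shown are illustrative assumptions:
    # A variant of the sensor above with explicit polling settings.
    is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
        task_id="is_workflow_invocation_done",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
        expected_statuses={WorkflowInvocation.State.SUCCEEDED},
        poke_interval=60,   # Check the invocation state every 60 seconds.
        timeout=60 * 60,    # Fail the task if not done within one hour.
        mode="reschedule",  # Free the worker slot between checks.
    )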
Add compilation configuration parameters
You can add additional compilation configuration parameters to the create_compilation_result Airflow DAG object. For more information about available parameters, see the CodeCompilationConfig Dataform API reference.
- To add compilation configuration parameters to the create_compilation_result Airflow DAG object, add your selected parameters to code_compilation_config in the following format:
create_compilation_result = DataformCreateCompilationResultOperator(
    task_id="create_compilation_result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result={
        "git_commitish": GIT_COMMITISH,
        "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE" }
    },
)
Replace the following:
- PROJECT_ID: your Dataform Google Cloud project ID
- REPOSITORY_ID: the name of your Dataform repository
- REGION: the region in which the Dataform repository is located
- GIT_COMMITISH: the Git commitish, for example, a branch name or a Git SHA, of the version of your code in your remote Git repository that you want to use
- PARAMETER: the selected CodeCompilationConfig parameter. You can add multiple parameters.
- PARAMETER_VALUE: the value of the selected parameter
The following code sample shows the defaultDatabase parameter added to the create_compilation_result Airflow DAG object:
create_compilation_result = DataformCreateCompilationResultOperator(
    task_id="create_compilation_result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result={
        "git_commitish": GIT_COMMITISH,
        "code_compilation_config": { "default_database": "my-custom-gcp-project" }
    },
)
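You can also combine several CodeCompilationConfig fields in one call. In the following sketch, the schema_suffix and vars values are illustrative assumptions, and vars is only useful if your Dataform code reads the corresponding compilation variables:
create_compilation_result = DataformCreateCompilationResultOperator(
    task_id="create_compilation_result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result={
        "git_commitish": GIT_COMMITISH,
        "code_compilation_config": {
            # Write to a different project and suffix all schemas
            # (illustrative values).
            "default_database": "my-custom-gcp-project",
            "schema_suffix": "_staging",
            # Pass the logical run date to Dataform as a compilation variable.
            "vars": {"run_date": "{{ ds }}"},
        },
    },
)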
Add workflow invocation configuration parameters
You can add additional workflow invocation configuration parameters to the create_workflow_invocation Airflow DAG object. For more information about available parameters, see the InvocationConfig Dataform API reference.
- To add workflow invocation configuration parameters to the create_workflow_invocation Airflow DAG object, add your selected parameters to invocation_config in the following format:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id="create_workflow_invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
        "invocation_config": { "PARAMETER": PARAMETER_VALUE }
    },
)
Replace the following:
- PROJECT_ID: your Dataform Google Cloud project ID
- REPOSITORY_ID: the name of your Dataform repository
- REGION: the region in which the Dataform repository is located
- PARAMETER: the selected InvocationConfig parameter. You can add multiple parameters.
- PARAMETER_VALUE: the value of the selected parameter
The following code sample shows the includedTags[] and transitiveDependenciesIncluded parameters added to the create_workflow_invocation Airflow DAG object:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id="create_workflow_invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
        "invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": True }
    },
)
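InvocationConfig also lets you run specific actions rather than tags. As a sketch, the included_targets field below selects a single action by its database, schema, and name; the target values are illustrative assumptions:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id="create_workflow_invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
        "invocation_config": {
            # Run only this action (illustrative target values).
            "included_targets": [
                {"database": "my-gcp-project", "schema": "dataform", "name": "my_table"}
            ],
        },
    },
)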
What's next
- To learn how to configure compilation overrides for workflow executions, see Configure compilation overrides.
- To learn more about the Dataform API, see Dataform API.
- To learn more about Cloud Composer environments, see Overview of Cloud Composer.
- To learn how to schedule executions with Workflows and Cloud Scheduler, see Schedule executions with Workflows and Cloud Scheduler.