Acerca de la ejecución programada de flujos de trabajo con Cloud Composer 2
Para administrar los programas de ejecuciones de Dataform con Cloud Composer 2, puedes usar los operadores de Dataform en DAG de Airflow. Puedes crear un DAG de Airflow que programe invocaciones del flujo de trabajo de Dataform.
Dataform proporciona varios operadores de Airflow, por ejemplo, los operadores para obtener un resultado de compilación, obtener una invocación de flujo de trabajo o cancelar una invocación de flujo de trabajo. Para ver la lista completa de los operadores de Dataform Airflow disponibles, consulta Operadores de Dataform de Google.
Antes de comenzar
- Selecciona o crea un repositorio de Dataform.
- Otorga a Dataform acceso a BigQuery.
- Selecciona o crea un lugar de trabajo de Dataform.
- Crea al menos una tabla.
- Crea un entorno de Cloud Composer 2.
- Otorga las funciones roles/composer.worker y roles/dataform.editor a tu cuenta de servicio del entorno de Cloud Composer.
Instala el paquete de PyPi google-cloud-dataform
Si usas las versiones 2.0.25
y posteriores de Cloud Composer 2, este paquete está preinstalado en tu entorno. No es necesario que la instales.
Si usas versiones anteriores de Cloud Composer 2, instala el paquete PyPi google-cloud-dataform
.
En la sección Paquetes de PyPI, especifica la versión ==0.2.0
.
Crear un DAG de Airflow que programe invocaciones del flujo de trabajo de Dataform
Para administrar ejecuciones programadas de flujos de trabajo de SQL de Dataform con Cloud Composer 2, escribe el DAG mediante los operadores de Airflow de Dataform y, luego, súbelo al bucket de tu entorno.
En la siguiente muestra de código, se muestra un DAG de Airflow que crea un resultado de compilación de Dataform y, luego, inicia una invocación de flujo de trabajo de Dataform:
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
Reemplaza lo siguiente:
- PROJECT_ID por el ID del proyecto de Google Cloud de Dataform.
- REPOSITORY_ID por el nombre de tu repositorio de Dataform.
- REGION por la región en la que se encuentra el repositorio de Dataform
- COMPILATION_RESULT por el nombre del resultado de compilación que deseas usar para esta invocación de flujo de trabajo.
- GIT_COMMITISH por el compromiso de Git, por ejemplo, una rama o un SHA de Git, en el repositorio remoto de Git de la versión del código que deseas usar.
En la siguiente muestra de código, se muestra un DAG de Airflow que realiza las siguientes acciones:
- Crea un resultado de compilación de Dataform.
- Inicia una invocación de flujo de trabajo asíncrona de Dataform.
- Sondea el estado de tu flujo de trabajo hasta que entre en un estado deseado mediante
DataformWorkflowInvocationStateSensor
.
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
Reemplaza lo siguiente:
- PROJECT_ID por el ID del proyecto de Google Cloud de Dataform.
- REPOSITORY_ID por el nombre de tu repositorio de Dataform.
- REGION por la región en la que se encuentra el repositorio de Dataform
- COMPILATION_RESULT por el nombre del resultado de compilación que deseas usar para esta invocación de flujo de trabajo.
- GIT_COMMITISH por el compromiso de Git, por ejemplo, una rama o un SHA de Git, en el repositorio de Git remoto de la versión del código que quieres usar.
- COMPILATION_RESULT por el nombre del resultado de compilación que deseas usar para esta invocación de flujo de trabajo.
Cómo agregar parámetros de configuración de compilación
Puedes agregar parámetros de configuración de compilación adicionales al
objeto create_compilation_result
del DAG de Airflow. Para obtener más información sobre los parámetros disponibles, consulta la referencia de la API de Dataform de CodeCompilationConfig
.
- Para agregar parámetros de configuración de compilación al objeto DAG
de Airflow
create_compilation_result
, agrega los parámetros seleccionados acode_compilation_config
en el siguiente formato:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
"code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
},
)
Reemplaza lo siguiente:
- PROJECT_ID por el ID del proyecto de Google Cloud de Dataform.
- REPOSITORY_ID por el nombre de tu repositorio de Dataform.
- REGION por la región en la que se encuentra el repositorio de Dataform
- GIT_COMMITISH por el compromiso de Git, por ejemplo, una rama o un SHA de Git, en el repositorio de Git remoto de la versión del código que quieres usar.
- PARAMETER con un parámetro
CodeCompilationConfig
seleccionado. Puede agregar varios parámetros. - PARAMETER_VALUE por un valor del parámetro seleccionado.
En la siguiente muestra de código, se muestra el parámetro defaultDatabase
agregado al objeto create_compilation_result
del DAG de Airflow:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
Agrega parámetros de configuración de invocación de flujos de trabajo
Puedes agregar parámetros de configuración de invocación del flujo de trabajo adicionales al objeto create_workflow_invocation
del DAG de Airflow. Para obtener más información sobre los parámetros disponibles, consulta la referencia de la API de Dataform de InvocationConfig
.
- Para agregar parámetros de configuración de invocación del flujo de trabajo al
objeto DAG
create_workflow_invocation
de Airflow, agrega los parámetros seleccionados ainvocation_config
en el siguiente formato:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "PARAMETER": PARAMETER_VALUE }
},
)
Reemplaza lo siguiente:
- PROJECT_ID por el ID del proyecto de Google Cloud de Dataform.
- REPOSITORY_ID por el nombre de tu repositorio de Dataform.
- REGION por la región en la que se encuentra el repositorio de Dataform
- PARAMETER con un parámetro
InvocationConfig
seleccionado. Puede agregar varios parámetros. - PARAMETER_VALUE por un valor del parámetro seleccionado.
En la siguiente muestra de código, se muestran los parámetros includedTags[]
y transitiveDependenciesIncluded
que se agregaron al objeto create_workflow_invocation
del DAG de Airflow:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
},
)
¿Qué sigue?
- Si quieres obtener información para configurar anulaciones de compilación para ejecuciones del flujo de trabajo, consulta Cómo configurar anulaciones de compilación.
- Para obtener más información sobre la API de Dataform, consulta API de Dataform.
- Para obtener más información sobre los entornos de Cloud Composer, consulta Descripción general de Cloud Composer.
- Para aprender a programar ejecuciones con Workflows y Cloud Scheduler, consulta Programa ejecuciones con Workflows y Cloud Scheduler.