Puedes usar Cloud Composer 2 para programar ejecuciones de Dataform. Dataform no es compatible con Cloud Composer 1.
Para administrar los programas de las ejecuciones de Dataform con Cloud Composer 2, puedes usar los operadores de Dataform en los grafos acíclicos dirigidos (DAG) de Airflow. Puedes crear un DAG de Airflow que programe invocaciones del flujo de trabajo de Dataform.
Dataform proporciona varios operadores de Airflow. Esto incluye operadores para obtener un resultado de compilación, obtener una invocación de flujo de trabajo y cancelar una invocación de flujo de trabajo. Para ver la lista completa de operadores de Dataform disponibles en Airflow, consulta Operadores de Dataform de Google.
Antes de comenzar
- Selecciona o crea un repositorio de Dataform.
- Otorga acceso a BigQuery mediante Dataform.
- Selecciona o crea un lugar de trabajo de Dataform.
- Crea al menos una tabla.
- Crea un entorno de Cloud Composer 2.
- Otorga los roles roles/composer.worker y roles/dataform.editor a la cuenta de servicio del entorno de Cloud Composer.
Instala el paquete google-cloud-dataform
de PyPi
Si usas la versión 2.0.25
de Cloud Composer 2 y versiones posteriores, este paquete estará preinstalado en tu entorno. No es necesario que la instales.
Si usas versiones anteriores de Cloud Composer 2, instala el paquete google-cloud-dataform
de PyPi.
En la sección de paquetes de PyPI, especifica la versión ==0.2.0
.
Crea un DAG de Airflow que programe invocaciones del flujo de trabajo de Dataform
Para administrar las ejecuciones programadas de los flujos de trabajo de SQL de Dataform con Cloud Composer 2, escribe el DAG con los operadores de Airflow de Dataform y, luego, súbelo al bucket de tu entorno.
En la siguiente muestra de código, se ve un DAG de Airflow que crea un resultado de compilación de Dataform y, luego, inicia una invocación de flujo de trabajo de Dataform:
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
Reemplaza lo siguiente:
- PROJECT_ID: El ID del proyecto de Google Cloud de Dataform
- REPOSITORY_ID: Es el nombre del repositorio de Dataform.
- REGION: Es la región en la que se encuentra el repositorio de Dataform.
- COMPILATION_RESULT: Es el nombre del resultado de compilación que deseas usar para esta invocación de flujo de trabajo.
- GIT_COMMITISH: El confirmish de Git (por ejemplo, una rama o un SHA de Git) en tu repositorio de Git remoto de la versión del código que deseas usar
En la siguiente muestra de código, se observa un DAG de Airflow que realiza las siguientes acciones:
- Crea un resultado de compilación de Dataform.
- Inicia una invocación asíncrona de flujo de trabajo de Dataform.
- Sondea el estado de tu flujo de trabajo hasta que entra en el estado esperado mediante
DataformWorkflowInvocationStateSensor
.
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
Reemplaza lo siguiente:
- PROJECT_ID: El ID del proyecto de Google Cloud de Dataform
- REPOSITORY_ID: Es el nombre del repositorio de Dataform.
- REGION: Es la región en la que se encuentra el repositorio de Dataform.
- COMPILATION_RESULT: Es el nombre del resultado de compilación que deseas usar para esta invocación de flujo de trabajo.
- GIT_COMMITISH: El confirmish de Git, por ejemplo, una rama o un SHA de Git, en tu repositorio de Git remoto de la versión del código que quieres usar
- COMPILATION_RESULT: Es el nombre del resultado de compilación que deseas usar para esta invocación de flujo de trabajo.
Agrega parámetros de configuración de compilación
Puedes agregar parámetros de configuración de compilación adicionales al objeto DAG de Airflow create_compilation_result
. Para obtener más información sobre
los parámetros disponibles, consulta la referencia de la API de Dataform de CodeCompilationConfig
.
- Para agregar parámetros de configuración de compilación al objeto
create_compilation_result
de DAG de Airflow, agrega los parámetros seleccionados acode_compilation_config
en el siguiente formato:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
"code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
},
)
Reemplaza lo siguiente:
- PROJECT_ID: El ID del proyecto de Google Cloud de Dataform
- REPOSITORY_ID: Es el nombre del repositorio de Dataform.
- REGION por la región en la que se encuentra el repositorio de Dataform
- GIT_COMMITISH: El confirmish de Git, por ejemplo, una rama o un SHA de Git, en tu repositorio de Git remoto de la versión del código que quieres usar
- PARAMETER: Se seleccionó el parámetro
CodeCompilationConfig
. Puedes agregar varios parámetros. - PARAMETER_VALUE: El valor del parámetro seleccionado
En la siguiente muestra de código, se indica el parámetro defaultDatabase
agregado al objeto create_compilation_result
de DAG de Airflow:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
Agrega parámetros de configuración de invocación de flujos de trabajo
Puedes agregar parámetros de configuración de invocación de flujo de trabajo adicionales al objeto DAG de Airflow create_workflow_invocation
. Para obtener más información sobre
los parámetros disponibles, consulta la referencia de la API de Dataform de InvocationConfig
.
- Para agregar parámetros de configuración de invocación de flujos de trabajo al objeto DAG de Airflow
create_workflow_invocation
, agrega los parámetros seleccionados ainvocation_config
en el siguiente formato:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "PARAMETER": PARAMETER_VALUE }
},
)
Reemplaza lo siguiente:
- PROJECT_ID: El ID del proyecto de Google Cloud de Dataform
- REPOSITORY_ID: Es el nombre del repositorio de Dataform.
- REGION: Es la región en la que se encuentra el repositorio de Dataform.
- PARAMETER: Se seleccionó el parámetro
InvocationConfig
. Puedes agregar varios parámetros. - PARAMETER_VALUE: El valor del parámetro seleccionado
En la siguiente muestra de código, se incluyen los parámetros includedTags[]
y transitiveDependenciesIncluded
que se agregaron al objeto create_workflow_invocation
de DAG de Airflow:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
},
)
¿Qué sigue?
- Si quieres aprender a configurar anulaciones de compilación para ejecuciones de flujos de trabajo, consulta Cómo configurar anulaciones de compilación.
- Para obtener más información sobre la API de Dataform, consulta API de Dataform.
- Para obtener más información sobre los entornos de Cloud Composer, consulta la Descripción general de Cloud Composer.
- Para obtener información sobre cómo programar ejecuciones con Workflows y Cloud Scheduler, consulta Programa ejecuciones con Workflows y Cloud Scheduler.