En este documento, se muestra cómo hacer lo siguiente en Dataform:
- Programa ejecuciones con configuraciones de flujos de trabajo.
- Programa ejecuciones con flujos de trabajo y Cloud Scheduler.
- Programa ejecuciones con Cloud Composer.
Antes de comenzar
Para programar ejecuciones con configuraciones de flujo de trabajo o programar ejecuciones con flujos de trabajo y Cloud Scheduler, asegúrate de hacer lo siguiente:
En la consola de Google Cloud, ve a la página Dataform.
Selecciona o crea un repositorio.
Crea una configuración de lanzamiento.
Para programar ejecuciones con Cloud Composer, asegúrate de hacer lo siguiente:
- Selecciona o crea un repositorio de Dataform.
- Otorgar acceso de Dataform a BigQuery.
- Selecciona o crea un espacio de trabajo de Dataform.
- Crea al menos una tabla.
- Crea un entorno de Cloud Composer 2.
Roles obligatorios
Para obtener los permisos que necesitas para completar las tareas de este documento, pídele a tu administrador que te otorgue los siguientes roles de IAM:
-
Administrador de Dataform (
roles/dataform.admin
) en los repositorios -
Editor de Dataform (
roles/dataform.editor
) en repositorios y la cuenta de servicio de tu entorno de Cloud Composer -
Trabajador de Composer (
roles/composer.worker
) en la cuenta de servicio del entorno de Cloud Composer
Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.
También puedes obtener los permisos necesarios mediante roles personalizados o cualquier otro rol predefinido.
Para usar una cuenta de servicio distinta de la cuenta de servicio predeterminada de Dataform, otorga acceso a la cuenta de servicio personalizada.
Programa ejecuciones con configuraciones de flujos de trabajo
En esta sección, se muestra cómo crear una configuración de flujo de trabajo en Dataform para programar y configurar ejecuciones de flujos de trabajo. Puedes usar configuraciones de flujo de trabajo para ejecutar flujos de trabajo de Dataform de manera programada.
Información acerca de las configuraciones de flujo de trabajo
Para programar ejecuciones de Dataform de todas las acciones del flujo de trabajo o de las seleccionadas en BigQuery, puedes crear parámetros de configuración del flujo de trabajo. En una configuración de flujo de trabajo, seleccionas una configuración de lanzamiento de compilación, seleccionas acciones de flujo de trabajo para la ejecución y estableces el programa de ejecución.
Luego, durante una ejecución programada de la configuración de tu flujo de trabajo, Dataform implementa en BigQuery la selección de acciones del resultado de compilación más reciente de tu configuración de lanzamiento. También puedes activar manualmente la ejecución de una configuración de flujo de trabajo con workflowConfigs de la API de Dataform.
Una configuración de flujo de trabajo de Dataform contiene la siguiente configuración de ejecución:
- Es el ID de la configuración del flujo de trabajo.
- Configuración de lanzamiento.
Cuenta de servicio.
Esta es la cuenta de servicio asociada con la configuración del flujo de trabajo. Puedes seleccionar la cuenta de servicio predeterminada de Dataform o una cuenta de servicio asociada con tu proyecto de Google Cloud, o bien ingresar manualmente una cuenta de servicio diferente. De forma predeterminada, las configuraciones de flujo de trabajo usan las mismas cuentas de servicio que sus repositorios.
Acciones del flujo de trabajo que se ejecutarán:
- Todas las acciones.
- Selección de acciones.
- Selección de etiquetas.
Programa de ejecución y zona horaria
Crea una configuración de flujo de trabajo
Para crear una configuración de flujo de trabajo de Dataform, sigue estos pasos:
- En el repositorio, ve a Lanzamientos y programación.
- En la sección Configuraciones de flujo de trabajo, haz clic en Crear.
En el panel Crear configuración de flujo de trabajo, en el campo ID de configuración, ingresa un ID único para la configuración del flujo de trabajo.
Los IDs solo pueden incluir números, letras, guiones y guiones bajos.
En el menú Configuración de lanzamiento, selecciona una configuración de lanzamiento de compilación.
Opcional: En el campo Frecuencia, ingresa la frecuencia de las ejecuciones en el formato unix-cron.
Para asegurarte de que Dataform ejecute el resultado de compilación más reciente en la configuración de lanzamiento correspondiente, mantén un descanso mínimo de una hora entre el momento de la creación del resultado de compilación y el momento de la ejecución programada.
En el menú Cuenta de servicio, selecciona una cuenta de servicio para la configuración del flujo de trabajo.
En el menú, puedes seleccionar la cuenta de servicio predeterminada de Dataform o cualquier cuenta de servicio asociada con tu proyecto de Google Cloud al que tengas acceso. Si no seleccionas una cuenta de servicio, la configuración del flujo de trabajo usa la cuenta de servicio del repositorio.
Opcional: En el menú Zona horaria, selecciona la zona horaria para las ejecuciones.
La zona horaria predeterminada es UTC.
Selecciona las acciones del flujo de trabajo que se ejecutarán:
- Para ejecutar todo el flujo de trabajo, haz clic en Todas las acciones.
- Para ejecutar las acciones seleccionadas en el flujo de trabajo, haz clic en Selección de acciones y, luego, selecciona las acciones.
- Para ejecutar acciones con etiquetas seleccionadas, haz clic en Selección de etiquetas y, luego, selecciona las etiquetas.
- Opcional: Para ejecutar las acciones o etiquetas seleccionadas y sus dependencias, selecciona la opción Incluir dependencias.
- Opcional: Para ejecutar las acciones o etiquetas seleccionadas y sus dependencias, selecciona la opción Incluir dependencias.
- Opcional: Para volver a compilar todas las tablas desde cero, selecciona la opción Run with full refresh.
Sin esta opción, Dataform actualiza las tablas incrementales sin volver a compilarlas desde cero.
Haz clic en Crear.
Por ejemplo, la siguiente configuración de flujo de trabajo ejecuta acciones con la etiqueta hourly
cada hora en la zona horaria CEST:
- ID de configuración:
production-hourly
- Configuración de lanzamiento: -
- Frecuencia:
0 * * * *
- Zona horaria:
Central European Summer Time (CEST)
- Selección de acciones de flujo de trabajo: selección de etiquetas, etiqueta
hourly
Edita la configuración de un flujo de trabajo
Para editar la configuración de un flujo de trabajo, sigue estos pasos:
- En el repositorio, ve a Lanzamientos y programación.
- En la configuración del flujo de trabajo que deseas editar, haz clic en el menú Más y, luego, en Editar.
- En el panel Editar configuración del flujo de trabajo, edita la configuración de la configuración de lanzamiento y, luego, haz clic en Guardar.
Borra la configuración de un flujo de trabajo
Para borrar la configuración de un flujo de trabajo, sigue estos pasos:
- En tu repositorio, ve a Lanzamientos y programación.
- En la configuración del flujo de trabajo que deseas borrar, haz clic en el menú Más y, luego, en Borrar.
- En el diálogo Borrar configuración de lanzamiento, haz clic en Borrar.
Programa ejecuciones con Workflows y Cloud Scheduler
En esta sección, se muestra cómo programar ejecuciones de flujos de trabajo de Dataform con Workflows y Cloud Scheduler.
Acerca de las ejecuciones de flujos de trabajo programados
Puedes configurar la frecuencia de las ejecuciones de flujo de trabajo de Dataform si creas un trabajo de Cloud Scheduler que active un flujo de trabajo de Workflows. Workflows ejecuta servicios en un flujo de trabajo de organización que tú defines.
Workflows ejecuta tu flujo de trabajo de Dataform en un proceso de dos pasos. Primero, extrae el código del repositorio de Dataform de tu proveedor de Git y lo compila en un resultado de compilación. Luego, usa el resultado de la compilación para crear un flujo de trabajo de Dataform y lo ejecuta con la frecuencia que establezcas.
Crea un flujo de trabajo de orquestación programado
Para programar ejecuciones de tu flujo de trabajo de Dataform, usa Workflows para crear un flujo de trabajo de orquestación y agregar un trabajo de Cloud Scheduler como activador.
Workflows usa cuentas de servicio para otorgar acceso a los recursos deGoogle Cloud a los flujos de trabajo. Crea una cuenta de servicio y bríndale el rol de Identity and Access Management de editor de Dataform (
roles/dataform.editor
), así como los permisos mínimos necesarios para administrar tu flujo de trabajo de orquestación. Para obtener más información, consulta Otorga permiso a un flujo de trabajo para acceder a los recursos de Google Cloud .Crea un flujo de trabajo de orquestación y usa el siguiente código fuente de YAML como definición de flujo de trabajo:
main: steps: - init: assign: - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID - createCompilationResult: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"} auth: type: OAuth2 body: gitCommitish: GIT_COMMITISH result: compilationResult - createWorkflowInvocation: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"} auth: type: OAuth2 body: compilationResult: ${compilationResult.body.name} result: workflowInvocation - complete: return: ${workflowInvocation.body.name}
Reemplaza lo siguiente:
- PROJECT_ID: Es el ID de tu Google Cloud proyecto.
- REPOSITORY_LOCATION: Es la ubicación de tu repositorio de Dataform.
- REPOSITORY_ID: Es el nombre de tu repositorio de Dataform.
- GIT_COMMITISH: Es la rama de Git desde la que deseas ejecutar el código de Dataform. Para un repositorio recién creado, reemplázalo por
main
.
Programa el flujo de trabajo de orquestación con Cloud Scheduler.
Personaliza la solicitud de resultado de compilación de creación del flujo de trabajo de Dataform
Puedes
actualizar el flujo de trabajo de orquestación existente
y definir la configuración de la solicitud de resultado de compilación de creación del flujo de trabajo de Dataform
en el formato YAML. Para obtener más información sobre la configuración, consulta la referencia de recursos REST de projects.locations.repositories.compilationResults
.
Por ejemplo, para agregar una configuración schemaSuffix
_dev
a todas las acciones durante la compilación, reemplaza el cuerpo del paso createCompilationResult
por el siguiente fragmento de código:
- createCompilationResult:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
auth:
type: OAuth2
body:
gitCommitish: GIT_COMMITISH
codeCompilationConfig:
schemaSuffix: dev
También puedes pasar parámetros de configuración adicionales como argumentos de tiempo de ejecución en una solicitud de ejecución de Workflows y acceder a esos argumentos con variables. Para obtener más información, consulta Cómo pasar argumentos del entorno de ejecución en una solicitud de ejecución.
Personaliza la solicitud de invocación del flujo de trabajo de Dataform
Puedes
actualizar el flujo de trabajo de orquestación existente
y definir la configuración de la solicitud de invocación del flujo de trabajo de Dataform en
el formato YAML. Para obtener más información sobre la configuración de la solicitud de invocación, consulta la referencia de recursos REST de projects.locations.repositories.workflowInvocations
.
Por ejemplo, para ejecutar solo acciones con la etiqueta hourly
con todas las dependencias transitivas incluidas, reemplaza el cuerpo createWorkflowInvocation
por el siguiente fragmento de código:
- createWorkflowInvocation:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
auth:
type: OAuth2
body:
compilationResult: ${compilationResult.body.name}
invocationConfig:
includedTags:
- hourly
transitiveDependenciesIncluded: true
También puedes pasar parámetros de configuración adicionales como argumentos de tiempo de ejecución en una solicitud de ejecución de Workflows y acceder a esos argumentos con variables. Para obtener más información, consulta Cómo pasar argumentos del entorno de ejecución en una solicitud de ejecución.
Programa ejecuciones con Cloud Composer
Puedes usar Cloud Composer 2 para programar ejecuciones de Dataform. Dataform no es compatible con Cloud Composer 1.
Para administrar las programaciones de las ejecuciones de Dataform con Cloud Composer 2, puedes usar operadores de Dataform en los grafos acíclicos dirigidos (DAG) de Airflow. Puedes crear un DAG de Airflow que programe invocaciones de flujos de trabajo de Dataform.
Dataform proporciona varios operadores de Airflow. Estos incluyen operadores para obtener un resultado de compilación, obtener una invocación de flujo de trabajo y cancelar una invocación de flujo de trabajo. Para ver la lista completa de operadores de Dataform Airflow disponibles, consulta Operadores de Google Dataform.
Instala el paquete google-cloud-dataform
de PyPI
Si usas las versiones 2.0.25
y posteriores de Cloud Composer 2, este paquete está preinstalado en tu entorno. No es necesario que la instales.
Si usas versiones anteriores de Cloud Composer 2, instala el paquete google-cloud-dataform
de PyPI.
En la sección de paquetes de PyPI, especifica la versión ==0.2.0
.
Crea un DAG de Airflow que programe invocaciones de flujos de trabajo de Dataform
Para administrar ejecuciones programadas de flujos de trabajo de Dataform con Cloud Composer 2, escribe el DAG con operadores de Airflow de Dataform y, luego, súbelo al bucket de tu entorno.
En la siguiente muestra de código, se muestra un DAG de Airflow que crea un resultado de compilación de Dataform y comienza una invocación de flujo de trabajo de Dataform:
from datetime import datetime
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
Reemplaza lo siguiente:
- PROJECT_ID: Es el ID de tu proyecto de Dataform en Google Cloud.
- REPOSITORY_ID: Es el nombre de tu repositorio de Dataform.
- REGION: La región en la que se encuentra el repositorio de Dataform.
- COMPILATION_RESULT: Es el nombre del resultado de la compilación que deseas usar para esta invocación de flujo de trabajo.
- GIT_COMMITISH: El commit de Git en el repositorio remoto de Git de la versión del código que deseas usar, por ejemplo, una rama o un SHA de Git.
En la siguiente muestra de código, se muestra un DAG de Airflow que realiza las siguientes acciones:
- Crea un resultado de compilación de Dataform.
- Inicia una invocación asíncrona del flujo de trabajo de Dataform.
- Consulta el estado de tu flujo de trabajo hasta que ingrese al estado esperado con
DataformWorkflowInvocationStateSensor
.
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
Reemplaza lo siguiente:
- PROJECT_ID: El ID de tu proyecto de Google Cloud de Dataform.
- REPOSITORY_ID: Es el nombre de tu repositorio de Dataform.
- REGION: La región en la que se encuentra el repositorio de Dataform.
- COMPILATION_RESULT: Es el nombre del resultado de la compilación que deseas usar para esta invocación de flujo de trabajo.
- GIT_COMMITISH: El commit de Git en el repositorio remoto de Git de la versión del código que deseas usar, por ejemplo, una rama o un SHA de Git.
- COMPILATION_RESULT: Es el nombre del resultado de la compilación que deseas usar para esta invocación de flujo de trabajo.
Agrega parámetros de configuración de compilación
Puedes agregar parámetros de configuración de compilación adicionales al objeto DAG de create_compilation_result
de Airflow. Para obtener más información sobre los parámetros disponibles, consulta la referencia de la API de Dataform de CodeCompilationConfig
.
Para agregar parámetros de configuración de compilación al objeto DAG de Airflow
create_compilation_result
, agrega los parámetros seleccionados al campocode_compilation_config
en el siguiente formato:create_compilation_result = DataformCreateCompilationResultOperator( task_id="create_compilation_result", project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, compilation_result={ "git_commitish": GIT_COMMITISH, "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"} }, )
Reemplaza lo siguiente:
- PROJECT_ID: Es el ID de tu proyecto de Dataform en Google Cloud.
- REPOSITORY_ID: Es el nombre de tu repositorio de Dataform.
- REGION: La región en la que se encuentra el repositorio de Dataform.
- GIT_COMMITISH: El commit de Git en el repositorio remoto de Git de la versión del código que deseas usar, por ejemplo, una rama o un SHA de Git.
- PARAMETER: Es el parámetro
CodeCompilationConfig
seleccionado. Puedes agregar varios parámetros. - PARAMETER_VALUE: Es el valor del parámetro seleccionado.
En la siguiente muestra de código, se muestra el parámetro defaultDatabase
agregado al objeto DAG create_compilation_result
de Airflow:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
Agrega parámetros de configuración de invocación de flujo de trabajo
Puedes agregar parámetros de configuración de invocación de flujo de trabajo adicionales al objeto DAG de create_workflow_invocation
de Airflow. Para obtener más información sobre los parámetros disponibles, consulta la referencia de la API de Dataform de InvocationConfig
.
Para agregar parámetros de configuración de invocación de flujo de trabajo al objeto DAG de
create_workflow_invocation
Airflow, agrega los parámetros seleccionados al campoinvocation_config
en el siguiente formato:create_workflow_invocation = DataformCreateWorkflowInvocationOperator( task_id='create_workflow_invocation', project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, workflow_invocation={ "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}", "invocation_config": { "PARAMETER": PARAMETER_VALUE } }, )
Reemplaza lo siguiente:
- PROJECT_ID: Es el ID de tu proyecto de Dataform en Google Cloud.
- REPOSITORY_ID: Es el nombre de tu repositorio de Dataform.
- REGION: La región en la que se encuentra el repositorio de Dataform.
- PARAMETER: Es el parámetro
InvocationConfig
seleccionado. Puedes agregar varios parámetros. - PARAMETER_VALUE: Es el valor del parámetro seleccionado.
En la siguiente muestra de código, se muestran los parámetros includedTags[]
y transitiveDependenciesIncluded
agregados al objeto DAG de Airflow create_workflow_invocation
:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
},
)
¿Qué sigue?
- Para aprender a configurar las configuraciones de lanzamiento de compilación de Dataform, consulta Cómo crear una configuración de lanzamiento.
- Para obtener más información sobre el ciclo de vida del código de Dataform, consulta Introducción al ciclo de vida del código en Dataform.
- Para obtener más información sobre la API de Dataform, consulta la API de Dataform.
- Para obtener más información sobre los entornos de Cloud Composer, consulta la Descripción general de Cloud Composer.
- Para obtener más información sobre los precios de Workflows, consulta Precios de Workflows.