Programar ejecuciones

En este documento, se muestra cómo hacer lo siguiente en Dataform:

Antes de comenzar

Para programar ejecuciones con configuraciones de flujo de trabajo o programar ejecuciones con flujos de trabajo y Cloud Scheduler, asegúrate de hacer lo siguiente:

  1. En la consola de Google Cloud, ve a la página Dataform.

    Ir a Dataform

  2. Selecciona o crea un repositorio.

  3. Crea una configuración de lanzamiento.

Para programar ejecuciones con Cloud Composer, asegúrate de hacer lo siguiente:

  1. Selecciona o crea un repositorio de Dataform.
  2. Otorgar acceso de Dataform a BigQuery.
  3. Selecciona o crea un espacio de trabajo de Dataform.
  4. Crea al menos una tabla.
  5. Crea un entorno de Cloud Composer 2.

Roles obligatorios

Para obtener los permisos que necesitas para completar las tareas de este documento, pídele a tu administrador que te otorgue los siguientes roles de IAM:

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

También puedes obtener los permisos necesarios mediante roles personalizados o cualquier otro rol predefinido.

Para usar una cuenta de servicio distinta de la cuenta de servicio predeterminada de Dataform, otorga acceso a la cuenta de servicio personalizada.

Programa ejecuciones con configuraciones de flujos de trabajo

En esta sección, se muestra cómo crear una configuración de flujo de trabajo en Dataform para programar y configurar ejecuciones de flujos de trabajo. Puedes usar configuraciones de flujo de trabajo para ejecutar flujos de trabajo de Dataform de manera programada.

Información acerca de las configuraciones de flujo de trabajo

Para programar ejecuciones de Dataform de todas las acciones del flujo de trabajo o de las seleccionadas en BigQuery, puedes crear parámetros de configuración del flujo de trabajo. En una configuración de flujo de trabajo, seleccionas una configuración de lanzamiento de compilación, seleccionas acciones de flujo de trabajo para la ejecución y estableces el programa de ejecución.

Luego, durante una ejecución programada de la configuración de tu flujo de trabajo, Dataform implementa en BigQuery la selección de acciones del resultado de compilación más reciente de tu configuración de lanzamiento. También puedes activar manualmente la ejecución de una configuración de flujo de trabajo con workflowConfigs de la API de Dataform.

Una configuración de flujo de trabajo de Dataform contiene la siguiente configuración de ejecución:

  • Es el ID de la configuración del flujo de trabajo.
  • Configuración de lanzamiento.
  • Cuenta de servicio.

    Esta es la cuenta de servicio asociada con la configuración del flujo de trabajo. Puedes seleccionar la cuenta de servicio predeterminada de Dataform o una cuenta de servicio asociada con tu proyecto de Google Cloud, o bien ingresar manualmente una cuenta de servicio diferente. De forma predeterminada, las configuraciones de flujo de trabajo usan las mismas cuentas de servicio que sus repositorios.

  • Acciones del flujo de trabajo que se ejecutarán:

    • Todas las acciones.
    • Selección de acciones.
    • Selección de etiquetas.
  • Programa de ejecución y zona horaria

Crea una configuración de flujo de trabajo

Para crear una configuración de flujo de trabajo de Dataform, sigue estos pasos:

  1. En el repositorio, ve a Lanzamientos y programación.
  2. En la sección Configuraciones de flujo de trabajo, haz clic en Crear.
  3. En el panel Crear configuración de flujo de trabajo, en el campo ID de configuración, ingresa un ID único para la configuración del flujo de trabajo.

    Los IDs solo pueden incluir números, letras, guiones y guiones bajos.

  4. En el menú Configuración de lanzamiento, selecciona una configuración de lanzamiento de compilación.

  5. Opcional: En el campo Frecuencia, ingresa la frecuencia de las ejecuciones en el formato unix-cron.

    Para asegurarte de que Dataform ejecute el resultado de compilación más reciente en la configuración de lanzamiento correspondiente, mantén un descanso mínimo de una hora entre el momento de la creación del resultado de compilación y el momento de la ejecución programada.

  6. En el menú Cuenta de servicio, selecciona una cuenta de servicio para la configuración del flujo de trabajo.

    En el menú, puedes seleccionar la cuenta de servicio predeterminada de Dataform o cualquier cuenta de servicio asociada con tu proyecto de Google Cloud al que tengas acceso. Si no seleccionas una cuenta de servicio, la configuración del flujo de trabajo usa la cuenta de servicio del repositorio.

  7. Opcional: En el menú Zona horaria, selecciona la zona horaria para las ejecuciones.

    La zona horaria predeterminada es UTC.

  8. Selecciona las acciones del flujo de trabajo que se ejecutarán:

    • Para ejecutar todo el flujo de trabajo, haz clic en Todas las acciones.
    • Para ejecutar las acciones seleccionadas en el flujo de trabajo, haz clic en Selección de acciones y, luego, selecciona las acciones.
    • Para ejecutar acciones con etiquetas seleccionadas, haz clic en Selección de etiquetas y, luego, selecciona las etiquetas.
    • Opcional: Para ejecutar las acciones o etiquetas seleccionadas y sus dependencias, selecciona la opción Incluir dependencias.
    • Opcional: Para ejecutar las acciones o etiquetas seleccionadas y sus dependencias, selecciona la opción Incluir dependencias.
    • Opcional: Para volver a compilar todas las tablas desde cero, selecciona la opción Run with full refresh.

    Sin esta opción, Dataform actualiza las tablas incrementales sin volver a compilarlas desde cero.

  9. Haz clic en Crear.

Por ejemplo, la siguiente configuración de flujo de trabajo ejecuta acciones con la etiqueta hourly cada hora en la zona horaria CEST:

  • ID de configuración: production-hourly
  • Configuración de lanzamiento: -
  • Frecuencia: 0 * * * *
  • Zona horaria: Central European Summer Time (CEST)
  • Selección de acciones de flujo de trabajo: selección de etiquetas, etiqueta hourly

Edita la configuración de un flujo de trabajo

Para editar la configuración de un flujo de trabajo, sigue estos pasos:

  1. En el repositorio, ve a Lanzamientos y programación.
  2. En la configuración del flujo de trabajo que deseas editar, haz clic en el menú Más y, luego, en Editar.
  3. En el panel Editar configuración del flujo de trabajo, edita la configuración de la configuración de lanzamiento y, luego, haz clic en Guardar.

Borra la configuración de un flujo de trabajo

Para borrar la configuración de un flujo de trabajo, sigue estos pasos:

  1. En tu repositorio, ve a Lanzamientos y programación.
  2. En la configuración del flujo de trabajo que deseas borrar, haz clic en el menú Más y, luego, en Borrar.
  3. En el diálogo Borrar configuración de lanzamiento, haz clic en Borrar.

Programa ejecuciones con Workflows y Cloud Scheduler

En esta sección, se muestra cómo programar ejecuciones de flujos de trabajo de Dataform con Workflows y Cloud Scheduler.

Acerca de las ejecuciones de flujos de trabajo programados

Puedes configurar la frecuencia de las ejecuciones de flujo de trabajo de Dataform si creas un trabajo de Cloud Scheduler que active un flujo de trabajo de Workflows. Workflows ejecuta servicios en un flujo de trabajo de organización que tú defines.

Workflows ejecuta tu flujo de trabajo de Dataform en un proceso de dos pasos. Primero, extrae el código del repositorio de Dataform de tu proveedor de Git y lo compila en un resultado de compilación. Luego, usa el resultado de la compilación para crear un flujo de trabajo de Dataform y lo ejecuta con la frecuencia que establezcas.

Crea un flujo de trabajo de orquestación programado

Para programar ejecuciones de tu flujo de trabajo de Dataform, usa Workflows para crear un flujo de trabajo de orquestación y agregar un trabajo de Cloud Scheduler como activador.

  1. Workflows usa cuentas de servicio para otorgar acceso a los recursos deGoogle Cloud a los flujos de trabajo. Crea una cuenta de servicio y bríndale el rol de Identity and Access Management de editor de Dataform (roles/dataform.editor), así como los permisos mínimos necesarios para administrar tu flujo de trabajo de orquestación. Para obtener más información, consulta Otorga permiso a un flujo de trabajo para acceder a los recursos de Google Cloud .

  2. Crea un flujo de trabajo de orquestación y usa el siguiente código fuente de YAML como definición de flujo de trabajo:

    main:
        steps:
        - init:
            assign:
            - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID
        - createCompilationResult:
            call: http.post
            args:
                url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
                auth:
                    type: OAuth2
                body:
                    gitCommitish: GIT_COMMITISH
            result: compilationResult
        - createWorkflowInvocation:
            call: http.post
            args:
                url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
                auth:
                    type: OAuth2
                body:
                    compilationResult: ${compilationResult.body.name}
            result: workflowInvocation
        - complete:
            return: ${workflowInvocation.body.name}
    

    Reemplaza lo siguiente:

    • PROJECT_ID: Es el ID de tu Google Cloud proyecto.
    • REPOSITORY_LOCATION: Es la ubicación de tu repositorio de Dataform.
    • REPOSITORY_ID: Es el nombre de tu repositorio de Dataform.
    • GIT_COMMITISH: Es la rama de Git desde la que deseas ejecutar el código de Dataform. Para un repositorio recién creado, reemplázalo por main.
  3. Programa el flujo de trabajo de orquestación con Cloud Scheduler.

Personaliza la solicitud de resultado de compilación de creación del flujo de trabajo de Dataform

Puedes actualizar el flujo de trabajo de orquestación existente y definir la configuración de la solicitud de resultado de compilación de creación del flujo de trabajo de Dataform en el formato YAML. Para obtener más información sobre la configuración, consulta la referencia de recursos REST de projects.locations.repositories.compilationResults.

Por ejemplo, para agregar una configuración schemaSuffix _dev a todas las acciones durante la compilación, reemplaza el cuerpo del paso createCompilationResult por el siguiente fragmento de código:

    - createCompilationResult:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
            auth:
                type: OAuth2
            body:
                gitCommitish: GIT_COMMITISH
                codeCompilationConfig:
                    schemaSuffix: dev

También puedes pasar parámetros de configuración adicionales como argumentos de tiempo de ejecución en una solicitud de ejecución de Workflows y acceder a esos argumentos con variables. Para obtener más información, consulta Cómo pasar argumentos del entorno de ejecución en una solicitud de ejecución.

Personaliza la solicitud de invocación del flujo de trabajo de Dataform

Puedes actualizar el flujo de trabajo de orquestación existente y definir la configuración de la solicitud de invocación del flujo de trabajo de Dataform en el formato YAML. Para obtener más información sobre la configuración de la solicitud de invocación, consulta la referencia de recursos REST de projects.locations.repositories.workflowInvocations.

Por ejemplo, para ejecutar solo acciones con la etiqueta hourly con todas las dependencias transitivas incluidas, reemplaza el cuerpo createWorkflowInvocation por el siguiente fragmento de código:

    - createWorkflowInvocation:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
            auth:
                type: OAuth2
            body:
                compilationResult: ${compilationResult.body.name}
                invocationConfig:
                    includedTags:
                    - hourly
                    transitiveDependenciesIncluded: true
                

También puedes pasar parámetros de configuración adicionales como argumentos de tiempo de ejecución en una solicitud de ejecución de Workflows y acceder a esos argumentos con variables. Para obtener más información, consulta Cómo pasar argumentos del entorno de ejecución en una solicitud de ejecución.

Programa ejecuciones con Cloud Composer

Puedes usar Cloud Composer 2 para programar ejecuciones de Dataform. Dataform no es compatible con Cloud Composer 1.

Para administrar las programaciones de las ejecuciones de Dataform con Cloud Composer 2, puedes usar operadores de Dataform en los grafos acíclicos dirigidos (DAG) de Airflow. Puedes crear un DAG de Airflow que programe invocaciones de flujos de trabajo de Dataform.

Dataform proporciona varios operadores de Airflow. Estos incluyen operadores para obtener un resultado de compilación, obtener una invocación de flujo de trabajo y cancelar una invocación de flujo de trabajo. Para ver la lista completa de operadores de Dataform Airflow disponibles, consulta Operadores de Google Dataform.

Instala el paquete google-cloud-dataform de PyPI

Si usas las versiones 2.0.25 y posteriores de Cloud Composer 2, este paquete está preinstalado en tu entorno. No es necesario que la instales.

Si usas versiones anteriores de Cloud Composer 2, instala el paquete google-cloud-dataform de PyPI.

En la sección de paquetes de PyPI, especifica la versión ==0.2.0.

Crea un DAG de Airflow que programe invocaciones de flujos de trabajo de Dataform

Para administrar ejecuciones programadas de flujos de trabajo de Dataform con Cloud Composer 2, escribe el DAG con operadores de Airflow de Dataform y, luego, súbelo al bucket de tu entorno.

En la siguiente muestra de código, se muestra un DAG de Airflow que crea un resultado de compilación de Dataform y comienza una invocación de flujo de trabajo de Dataform:

from datetime import datetime

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )


create_compilation_result >> create_workflow_invocation

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID de tu proyecto de Dataform en Google Cloud.
  • REPOSITORY_ID: Es el nombre de tu repositorio de Dataform.
  • REGION: La región en la que se encuentra el repositorio de Dataform.
  • COMPILATION_RESULT: Es el nombre del resultado de la compilación que deseas usar para esta invocación de flujo de trabajo.
  • GIT_COMMITISH: El commit de Git en el repositorio remoto de Git de la versión del código que deseas usar, por ejemplo, una rama o un SHA de Git.

En la siguiente muestra de código, se muestra un DAG de Airflow que realiza las siguientes acciones:

  1. Crea un resultado de compilación de Dataform.
  2. Inicia una invocación asíncrona del flujo de trabajo de Dataform.
  3. Consulta el estado de tu flujo de trabajo hasta que ingrese al estado esperado con DataformWorkflowInvocationStateSensor.
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

Reemplaza lo siguiente:

  • PROJECT_ID: El ID de tu proyecto de Google Cloud de Dataform.
  • REPOSITORY_ID: Es el nombre de tu repositorio de Dataform.
  • REGION: La región en la que se encuentra el repositorio de Dataform.
  • COMPILATION_RESULT: Es el nombre del resultado de la compilación que deseas usar para esta invocación de flujo de trabajo.
  • GIT_COMMITISH: El commit de Git en el repositorio remoto de Git de la versión del código que deseas usar, por ejemplo, una rama o un SHA de Git.
  • COMPILATION_RESULT: Es el nombre del resultado de la compilación que deseas usar para esta invocación de flujo de trabajo.

Agrega parámetros de configuración de compilación

Puedes agregar parámetros de configuración de compilación adicionales al objeto DAG de create_compilation_result de Airflow. Para obtener más información sobre los parámetros disponibles, consulta la referencia de la API de Dataform de CodeCompilationConfig.

  • Para agregar parámetros de configuración de compilación al objeto DAG de Airflow create_compilation_result, agrega los parámetros seleccionados al campo code_compilation_config en el siguiente formato:

        create_compilation_result = DataformCreateCompilationResultOperator(
            task_id="create_compilation_result",
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            compilation_result={
                "git_commitish": GIT_COMMITISH,
                "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
            },
        )
    

    Reemplaza lo siguiente:

    • PROJECT_ID: Es el ID de tu proyecto de Dataform en Google Cloud.
    • REPOSITORY_ID: Es el nombre de tu repositorio de Dataform.
    • REGION: La región en la que se encuentra el repositorio de Dataform.
    • GIT_COMMITISH: El commit de Git en el repositorio remoto de Git de la versión del código que deseas usar, por ejemplo, una rama o un SHA de Git.
    • PARAMETER: Es el parámetro CodeCompilationConfig seleccionado. Puedes agregar varios parámetros.
    • PARAMETER_VALUE: Es el valor del parámetro seleccionado.

En la siguiente muestra de código, se muestra el parámetro defaultDatabase agregado al objeto DAG create_compilation_result de Airflow:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

Agrega parámetros de configuración de invocación de flujo de trabajo

Puedes agregar parámetros de configuración de invocación de flujo de trabajo adicionales al objeto DAG de create_workflow_invocation de Airflow. Para obtener más información sobre los parámetros disponibles, consulta la referencia de la API de Dataform de InvocationConfig.

  • Para agregar parámetros de configuración de invocación de flujo de trabajo al objeto DAG de create_workflow_invocation Airflow, agrega los parámetros seleccionados al campo invocation_config en el siguiente formato:

        create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
            task_id='create_workflow_invocation',
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            workflow_invocation={
                "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
                "invocation_config": { "PARAMETER": PARAMETER_VALUE }
            },
        )
    
    

    Reemplaza lo siguiente:

    • PROJECT_ID: Es el ID de tu proyecto de Dataform en Google Cloud.
    • REPOSITORY_ID: Es el nombre de tu repositorio de Dataform.
    • REGION: La región en la que se encuentra el repositorio de Dataform.
    • PARAMETER: Es el parámetro InvocationConfig seleccionado. Puedes agregar varios parámetros.
    • PARAMETER_VALUE: Es el valor del parámetro seleccionado.

En la siguiente muestra de código, se muestran los parámetros includedTags[] y transitiveDependenciesIncluded agregados al objeto DAG de Airflow create_workflow_invocation:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

¿Qué sigue?