Programa ejecuciones con Cloud Composer

En este documento, se muestra cómo ejecutar ejecuciones programadas de Dataform. Flujos de trabajo de SQL con Cloud Composer 2

Puedes usar Cloud Composer 2 para programar ejecuciones de Dataform. Dataform no es compatible con Cloud Composer 1.

Para administrar los programas de ejecuciones de Dataform con Cloud Composer 2, Puedes usar operadores de Dataform en grafos acíclicos dirigidos (DAG) de Airflow. Puedes crear un DAG de Airflow que programe Invocaciones del flujo de trabajo de Dataform.

Dataform proporciona varios operadores de Airflow. Se incluyen los operadores para obtener un resultado de compilación, una invocación de flujo de trabajo y cancelar una invocación de flujo de trabajo. Cómo ver la lista completa de Dataform disponibles Para los operadores de Airflow, consulta Operadores de Dataform de Google.

Antes de comenzar

  1. Selecciona o Crea un repositorio de Dataform.
  2. Otorga acceso de Dataform a BigQuery.
  3. Selecciona o crea un lugar de trabajo de Dataform.
  4. Crea al menos una tabla.
  5. Crea un entorno de Cloud Composer 2.
    1. Otorga el rol roles/composer.worker. y roles/dataform.editor roles a la cuenta de servicio del entorno de Cloud Composer.

Instala el paquete de PyPi google-cloud-dataform

Si usas la versión 2.0.25 de Cloud Composer 2 y posteriores, este paquete ya está preinstalada en tu entorno. No es necesario que la instales.

Si usas versiones anteriores de Cloud Composer 2, Instala el paquete google-cloud-dataform de PyPi.

En la sección de paquetes de PyPI, especifica la versión ==0.2.0.

Crear un DAG de Airflow que programe invocaciones del flujo de trabajo de Dataform

Administrar las ejecuciones programadas de flujos de trabajo de SQL de Dataform con Cloud Composer 2, escribe el DAG con los operadores de Dataform Airflow y, luego, súbelo al bucket de tu entorno.

En la siguiente muestra de código, se muestra un DAG de Airflow que crea un Dataform resultado de la compilación y, luego, inicia una invocación del flujo de trabajo de Dataform:

from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )

create_compilation_result >> create_workflow_invocation

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID del proyecto de Dataform de Google Cloud.
  • REPOSITORY_ID: Es el nombre del repositorio de Dataform.
  • REGION: Es la región en la que Dataform se ubica el repositorio
  • COMPILATION_RESULT: Es el nombre del resultado de la compilación. que quieres usar para esta invocación de flujo de trabajo
  • GIT_COMMITISH: El confirmish de Git, por ejemplo, una rama o un SHA de Git, en tu repositorio de Git remoto de la versión de el código que quieres usar

En la siguiente muestra de código, se observa un DAG de Airflow que hace lo siguiente:

  1. Crea un resultado de compilación de Dataform.
  2. Inicia una invocación de flujo de trabajo asíncrona de Dataform.
  3. Sondea el estado de tu flujo de trabajo hasta que entra en el estado esperado mediante DataformWorkflowInvocationStateSensor.
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID del proyecto de Dataform de Google Cloud.
  • REPOSITORY_ID: Es el nombre del repositorio de Dataform.
  • REGION: Es la región en la que Dataform se ubica el repositorio
  • COMPILATION_RESULT: Es el nombre del resultado de la compilación. que quieres usar para esta invocación de flujo de trabajo
  • GIT_COMMITISH: El confirmish de Git, por ejemplo, una rama o un SHA de Git, en tu repositorio de Git remoto de la versión de tu código que quieres usar
  • COMPILATION_RESULT: Es el nombre del resultado de la compilación. que quieres usar para esta invocación de flujo de trabajo

Agrega parámetros de configuración de compilación

Puedes agregar parámetros de configuración de compilación adicionales al create_compilation_result objeto de DAG de Airflow. Para obtener más información los parámetros disponibles, consulta la referencia de la API de Dataform de CodeCompilationConfig.

  • Para agregar parámetros de configuración de compilación a create_compilation_result Objeto de DAG de Airflow, agrega tus parámetros seleccionados a code_compilation_config en el siguiente formato:
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
            "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
        },
    )

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID del proyecto de Dataform de Google Cloud.
  • REPOSITORY_ID: Es el nombre del repositorio de Dataform.
  • REGION por la región en la que se encuentra el repositorio de Dataform se encuentra
  • GIT_COMMITISH: El confirmish de Git, por ejemplo, una rama o un SHA de Git, en tu repositorio de Git remoto de la versión de tu código que quieres usar
  • PARAMETER: Se seleccionó el parámetro CodeCompilationConfig. Puedes agregar varios parámetros.
  • PARAMETER_VALUE: valor del parámetro seleccionado

En la siguiente muestra de código, se muestra el parámetro defaultDatabase que se agregó al create_compilation_result Objeto de DAG de Airflow:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

Agrega parámetros de configuración de invocación del flujo de trabajo

Puedes agregar parámetros de configuración adicionales de invocación del flujo de trabajo al create_workflow_invocation objeto de DAG de Airflow. Para obtener más información los parámetros disponibles, consulta la referencia de la API de Dataform de InvocationConfig.

  • Para agregar parámetros de configuración de invocación de flujo de trabajo al create_workflow_invocation objeto de DAG de Airflow, agrega los parámetros seleccionados a invocation_config en el siguiente formato:
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            
            "invocation_config": { "PARAMETER": PARAMETER_VALUE }
        },

    )

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID del proyecto de Dataform de Google Cloud.
  • REPOSITORY_ID: Es el nombre del repositorio de Dataform.
  • REGION: Es la región en la que el repositorio de Dataform. se encuentra
  • PARAMETER: Se seleccionó el parámetro InvocationConfig. Puedes agregar varios parámetros.
  • PARAMETER_VALUE: valor del parámetro seleccionado

En la siguiente muestra de código, se observan includedTags[] y Se agregaron parámetros transitiveDependenciesIncluded al create_workflow_invocation Objeto de DAG de Airflow:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

¿Qué sigue?