Programar execuções com o Cloud Composer

Neste documento, mostramos como executar execuções programadas de fluxos de trabalho SQL do Dataform usando o Cloud Composer 2.

É possível usar o Cloud Composer 2 para programar execuções do Dataform. O Dataform não é compatível com o Cloud Composer 1.

Para gerenciar programações de execuções do Dataform com o Cloud Composer 2, use os operadores do Dataform em gráficos acíclicos dirigidos (DAGs) do Airflow. É possível criar um DAG do Airflow que programa invocações de fluxo de trabalho do Dataform.

O Dataform fornece vários operadores do Airflow. Isso inclui operadores para receber um resultado de compilação, receber uma invocação de fluxo de trabalho e cancelar uma invocação de fluxo de trabalho. Para conferir a lista completa de operadores do Airflow disponíveis no Dataform, consulte Operadores do Google Dataform.

Antes de começar

  1. Selecione ou crie um repositório do Dataform.
  2. Conceda ao Dataform acesso ao BigQuery.
  3. Selecione ou crie um espaço de trabalho do Dataform.
  4. Crie pelo menos uma tabela.
  5. Crie um ambiente do Cloud Composer 2.
    1. Conceda os papéis roles/composer.worker e roles/dataform.editor à conta de serviço do ambiente do Cloud Composer.

Instalar o pacote PyPi google-cloud-dataform

Se você usar o Cloud Composer 2 na versão 2.0.25 e posterior, esse pacote será pré-instalado no ambiente. Não é necessário instalar.

Se você usar versões anteriores do Cloud Composer 2, instale o pacote PyPi google-cloud-dataform.

Na seção de pacotes PyPI, especifique a versão ==0.2.0.

criar um DAG do Airflow que programa invocações de fluxo de trabalho do Dataform

Para gerenciar execuções programadas de fluxos de trabalho SQL do Dataform com o Cloud Composer 2, grave o DAG usando os operadores do Dataform Airflow e faça upload dele para o bucket do ambiente.

O exemplo de código a seguir mostra um DAG do Airflow que cria um resultado de compilação do Dataform e inicia uma invocação do fluxo de trabalho do Dataform:

from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )

create_compilation_result >> create_workflow_invocation

Substitua:

  • PROJECT_ID: ID do projeto do Google Cloud no Dataform
  • REPOSITORY_ID: o nome do repositório do Dataform
  • REGION: a região em que o repositório do Dataform está localizado.
  • COMPILATION_RESULT: o nome do resultado da compilação que você quer usar para esta invocação do fluxo de trabalho.
  • GIT_COMMITISH: o commitish do Git, por exemplo, uma ramificação ou um SHA do Git no repositório Git remoto da versão do código que você quer usar

O exemplo de código a seguir mostra um DAG do Airflow que:

  1. Cria um resultado de compilação do Dataform.
  2. Inicia uma invocação assíncrona do fluxo de trabalho do Dataform.
  3. Pesquisa o status do seu fluxo de trabalho até que ele entre no estado esperado usando DataformWorkflowInvocationStateSensor.
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

Substitua:

  • PROJECT_ID: ID do projeto do Google Cloud no Dataform
  • REPOSITORY_ID: o nome do repositório do Dataform
  • REGION: a região em que o repositório do Dataform está localizado.
  • COMPILATION_RESULT: o nome do resultado da compilação que você quer usar para esta invocação do fluxo de trabalho.
  • GIT_COMMITISH: o commitish do Git, por exemplo, uma ramificação ou um SHA do Git no repositório Git remoto da versão do código que você quer usar.
  • COMPILATION_RESULT: o nome do resultado da compilação que você quer usar para esta invocação do fluxo de trabalho.

Adicionar parâmetros de configuração de compilação

É possível adicionar outros parâmetros de configuração de compilação ao objeto create_compilation_result do DAG do Airflow. Para mais informações sobre os parâmetros disponíveis, consulte a referência da API Dataform CodeCompilationConfig.

  • Para adicionar parâmetros de configuração de compilação ao objeto create_compilation_result do DAG do Airflow, adicione os parâmetros selecionados a code_compilation_config no seguinte formato:
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
            "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
        },
    )

Substitua:

  • PROJECT_ID: ID do projeto do Google Cloud no Dataform
  • REPOSITORY_ID: o nome do repositório do Dataform
  • REGION pela região em que o repositório do Dataform está localizado.
  • GIT_COMMITISH: o commitish do Git, por exemplo, uma ramificação ou um SHA do Git no repositório Git remoto da versão do código que você quer usar.
  • PARAMETER: o parâmetro CodeCompilationConfig selecionado. É possível adicionar vários parâmetros.
  • PARAMETER_VALUE: valor do parâmetro selecionado.

O exemplo de código a seguir mostra o parâmetro defaultDatabase adicionado ao objeto create_compilation_result do DAG do Airflow:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

Adicionar parâmetros de configuração da invocação do fluxo de trabalho

É possível adicionar outros parâmetros de configuração da invocação do fluxo de trabalho ao objeto create_workflow_invocation do DAG do Airflow. Para mais informações sobre os parâmetros disponíveis, consulte a referência da API Dataform InvocationConfig.

  • Para adicionar parâmetros de configuração da invocação do fluxo de trabalho ao objeto create_workflow_invocation do DAG do Airflow, adicione os parâmetros selecionados a invocation_config no seguinte formato:
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            
            "invocation_config": { "PARAMETER": PARAMETER_VALUE }
        },

    )

Substitua:

  • PROJECT_ID: ID do projeto do Google Cloud no Dataform
  • REPOSITORY_ID: o nome do repositório do Dataform
  • REGION: a região em que o repositório do Dataform está localizado.
  • PARAMETER: o parâmetro InvocationConfig selecionado. É possível adicionar vários parâmetros.
  • PARAMETER_VALUE: valor do parâmetro selecionado.

O exemplo de código a seguir mostra os parâmetros includedTags[] e transitiveDependenciesIncluded adicionados ao objeto create_workflow_invocation do DAG do Airflow:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

A seguir