Programar execuções com o Cloud Composer

Neste documento, mostramos como executar execuções programadas de fluxos de trabalho SQL do Dataform usando o Cloud Composer 2.

Sobre a execução programada do fluxo de trabalho com o Cloud Composer 2

Para gerenciar programações de execuções do Dataform com o Cloud Composer 2, use os operadores do Dataform em DAGs do Airflow. É possível criar um DAG do Airflow que programe invocações de fluxo de trabalho do Dataform.

O Dataform fornece vários operadores do Airflow. Por exemplo, operadores para receber um resultado de compilação, invocar ou cancelar uma invocação de fluxo de trabalho. Para ver a lista completa de operadores do Dataform Airflow disponíveis, consulte Operadores do Dataform Google.

Antes de começar

  1. Selecione ou crie um repositório do Dataform.
  2. Conceder ao Dataform acesso ao BigQuery.
  3. Selecione ou crie um espaço de trabalho do Dataform.
  4. Crie pelo menos uma tabela.
  5. Crie um ambiente do Cloud Composer 2.
    1. Conceda os papéis roles/composer.worker e roles/dataform.editor à conta de serviço do ambiente do Cloud Composer.

Instale o pacote PyPi google-cloud-dataform

Se você usar o Cloud Composer 2 nas versões 2.0.25 e posteriores, esse pacote estará pré-instalado no seu ambiente. Não é necessário instalá-lo.

Se você usa versões anteriores do Cloud Composer 2, instale o pacote PyPi google-cloud-dataform.

Na seção de pacotes PyPI, especifique a versão ==0.2.0.

Criar um DAG do Airflow que programe invocações de fluxo de trabalho do Dataform

Para gerenciar execuções programadas de fluxos de trabalho do SQL do Dataform com o Cloud Composer 2, escreva o DAG usando operadores do Dataform Airflow e faça upload dele para o bucket do ambiente.

O exemplo de código a seguir mostra um DAG do Airflow que cria um resultado de compilação do Dataform e inicia uma invocação do fluxo de trabalho do Dataform:

from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )

create_compilation_result >> create_workflow_invocation

Substitua:

  • PROJECT_ID pelo ID do projeto do Google Cloud no Dataform.
  • REPOSITORY_ID pelo nome do repositório do Dataform.
  • REGION pela região em que o repositório do Dataform está localizado;
  • COMPILATION_RESULT pelo nome do resultado da compilação que você quer usar para esta invocação do fluxo de trabalho.
  • GIT_COMMITISH pelo commit do Git, por exemplo, um branch ou um Git SHA, no repositório Git remoto da versão do código que você quer usar.

O exemplo de código a seguir mostra um DAG do Airflow que:

  1. Cria um resultado de compilação do Dataform.
  2. Inicia uma invocação do fluxo de trabalho assíncrono do Dataform.
  3. Use DataformWorkflowInvocationStateSensor para pesquisar o status do fluxo de trabalho até que ele entre no estado desejado.
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

Substitua:

  • PROJECT_ID pelo ID do projeto do Google Cloud no Dataform.
  • REPOSITORY_ID pelo nome do repositório do Dataform.
  • REGION pela região em que o repositório do Dataform está localizado;
  • COMPILATION_RESULT pelo nome do resultado da compilação que você quer usar para esta invocação do fluxo de trabalho.
  • GIT_COMMITISH pelo commit do Git, por exemplo, um branch ou um Git SHA, no repositório Git remoto da versão do código que você quer usar.
  • COMPILATION_RESULT pelo nome do resultado da compilação que você quer usar para esta invocação do fluxo de trabalho.

Adicionar parâmetros de configuração de compilação

É possível adicionar outros parâmetros de configuração de compilação ao objeto DAG do Airflow create_compilation_result. Para mais informações sobre os parâmetros disponíveis, consulte a referência da API Dataform CodeCompilationConfig.

  • Para adicionar parâmetros de configuração de compilação ao objeto DAG do create_compilation_result, adicione os parâmetros selecionados a code_compilation_config no seguinte formato:
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
            "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
        },
    )

Substitua:

  • PROJECT_ID pelo ID do projeto do Google Cloud no Dataform.
  • REPOSITORY_ID pelo nome do repositório do Dataform.
  • REGION pela região em que o repositório do Dataform está localizado.
  • GIT_COMMITISH pelo commit do Git, por exemplo, um branch ou um Git SHA, no repositório Git remoto da versão do código que você quer usar.
  • PARAMETER por um parâmetro CodeCompilationConfig selecionado. É possível adicionar vários parâmetros.
  • PARAMETER_VALUE por um valor do parâmetro selecionado.

O exemplo de código a seguir mostra o parâmetro defaultDatabase adicionado ao objeto DAG do Airflow create_compilation_result:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

Adicionar parâmetros de configuração de invocação do fluxo de trabalho

É possível adicionar outros parâmetros de configuração de invocação do fluxo de trabalho ao objeto create_workflow_invocation do DAG do Airflow. Para mais informações sobre os parâmetros disponíveis, consulte a referência da API Dataform InvocationConfig.

  • Para adicionar parâmetros de configuração de invocação do fluxo de trabalho ao objeto DAG do Airflow create_workflow_invocation, adicione os parâmetros selecionados a invocation_config no seguinte formato:
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            
            "invocation_config": { "PARAMETER": PARAMETER_VALUE }
        },

    )

Substitua:

  • PROJECT_ID pelo ID do projeto do Google Cloud no Dataform.
  • REPOSITORY_ID pelo nome do repositório do Dataform.
  • REGION pela região em que o repositório do Dataform está localizado.
  • PARAMETER por um parâmetro InvocationConfig selecionado. É possível adicionar vários parâmetros.
  • PARAMETER_VALUE por um valor do parâmetro selecionado.

O exemplo de código a seguir mostra os parâmetros includedTags[] e transitiveDependenciesIncluded adicionados ao objeto DAG do Airflow create_workflow_invocation:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

A seguir