Programar execuções com o Cloud Composer

Neste documento, mostramos como executar execuções programadas do Dataform Fluxos de trabalho SQL usando o Cloud Composer 2.

É possível usar o Cloud Composer 2 para programar execuções do Dataform. O Dataform não é compatível com o Cloud Composer 1.

Para gerenciar as programações de execução do Dataform com o Cloud Composer 2, siga estas etapas: É possível usar operadores do Dataform em gráficos acíclicos dirigidos (DAGs) do Airflow. É possível criar um DAG do Airflow que programa Invocações de fluxo de trabalho do Dataform.

O Dataform fornece vários operadores do Airflow. Isso inclui operadores para obter o resultado de uma compilação, obter uma invocação de fluxo de trabalho e cancelar uma invocação de fluxo de trabalho. Para conferir a lista completa de opções do Dataform Operadores do Airflow, consulte Operadores do Google Dataform.

Antes de começar

  1. Selecione ou Crie um repositório do Dataform.
  2. Conceda acesso ao Dataform ao BigQuery
  3. Selecione ou crie um espaço de trabalho do Dataform.
  4. Crie pelo menos uma tabela.
  5. Crie um ambiente do Cloud Composer 2.
    1. Conceda o papel roles/composer.worker. e roles/dataform.editor à conta de serviço do ambiente do Cloud Composer.

Instalar o pacote PyPi google-cloud-dataform

Se você usa o Cloud Composer 2 versões 2.0.25 e mais recentes, este pacote está pré-instalado no seu ambiente. Não é necessário instalá-lo.

Se você usa versões anteriores do Cloud Composer 2, Instale o pacote PyPi google-cloud-dataform.

Na seção de pacotes PyPI, especifique a versão ==0.2.0.

Criar um DAG do Airflow que programe invocações de fluxo de trabalho do Dataform

Gerenciar execuções programadas de fluxos de trabalho SQL do Dataform com Cloud Composer 2, programar o DAG usando operadores do Dataform Airflow, e fazer o upload para o bucket do ambiente.

O exemplo de código a seguir mostra um DAG do Airflow que cria um Dataform resultado da compilação e inicia uma invocação de fluxo de trabalho do Dataform:

from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )

create_compilation_result >> create_workflow_invocation

Substitua:

  • PROJECT_ID: ID do projeto do Google Cloud do Dataform
  • REPOSITORY_ID: o nome do repositório do Dataform
  • REGION: a região em que o Dataform repositório está localizado
  • COMPILATION_RESULT: o nome do resultado da compilação. que você quer usar nesta invocação de fluxo de trabalho
  • GIT_COMMITISH: o commitish do Git, por exemplo, uma ramificação ou um SHA Git, no repositório Git remoto da versão o código que você quer usar

O exemplo de código a seguir mostra um DAG do Airflow que:

  1. Cria um resultado de compilação do Dataform.
  2. Inicia uma invocação de fluxo de trabalho assíncrona do Dataform.
  3. Pesquisa o status do seu fluxo de trabalho até ele entrar no estado esperado usando DataformWorkflowInvocationStateSensor.
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

Substitua:

  • PROJECT_ID: ID do projeto do Google Cloud do Dataform
  • REPOSITORY_ID: o nome do repositório do Dataform
  • REGION: a região em que o Dataform repositório está localizado
  • COMPILATION_RESULT: o nome do resultado da compilação. que você quer usar nesta invocação de fluxo de trabalho
  • GIT_COMMITISH: o commitish do Git, por exemplo, uma ramificação ou um SHA do Git, no repositório Git remoto da versão do do código que você quer usar
  • COMPILATION_RESULT: o nome do resultado da compilação. que você quer usar nesta invocação de fluxo de trabalho

Adicionar parâmetros de configuração de compilação

É possível adicionar outros parâmetros de configuração de compilação ao Objeto create_compilation_result do DAG do Airflow. Para mais informações sobre parâmetros disponíveis, consulte a Referência da API Dataform CodeCompilationConfig.

  • Para adicionar parâmetros de configuração de compilação ao create_compilation_result Objeto do DAG do Airflow, adicione os parâmetros selecionados a code_compilation_config no seguinte formato:
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
            "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
        },
    )

Substitua:

  • PROJECT_ID: ID do projeto do Google Cloud do Dataform
  • REPOSITORY_ID: o nome do repositório do Dataform
  • REGION pela região em que o repositório do Dataform está localizado
  • GIT_COMMITISH: o commitish do Git, por exemplo, uma ramificação ou um SHA do Git, no repositório Git remoto da versão do do código que você quer usar
  • PARAMETER: parâmetro CodeCompilationConfig selecionado. É possível adicionar vários parâmetros.
  • PARAMETER_VALUE: valor do parâmetro selecionado.

O exemplo de código a seguir mostra o parâmetro defaultDatabase adicionado à Objeto create_compilation_result do DAG do Airflow:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

Adicionar parâmetros de configuração de invocação de fluxo de trabalho

É possível adicionar outros parâmetros de configuração de invocação de fluxo de trabalho ao Objeto create_workflow_invocation do DAG do Airflow. Para mais informações sobre parâmetros disponíveis, consulte a Referência da API Dataform InvocationConfig.

  • Para adicionar parâmetros de configuração de invocação de fluxo de trabalho ao create_workflow_invocation objeto DAG do Airflow, adicione os parâmetros selecionados como invocation_config no seguinte formato:
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            
            "invocation_config": { "PARAMETER": PARAMETER_VALUE }
        },

    )

Substitua:

  • PROJECT_ID: ID do projeto do Google Cloud do Dataform
  • REPOSITORY_ID: o nome do repositório do Dataform
  • REGION: a região em que o repositório do Dataform está localizado
  • PARAMETER: parâmetro InvocationConfig selecionado. É possível adicionar vários parâmetros.
  • PARAMETER_VALUE: valor do parâmetro selecionado.

O exemplo de código a seguir mostra os métodos includedTags[] e Parâmetros transitiveDependenciesIncluded adicionados ao create_workflow_invocation Objeto do DAG do Airflow:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

A seguir