Programar execuções

Este documento mostra como fazer o seguinte no Dataform:

Antes de começar

Para programar execuções com configurações de fluxo de trabalho ou programar execuções com fluxos de trabalho e o Cloud Scheduler, siga estas etapas:

  1. No Console do Google Cloud, acesse a página Dataform.

    Acessar o Dataform

  2. Selecione ou crie um repositório.

  3. Crie uma configuração de versão.

Para programar execuções com o Cloud Composer, faça o seguinte:

  1. Selecione ou crie um repositório do Dataform.
  2. Conceder acesso ao Dataform ao BigQuery.
  3. Selecione ou crie um espaço de trabalho do Dataform.
  4. Crie pelo menos uma tabela.
  5. Crie um ambiente do Cloud Composer 2.

Funções exigidas

Para receber as permissões necessárias para concluir as tarefas neste documento, peça ao administrador para conceder a você os seguintes papéis do IAM:

Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Também é possível conseguir as permissões necessárias por meio de papéis personalizados ou de outros papéis predefinidos.

Para usar uma conta de serviço que não seja a conta de serviço padrão do Dataform, conceda acesso à conta de serviço personalizada.

Programar execuções com configurações de fluxo de trabalho

Esta seção mostra como criar uma configuração de fluxo de trabalho no Dataform para programar e configurar execuções de fluxo de trabalho. É possível usar as configurações de fluxo de trabalho para executar fluxos de trabalho do Dataform de acordo com uma programação.

Sobre as configurações de fluxo de trabalho

Para programar execuções do Dataform de todas as ações de fluxo de trabalho ou selecionadas no BigQuery, crie configurações de fluxo de trabalho. Em uma configuração de fluxo de trabalho, você seleciona uma configuração de lançamento de compilação, seleciona ações de fluxo de trabalho para execução e define a programação de execução.

Em seguida, durante uma execução programada da configuração do fluxo de trabalho, o Dataform implanta a seleção de ações do resultado de compilação mais recente na configuração de lançamento no BigQuery. Também é possível acionar manualmente a execução de uma configuração de fluxo de trabalho com a API Dataform workflowConfigs.

Uma configuração de fluxo de trabalho do Dataform contém as seguintes configurações de execução:

  • ID da configuração do fluxo de trabalho.
  • Configuração de lançamento.
  • Conta de serviço.

    Essa é a conta de serviço associada à configuração do fluxo de trabalho. Você pode selecionar a conta de serviço padrão do Dataform ou uma conta de serviço associada ao seu projeto do Google Cloud ou inserir manualmente uma conta de serviço diferente. Por padrão, as configurações de fluxo de trabalho usam as mesmas contas de serviço dos repositórios.

  • Ações do fluxo de trabalho a serem executadas:

    • Todas as ações.
    • Seleção de ações.
    • Seleção de tags.
  • Programação e fuso horário.

Criar uma configuração de fluxo de trabalho

Para criar uma configuração de fluxo de trabalho do Dataform, siga estas etapas:

  1. No repositório, acesse Lançamentos e programação.
  2. Na seção Configurações do fluxo de trabalho, clique em Criar.
  3. No painel Criar configuração do fluxo de trabalho, no campo ID da configuração, insira um ID exclusivo para a configuração do fluxo de trabalho.

    Os IDs só podem incluir números, letras, hifens e sublinhados.

  4. No menu Configuração da versão, selecione uma configuração da versão de compilação.

  5. Opcional: no campo Frequência, insira a frequência das execuções no formato unix-cron.

    Para garantir que o Dataform execute o resultado de compilação mais recente na configuração de lançamento correspondente, mantenha um intervalo mínimo de uma hora entre o horário de criação do resultado de compilação e o horário da execução programada.

  6. No menu Conta de serviço, selecione uma conta de serviço para a configuração do fluxo de trabalho.

    No menu, você pode selecionar a conta de serviço padrão do Dataform ou qualquer conta de serviço associada ao seu projeto do Google Cloud a que você tem acesso. Se você não selecionar uma conta de serviço, a configuração do fluxo de trabalho usará a conta de serviço do repositório.

  7. Opcional: no menu Fuso horário, selecione o fuso horário para as execuções.

    O fuso horário padrão é UTC.

  8. Selecione as ações do fluxo de trabalho a serem executadas:

    • Para executar todo o fluxo de trabalho, clique em Todas as ações.
    • Para executar as ações selecionadas no fluxo de trabalho, clique em Seleção de ações e selecione as ações.
    • Para executar ações com tags selecionadas, clique em Seleção de tags e escolha as tags.
    • Opcional: para executar ações ou tags selecionadas e as dependências delas, selecione a opção Incluir dependências.
    • Opcional: para executar as ações ou tags selecionadas e as dependências delas, selecione a opção Incluir dependências.
    • Opcional: para recriar todas as tabelas do zero, selecione a opção Run with full refresh.

    Sem essa opção, o Dataform atualiza tabelas incrementais sem recriar do zero.

  9. Clique em Criar.

Por exemplo, a configuração de fluxo de trabalho a seguir executa ações com a tag hourly a cada hora no fuso horário CEST:

  • ID da configuração: production-hourly
  • Configuração da versão: -
  • Frequência: 0 * * * *
  • Fuso horário: Central European Summer Time (CEST)
  • Seleção de ações do fluxo de trabalho: seleção de tags, tag hourly

Para editar uma configuração de fluxo de trabalho, siga estas etapas:

  1. No repositório, acesse Lançamentos e programação.
  2. Na configuração do fluxo de trabalho que você quer editar, clique no menu Mais e em Editar.
  3. No painel Editar configuração do fluxo de trabalho, edite as configurações da configuração da versão e clique em Salvar.

Para excluir uma configuração de fluxo de trabalho, siga estas etapas:

  1. No repositório, acesse Lançamentos e programação.
  2. Na configuração do fluxo de trabalho que você quer excluir, clique no menu Mais e em Excluir.
  3. Na caixa de diálogo Excluir configuração da versão, clique em Excluir.

Esta seção mostra como programar execuções de fluxos de trabalho do Dataform usando o Cloud Scheduler.

É possível definir a frequência das execuções do fluxo de trabalho do Dataform criando um job do Cloud Scheduler que aciona um fluxo de trabalho do Workflows. O Workflows executa serviços em um fluxo de trabalho de orquestração definido por você.

O Workflows executa o fluxo de trabalho do Dataform em um processo de duas etapas. Primeiro, ele extrai o código do repositório do Dataform do provedor do Git e o compila em um resultado de compilação. Em seguida, ele usa o resultado da compilação para criar um fluxo de trabalho do Dataform e o executa na frequência definida.

Criar um fluxo de trabalho de orquestração programado

Para programar execuções do fluxo de trabalho do Dataform, use o Workflows para criar um fluxo de trabalho de orquestração e adicionar um job do Cloud Scheduler como acionador.

  1. O Workflows usa contas de serviço para dar acesso a recursos doGoogle Cloud . Crie uma conta de serviço e conceda a ela o papel de Gerenciamento de identidade e acesso do editor do Dataform (roles/dataform.editor), bem como as permissões mínimas necessárias para gerenciar o fluxo de trabalho de orquestração. Para mais informações, consulte Conceder permissão a um fluxo de trabalho para acessar recursos Google Cloud .

  2. Crie um fluxo de trabalho de orquestração e use o seguinte código-fonte YAML como definição do fluxo de trabalho:

    main:
        steps:
        - init:
            assign:
            - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID
        - createCompilationResult:
            call: http.post
            args:
                url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
                auth:
                    type: OAuth2
                body:
                    gitCommitish: GIT_COMMITISH
            result: compilationResult
        - createWorkflowInvocation:
            call: http.post
            args:
                url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
                auth:
                    type: OAuth2
                body:
                    compilationResult: ${compilationResult.body.name}
            result: workflowInvocation
        - complete:
            return: ${workflowInvocation.body.name}
    

    Substitua:

    • PROJECT_ID: o ID do projeto Google Cloud .
    • REPOSITORY_LOCATION: o local do repositório do Dataform.
    • REPOSITORY_ID: o nome do repositório do Dataform.
    • GIT_COMMITISH: a ramificação do Git em que você quer executar o código do Dataform. Para um repositório recém-criado, substitua por main.
  3. Programe o fluxo de trabalho de orquestração usando o Cloud Scheduler.

Personalizar a solicitação de resultado de compilação de criação do fluxo de trabalho do Dataform

É possível atualizar o fluxo de trabalho de orquestração atual e definir as configurações de solicitação de resultado de compilação de criação do fluxo de trabalho do Dataform no formato YAML. Para mais informações sobre as configurações, consulte a referência de recursos REST projects.locations.repositories.compilationResults.

Por exemplo, para adicionar uma configuração schemaSuffix _dev a todas as ações durante a compilação, substitua o corpo da etapa createCompilationResult pelo seguinte snippet de código:

    - createCompilationResult:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
            auth:
                type: OAuth2
            body:
                gitCommitish: GIT_COMMITISH
                codeCompilationConfig:
                    schemaSuffix: dev

Também é possível transmitir outras configurações como argumentos de ambiente de execução em uma solicitação de execução de fluxos de trabalho e acessar esses argumentos usando variáveis. Para mais informações, consulte Transmitir argumentos de ambiente de execução em uma solicitação de execução.

Personalizar a solicitação de invocação do fluxo de trabalho do Dataform

É possível atualizar o fluxo de trabalho de orquestração atual e definir as configurações de solicitação de invocação do fluxo de trabalho do Dataform no formato YAML. Para mais informações sobre as configurações de solicitação de invocação, consulte a referência de recurso REST projects.locations.repositories.workflowInvocations.

Por exemplo, para executar apenas ações com a tag hourly com todas as dependências transitivas incluídas, substitua o corpo createWorkflowInvocation pelo snippet de código abaixo:

    - createWorkflowInvocation:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
            auth:
                type: OAuth2
            body:
                compilationResult: ${compilationResult.body.name}
                invocationConfig:
                    includedTags:
                    - hourly
                    transitiveDependenciesIncluded: true
                

Também é possível transmitir outras configurações como argumentos de ambiente de execução em uma solicitação de execução de fluxos de trabalho e acessar esses argumentos usando variáveis. Para mais informações, consulte Transmitir argumentos de ambiente de execução em uma solicitação de execução.

Programar execuções com o Cloud Composer

Use o Cloud Composer 2 para programar execuções do Dataform. O Dataform não é compatível com o Cloud Composer 1.

Para gerenciar programações de execuções do Dataform com o Cloud Composer 2, use os operadores do Dataform em grafos acíclicos dirigidos (DAGs) do Airflow. É possível criar um DAG do Airflow que programa invocações de fluxo de trabalho do Dataform.

O Dataform oferece vários operadores do Airflow. Eles incluem operadores para receber um resultado de compilação, receber uma invocação de fluxo de trabalho e cancelar uma invocação de fluxo de trabalho. Para conferir a lista completa de operadores disponíveis do Dataform Airflow, consulte Operadores do Google Dataform.

Instale o pacote PyPi google-cloud-dataform

Se você usa o Cloud Composer 2 versões 2.0.25 e mais recentes, esse pacote está pré-instalado no seu ambiente. Não é necessário instalar.

Se você usa versões anteriores do Cloud Composer 2, instale o pacote PyPi google-cloud-dataform.

Na seção de pacotes PyPI, especifique a versão ==0.2.0.

Criar um DAG do Airflow que programe invocações de fluxo de trabalho do Dataform

Para gerenciar execuções programadas de fluxos de trabalho do Dataform com o Cloud Composer 2, programe o DAG usando os operadores do Dataform Airflow e faça upload dele no bucket do seu ambiente.

O exemplo de código a seguir mostra um DAG do Airflow que cria um resultado de compilação do Dataform e inicia uma invocação de fluxo de trabalho do Dataform:

from datetime import datetime

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )


create_compilation_result >> create_workflow_invocation

Substitua:

  • PROJECT_ID: é o ID do projeto do Dataform no Google Cloud.
  • REPOSITORY_ID: o nome do repositório do Dataform.
  • REGION: a região em que o repositório do Dataform está localizado.
  • COMPILATION_RESULT: o nome do resultado da compilação que você quer usar para essa invocação de fluxo de trabalho.
  • GIT_COMMITISH: o commitish do Git no repositório remoto do Git da versão do código que você quer usar, por exemplo, uma ramificação ou um SHA do Git.

O exemplo de código abaixo mostra um DAG do Airflow que realiza as seguintes ações:

  1. Cria um resultado de compilação do Dataform.
  2. Inicia uma invocação assíncrona do fluxo de trabalho do Dataform.
  3. Pesquisa o status do fluxo de trabalho até que ele entre no estado esperado usando DataformWorkflowInvocationStateSensor.
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

Substitua:

  • PROJECT_ID: é o ID do projeto do Dataform no Google Cloud.
  • REPOSITORY_ID: o nome do repositório do Dataform.
  • REGION: a região em que o repositório do Dataform está localizado.
  • COMPILATION_RESULT: o nome do resultado da compilação que você quer usar para essa invocação de fluxo de trabalho.
  • GIT_COMMITISH: o commitish do Git no repositório remoto do Git da versão do código que você quer usar, por exemplo, uma ramificação ou um SHA do Git.
  • COMPILATION_RESULT: o nome do resultado da compilação que você quer usar para essa invocação de fluxo de trabalho.

Adicionar parâmetros de configuração de compilação

É possível adicionar outros parâmetros de configuração de compilação ao objeto DAG create_compilation_result do Airflow. Para mais informações sobre os parâmetros disponíveis, consulte a referência da API CodeCompilationConfig Dataform.

  • Para adicionar parâmetros de configuração de compilação ao objeto DAG create_compilation_result do Airflow, adicione os parâmetros selecionados ao campo code_compilation_config neste formato:

        create_compilation_result = DataformCreateCompilationResultOperator(
            task_id="create_compilation_result",
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            compilation_result={
                "git_commitish": GIT_COMMITISH,
                "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
            },
        )
    

    Substitua:

    • PROJECT_ID: é o ID do projeto do Dataform no Google Cloud.
    • REPOSITORY_ID: o nome do repositório do Dataform.
    • REGION: a região em que o repositório do Dataform está localizado.
    • GIT_COMMITISH: o commitish do Git no repositório remoto do Git da versão do código que você quer usar, por exemplo, uma ramificação ou um SHA do Git.
    • PARAMETER: o parâmetro CodeCompilationConfig selecionado. É possível adicionar vários parâmetros.
    • PARAMETER_VALUE: o valor do parâmetro selecionado.

O exemplo de código a seguir mostra o parâmetro defaultDatabase adicionado ao objeto DAG create_compilation_result do Airflow:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

Adicionar parâmetros de configuração de invocação de fluxo de trabalho

É possível adicionar outros parâmetros de configuração de invocação de fluxo de trabalho ao objeto DAG do Airflow create_workflow_invocation. Para mais informações sobre os parâmetros disponíveis, consulte a referência da API InvocationConfig Dataform.

  • Para adicionar parâmetros de configuração de invocação de fluxo de trabalho ao objeto DAG do Airflow create_workflow_invocation, adicione os parâmetros selecionados ao campo invocation_config no seguinte formato:

        create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
            task_id='create_workflow_invocation',
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            workflow_invocation={
                "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
                "invocation_config": { "PARAMETER": PARAMETER_VALUE }
            },
        )
    
    

    Substitua:

    • PROJECT_ID: é o ID do projeto do Dataform no Google Cloud.
    • REPOSITORY_ID: o nome do repositório do Dataform.
    • REGION: a região em que o repositório do Dataform está localizado.
    • PARAMETER: o parâmetro InvocationConfig selecionado. É possível adicionar vários parâmetros.
    • PARAMETER_VALUE: o valor do parâmetro selecionado.

O exemplo de código a seguir mostra os parâmetros includedTags[] e transitiveDependenciesIncluded adicionados ao objeto DAG create_workflow_invocation do Airflow:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

A seguir