Este documento mostra como fazer o seguinte no Dataform:
- Programe execuções com configurações de fluxo de trabalho.
- Programe execuções com o Workflows e o Cloud Scheduler.
- Programe execuções com o Cloud Composer.
Antes de começar
Para programar execuções com configurações de fluxo de trabalho ou programar execuções com fluxos de trabalho e o Cloud Scheduler, siga estas etapas:
No Console do Google Cloud, acesse a página Dataform.
Selecione ou crie um repositório.
Crie uma configuração de versão.
Para programar execuções com o Cloud Composer, faça o seguinte:
- Selecione ou crie um repositório do Dataform.
- Conceder acesso ao Dataform ao BigQuery.
- Selecione ou crie um espaço de trabalho do Dataform.
- Crie pelo menos uma tabela.
- Crie um ambiente do Cloud Composer 2.
Funções exigidas
Para receber as permissões necessárias para concluir as tarefas neste documento, peça ao administrador para conceder a você os seguintes papéis do IAM:
-
Administrador do Dataform (
roles/dataform.admin
) nos repositórios -
Editor do Dataform (
roles/dataform.editor
) em repositórios e na conta de serviço do ambiente do Cloud Composer -
Worker do Composer (
roles/composer.worker
) na conta de serviço do ambiente do Cloud Composer
Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.
Também é possível conseguir as permissões necessárias por meio de papéis personalizados ou de outros papéis predefinidos.
Para usar uma conta de serviço que não seja a conta de serviço padrão do Dataform, conceda acesso à conta de serviço personalizada.
Programar execuções com configurações de fluxo de trabalho
Esta seção mostra como criar uma configuração de fluxo de trabalho no Dataform para programar e configurar execuções de fluxo de trabalho. É possível usar as configurações de fluxo de trabalho para executar fluxos de trabalho do Dataform de acordo com uma programação.
Sobre as configurações de fluxo de trabalho
Para programar execuções do Dataform de todas as ações de fluxo de trabalho ou selecionadas no BigQuery, crie configurações de fluxo de trabalho. Em uma configuração de fluxo de trabalho, você seleciona uma configuração de lançamento de compilação, seleciona ações de fluxo de trabalho para execução e define a programação de execução.
Em seguida, durante uma execução programada da configuração do fluxo de trabalho, o Dataform implanta a seleção de ações do resultado de compilação mais recente na configuração de lançamento no BigQuery. Também é possível acionar manualmente a execução de uma configuração de fluxo de trabalho com a API Dataform workflowConfigs.
Uma configuração de fluxo de trabalho do Dataform contém as seguintes configurações de execução:
- ID da configuração do fluxo de trabalho.
- Configuração de lançamento.
Conta de serviço.
Essa é a conta de serviço associada à configuração do fluxo de trabalho. Você pode selecionar a conta de serviço padrão do Dataform ou uma conta de serviço associada ao seu projeto do Google Cloud ou inserir manualmente uma conta de serviço diferente. Por padrão, as configurações de fluxo de trabalho usam as mesmas contas de serviço dos repositórios.
Ações do fluxo de trabalho a serem executadas:
- Todas as ações.
- Seleção de ações.
- Seleção de tags.
Programação e fuso horário.
Criar uma configuração de fluxo de trabalho
Para criar uma configuração de fluxo de trabalho do Dataform, siga estas etapas:
- No repositório, acesse Lançamentos e programação.
- Na seção Configurações do fluxo de trabalho, clique em Criar.
No painel Criar configuração do fluxo de trabalho, no campo ID da configuração, insira um ID exclusivo para a configuração do fluxo de trabalho.
Os IDs só podem incluir números, letras, hifens e sublinhados.
No menu Configuração da versão, selecione uma configuração da versão de compilação.
Opcional: no campo Frequência, insira a frequência das execuções no formato unix-cron.
Para garantir que o Dataform execute o resultado de compilação mais recente na configuração de lançamento correspondente, mantenha um intervalo mínimo de uma hora entre o horário de criação do resultado de compilação e o horário da execução programada.
No menu Conta de serviço, selecione uma conta de serviço para a configuração do fluxo de trabalho.
No menu, você pode selecionar a conta de serviço padrão do Dataform ou qualquer conta de serviço associada ao seu projeto do Google Cloud a que você tem acesso. Se você não selecionar uma conta de serviço, a configuração do fluxo de trabalho usará a conta de serviço do repositório.
Opcional: no menu Fuso horário, selecione o fuso horário para as execuções.
O fuso horário padrão é UTC.
Selecione as ações do fluxo de trabalho a serem executadas:
- Para executar todo o fluxo de trabalho, clique em Todas as ações.
- Para executar as ações selecionadas no fluxo de trabalho, clique em Seleção de ações e selecione as ações.
- Para executar ações com tags selecionadas, clique em Seleção de tags e escolha as tags.
- Opcional: para executar ações ou tags selecionadas e as dependências delas, selecione a opção Incluir dependências.
- Opcional: para executar as ações ou tags selecionadas e as dependências delas, selecione a opção Incluir dependências.
- Opcional: para recriar todas as tabelas do zero, selecione a opção Run with full refresh.
Sem essa opção, o Dataform atualiza tabelas incrementais sem recriar do zero.
Clique em Criar.
Por exemplo, a configuração de fluxo de trabalho a seguir executa ações com a tag hourly
a cada hora no fuso horário CEST:
- ID da configuração:
production-hourly
- Configuração da versão: -
- Frequência:
0 * * * *
- Fuso horário:
Central European Summer Time (CEST)
- Seleção de ações do fluxo de trabalho: seleção de tags, tag
hourly
Editar uma configuração de fluxo de trabalho
Para editar uma configuração de fluxo de trabalho, siga estas etapas:
- No repositório, acesse Lançamentos e programação.
- Na configuração do fluxo de trabalho que você quer editar, clique no menu Mais e em Editar.
- No painel Editar configuração do fluxo de trabalho, edite as configurações da configuração da versão e clique em Salvar.
Excluir uma configuração de fluxo de trabalho
Para excluir uma configuração de fluxo de trabalho, siga estas etapas:
- No repositório, acesse Lançamentos e programação.
- Na configuração do fluxo de trabalho que você quer excluir, clique no menu Mais e em Excluir.
- Na caixa de diálogo Excluir configuração da versão, clique em Excluir.
Programar execuções com o Cloud Scheduler e o Workflows
Esta seção mostra como programar execuções de fluxos de trabalho do Dataform usando o Cloud Scheduler.
Sobre as execuções de fluxo de trabalho programadas
É possível definir a frequência das execuções do fluxo de trabalho do Dataform criando um job do Cloud Scheduler que aciona um fluxo de trabalho do Workflows. O Workflows executa serviços em um fluxo de trabalho de orquestração definido por você.
O Workflows executa o fluxo de trabalho do Dataform em um processo de duas etapas. Primeiro, ele extrai o código do repositório do Dataform do provedor do Git e o compila em um resultado de compilação. Em seguida, ele usa o resultado da compilação para criar um fluxo de trabalho do Dataform e o executa na frequência definida.
Criar um fluxo de trabalho de orquestração programado
Para programar execuções do fluxo de trabalho do Dataform, use o Workflows para criar um fluxo de trabalho de orquestração e adicionar um job do Cloud Scheduler como acionador.
O Workflows usa contas de serviço para dar acesso a recursos doGoogle Cloud . Crie uma conta de serviço e conceda a ela o papel de Gerenciamento de identidade e acesso do editor do Dataform (
roles/dataform.editor
), bem como as permissões mínimas necessárias para gerenciar o fluxo de trabalho de orquestração. Para mais informações, consulte Conceder permissão a um fluxo de trabalho para acessar recursos Google Cloud .Crie um fluxo de trabalho de orquestração e use o seguinte código-fonte YAML como definição do fluxo de trabalho:
main: steps: - init: assign: - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID - createCompilationResult: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"} auth: type: OAuth2 body: gitCommitish: GIT_COMMITISH result: compilationResult - createWorkflowInvocation: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"} auth: type: OAuth2 body: compilationResult: ${compilationResult.body.name} result: workflowInvocation - complete: return: ${workflowInvocation.body.name}
Substitua:
- PROJECT_ID: o ID do projeto Google Cloud .
- REPOSITORY_LOCATION: o local do repositório do Dataform.
- REPOSITORY_ID: o nome do repositório do Dataform.
- GIT_COMMITISH: a ramificação do Git em que você quer executar o código do Dataform. Para um repositório recém-criado, substitua por
main
.
Programe o fluxo de trabalho de orquestração usando o Cloud Scheduler.
Personalizar a solicitação de resultado de compilação de criação do fluxo de trabalho do Dataform
É possível
atualizar o fluxo de trabalho de orquestração atual
e definir as configurações de solicitação de resultado de compilação
de criação do fluxo de trabalho do Dataform no formato YAML. Para mais informações sobre as configurações,
consulte a
referência de recursos REST projects.locations.repositories.compilationResults
.
Por exemplo, para adicionar uma configuração schemaSuffix
_dev
a todas as ações durante a compilação,
substitua o corpo da etapa createCompilationResult
pelo seguinte snippet de código:
- createCompilationResult:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
auth:
type: OAuth2
body:
gitCommitish: GIT_COMMITISH
codeCompilationConfig:
schemaSuffix: dev
Também é possível transmitir outras configurações como argumentos de ambiente de execução em uma solicitação de execução de fluxos de trabalho e acessar esses argumentos usando variáveis. Para mais informações, consulte Transmitir argumentos de ambiente de execução em uma solicitação de execução.
Personalizar a solicitação de invocação do fluxo de trabalho do Dataform
É possível
atualizar o fluxo de trabalho de orquestração atual
e definir as configurações de solicitação de invocação do fluxo de trabalho do Dataform no
formato YAML. Para mais informações sobre as configurações de solicitação de invocação,
consulte a
referência de recurso REST projects.locations.repositories.workflowInvocations
.
Por exemplo, para executar apenas ações com a tag hourly
com todas as
dependências transitivas incluídas, substitua o corpo createWorkflowInvocation
pelo snippet de código abaixo:
- createWorkflowInvocation:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
auth:
type: OAuth2
body:
compilationResult: ${compilationResult.body.name}
invocationConfig:
includedTags:
- hourly
transitiveDependenciesIncluded: true
Também é possível transmitir outras configurações como argumentos de ambiente de execução em uma solicitação de execução de fluxos de trabalho e acessar esses argumentos usando variáveis. Para mais informações, consulte Transmitir argumentos de ambiente de execução em uma solicitação de execução.
Programar execuções com o Cloud Composer
Use o Cloud Composer 2 para programar execuções do Dataform. O Dataform não é compatível com o Cloud Composer 1.
Para gerenciar programações de execuções do Dataform com o Cloud Composer 2, use os operadores do Dataform em grafos acíclicos dirigidos (DAGs) do Airflow. É possível criar um DAG do Airflow que programa invocações de fluxo de trabalho do Dataform.
O Dataform oferece vários operadores do Airflow. Eles incluem operadores para receber um resultado de compilação, receber uma invocação de fluxo de trabalho e cancelar uma invocação de fluxo de trabalho. Para conferir a lista completa de operadores disponíveis do Dataform Airflow, consulte Operadores do Google Dataform.
Instale o pacote PyPi google-cloud-dataform
Se você usa o Cloud Composer 2 versões 2.0.25
e mais recentes, esse pacote
está pré-instalado no seu ambiente. Não é necessário instalar.
Se você usa versões anteriores do Cloud Composer 2,
instale o pacote PyPi google-cloud-dataform
.
Na seção de pacotes PyPI, especifique a versão ==0.2.0
.
Criar um DAG do Airflow que programe invocações de fluxo de trabalho do Dataform
Para gerenciar execuções programadas de fluxos de trabalho do Dataform com o Cloud Composer 2, programe o DAG usando os operadores do Dataform Airflow e faça upload dele no bucket do seu ambiente.
O exemplo de código a seguir mostra um DAG do Airflow que cria um resultado de compilação do Dataform e inicia uma invocação de fluxo de trabalho do Dataform:
from datetime import datetime
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
Substitua:
- PROJECT_ID: é o ID do projeto do Dataform no Google Cloud.
- REPOSITORY_ID: o nome do repositório do Dataform.
- REGION: a região em que o repositório do Dataform está localizado.
- COMPILATION_RESULT: o nome do resultado da compilação que você quer usar para essa invocação de fluxo de trabalho.
- GIT_COMMITISH: o commitish do Git no repositório remoto do Git da versão do código que você quer usar, por exemplo, uma ramificação ou um SHA do Git.
O exemplo de código abaixo mostra um DAG do Airflow que realiza as seguintes ações:
- Cria um resultado de compilação do Dataform.
- Inicia uma invocação assíncrona do fluxo de trabalho do Dataform.
- Pesquisa o status do fluxo de trabalho até que ele entre no estado esperado
usando
DataformWorkflowInvocationStateSensor
.
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
Substitua:
- PROJECT_ID: é o ID do projeto do Dataform no Google Cloud.
- REPOSITORY_ID: o nome do repositório do Dataform.
- REGION: a região em que o repositório do Dataform está localizado.
- COMPILATION_RESULT: o nome do resultado da compilação que você quer usar para essa invocação de fluxo de trabalho.
- GIT_COMMITISH: o commitish do Git no repositório remoto do Git da versão do código que você quer usar, por exemplo, uma ramificação ou um SHA do Git.
- COMPILATION_RESULT: o nome do resultado da compilação que você quer usar para essa invocação de fluxo de trabalho.
Adicionar parâmetros de configuração de compilação
É possível adicionar outros parâmetros de configuração de compilação ao
objeto DAG create_compilation_result
do Airflow. Para mais informações sobre
os parâmetros disponíveis, consulte a
referência da API CodeCompilationConfig
Dataform.
Para adicionar parâmetros de configuração de compilação ao objeto DAG
create_compilation_result
do Airflow, adicione os parâmetros selecionados ao campocode_compilation_config
neste formato:create_compilation_result = DataformCreateCompilationResultOperator( task_id="create_compilation_result", project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, compilation_result={ "git_commitish": GIT_COMMITISH, "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"} }, )
Substitua:
- PROJECT_ID: é o ID do projeto do Dataform no Google Cloud.
- REPOSITORY_ID: o nome do repositório do Dataform.
- REGION: a região em que o repositório do Dataform está localizado.
- GIT_COMMITISH: o commitish do Git no repositório remoto do Git da versão do código que você quer usar, por exemplo, uma ramificação ou um SHA do Git.
- PARAMETER: o parâmetro
CodeCompilationConfig
selecionado. É possível adicionar vários parâmetros. - PARAMETER_VALUE: o valor do parâmetro selecionado.
O exemplo de código a seguir mostra o parâmetro defaultDatabase
adicionado ao
objeto DAG create_compilation_result
do Airflow:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
Adicionar parâmetros de configuração de invocação de fluxo de trabalho
É possível adicionar outros parâmetros de configuração de invocação de fluxo de trabalho ao
objeto DAG do Airflow create_workflow_invocation
. Para mais informações sobre
os parâmetros disponíveis, consulte a
referência da API InvocationConfig
Dataform.
Para adicionar parâmetros de configuração de invocação de fluxo de trabalho ao objeto DAG do Airflow
create_workflow_invocation
, adicione os parâmetros selecionados ao campoinvocation_config
no seguinte formato:create_workflow_invocation = DataformCreateWorkflowInvocationOperator( task_id='create_workflow_invocation', project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, workflow_invocation={ "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}", "invocation_config": { "PARAMETER": PARAMETER_VALUE } }, )
Substitua:
- PROJECT_ID: é o ID do projeto do Dataform no Google Cloud.
- REPOSITORY_ID: o nome do repositório do Dataform.
- REGION: a região em que o repositório do Dataform está localizado.
- PARAMETER: o parâmetro
InvocationConfig
selecionado. É possível adicionar vários parâmetros. - PARAMETER_VALUE: o valor do parâmetro selecionado.
O exemplo de código a seguir mostra os parâmetros includedTags[]
e
transitiveDependenciesIncluded
adicionados ao
objeto DAG create_workflow_invocation
do Airflow:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
},
)
A seguir
- Para saber como configurar as configurações de lançamento de compilação do Dataform, consulte Criar uma configuração de lançamento.
- Para saber mais sobre o ciclo de vida do código do Dataform, consulte Introdução ao ciclo de vida do código no Dataform.
- Para saber mais sobre a API Dataform, consulte API Dataform.
- Para saber mais sobre os ambientes do Cloud Composer, consulte Visão geral do Cloud Composer.
- Para saber mais sobre os preços do Workflows, consulte Preços do Workflows.