Sobre a execução programada do fluxo de trabalho com o Cloud Composer 2
Para gerenciar programações de execuções do Dataform com o Cloud Composer 2, use os operadores do Dataform em DAGs do Airflow. É possível criar um DAG do Airflow que programe invocações de fluxo de trabalho do Dataform.
O Dataform fornece vários operadores do Airflow. Por exemplo, operadores para receber um resultado de compilação, invocar ou cancelar uma invocação de fluxo de trabalho. Para ver a lista completa de operadores do Dataform Airflow disponíveis, consulte Operadores do Dataform Google.
Antes de começar
- Selecione ou crie um repositório do Dataform.
- Conceder ao Dataform acesso ao BigQuery.
- Selecione ou crie um espaço de trabalho do Dataform.
- Crie pelo menos uma tabela.
- Crie um ambiente do Cloud Composer 2.
- Conceda os papéis roles/composer.worker e roles/dataform.editor à conta de serviço do ambiente do Cloud Composer.
Instale o pacote PyPi google-cloud-dataform
Se você usar o Cloud Composer 2 nas versões 2.0.25
e posteriores, esse pacote estará pré-instalado no seu ambiente. Não é necessário instalá-lo.
Se você usa versões anteriores do Cloud Composer 2, instale o pacote PyPi google-cloud-dataform
.
Na seção de pacotes PyPI, especifique a versão ==0.2.0
.
Criar um DAG do Airflow que programe invocações de fluxo de trabalho do Dataform
Para gerenciar execuções programadas de fluxos de trabalho do SQL do Dataform com o Cloud Composer 2, escreva o DAG usando operadores do Dataform Airflow e faça upload dele para o bucket do ambiente.
O exemplo de código a seguir mostra um DAG do Airflow que cria um resultado de compilação do Dataform e inicia uma invocação do fluxo de trabalho do Dataform:
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
Substitua:
- PROJECT_ID pelo ID do projeto do Google Cloud no Dataform.
- REPOSITORY_ID pelo nome do repositório do Dataform.
- REGION pela região em que o repositório do Dataform está localizado;
- COMPILATION_RESULT pelo nome do resultado da compilação que você quer usar para esta invocação do fluxo de trabalho.
- GIT_COMMITISH pelo commit do Git, por exemplo, um branch ou um Git SHA, no repositório Git remoto da versão do código que você quer usar.
O exemplo de código a seguir mostra um DAG do Airflow que:
- Cria um resultado de compilação do Dataform.
- Inicia uma invocação do fluxo de trabalho assíncrono do Dataform.
- Use
DataformWorkflowInvocationStateSensor
para pesquisar o status do fluxo de trabalho até que ele entre no estado desejado.
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
Substitua:
- PROJECT_ID pelo ID do projeto do Google Cloud no Dataform.
- REPOSITORY_ID pelo nome do repositório do Dataform.
- REGION pela região em que o repositório do Dataform está localizado;
- COMPILATION_RESULT pelo nome do resultado da compilação que você quer usar para esta invocação do fluxo de trabalho.
- GIT_COMMITISH pelo commit do Git, por exemplo, um branch ou um Git SHA, no repositório Git remoto da versão do código que você quer usar.
- COMPILATION_RESULT pelo nome do resultado da compilação que você quer usar para esta invocação do fluxo de trabalho.
Adicionar parâmetros de configuração de compilação
É possível adicionar outros parâmetros de configuração de compilação ao objeto DAG do Airflow create_compilation_result
. Para mais informações sobre os parâmetros disponíveis, consulte a referência da API Dataform CodeCompilationConfig
.
- Para adicionar parâmetros de configuração de compilação ao objeto DAG do
create_compilation_result
, adicione os parâmetros selecionados acode_compilation_config
no seguinte formato:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
"code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
},
)
Substitua:
- PROJECT_ID pelo ID do projeto do Google Cloud no Dataform.
- REPOSITORY_ID pelo nome do repositório do Dataform.
- REGION pela região em que o repositório do Dataform está localizado.
- GIT_COMMITISH pelo commit do Git, por exemplo, um branch ou um Git SHA, no repositório Git remoto da versão do código que você quer usar.
- PARAMETER por um parâmetro
CodeCompilationConfig
selecionado. É possível adicionar vários parâmetros. - PARAMETER_VALUE por um valor do parâmetro selecionado.
O exemplo de código a seguir mostra o parâmetro defaultDatabase
adicionado ao objeto DAG do Airflow create_compilation_result
:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
Adicionar parâmetros de configuração de invocação do fluxo de trabalho
É possível adicionar outros parâmetros de configuração de invocação do fluxo de trabalho ao objeto create_workflow_invocation
do DAG do Airflow. Para mais informações sobre os parâmetros disponíveis, consulte a referência da API Dataform InvocationConfig
.
- Para adicionar parâmetros de configuração de invocação do fluxo de trabalho ao objeto DAG do Airflow
create_workflow_invocation
, adicione os parâmetros selecionados ainvocation_config
no seguinte formato:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "PARAMETER": PARAMETER_VALUE }
},
)
Substitua:
- PROJECT_ID pelo ID do projeto do Google Cloud no Dataform.
- REPOSITORY_ID pelo nome do repositório do Dataform.
- REGION pela região em que o repositório do Dataform está localizado.
- PARAMETER por um parâmetro
InvocationConfig
selecionado. É possível adicionar vários parâmetros. - PARAMETER_VALUE por um valor do parâmetro selecionado.
O exemplo de código a seguir mostra os parâmetros includedTags[]
e transitiveDependenciesIncluded
adicionados ao objeto DAG do Airflow create_workflow_invocation
:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
},
)
A seguir
- Para saber como configurar substituições de compilação para execuções de fluxo de trabalho, consulte Configurar substituições de compilação.
- Para saber mais sobre a API Dataform, consulte API Dataform.
- Para saber mais sobre os ambientes do Cloud Composer, consulte Visão geral do Cloud Composer.
- Para saber como programar execuções com o Workflows e o Cloud Scheduler, consulte Programar execuções com Workflows e Cloud Scheduler.