É possível usar o Cloud Composer 2 para programar execuções do Dataform. O Dataform não é compatível com o Cloud Composer 1.
Para gerenciar as programações de execução do Dataform com o Cloud Composer 2, siga estas etapas: É possível usar operadores do Dataform em gráficos acíclicos dirigidos (DAGs) do Airflow. É possível criar um DAG do Airflow que programa Invocações de fluxo de trabalho do Dataform.
O Dataform fornece vários operadores do Airflow. Isso inclui operadores para obter o resultado de uma compilação, obter uma invocação de fluxo de trabalho e cancelar uma invocação de fluxo de trabalho. Para conferir a lista completa de opções do Dataform Operadores do Airflow, consulte Operadores do Google Dataform.
Antes de começar
- Selecione ou Crie um repositório do Dataform.
- Conceda acesso ao Dataform ao BigQuery
- Selecione ou crie um espaço de trabalho do Dataform.
- Crie pelo menos uma tabela.
- Crie um ambiente do Cloud Composer 2.
- Conceda o papel roles/composer.worker. e roles/dataform.editor à conta de serviço do ambiente do Cloud Composer.
Instalar o pacote PyPi google-cloud-dataform
Se você usa o Cloud Composer 2 versões 2.0.25
e mais recentes, este pacote
está pré-instalado no seu ambiente. Não é necessário instalá-lo.
Se você usa versões anteriores do Cloud Composer 2,
Instale o pacote PyPi google-cloud-dataform
.
Na seção de pacotes PyPI, especifique a versão ==0.2.0
.
Criar um DAG do Airflow que programe invocações de fluxo de trabalho do Dataform
Gerenciar execuções programadas de fluxos de trabalho SQL do Dataform com Cloud Composer 2, programar o DAG usando operadores do Dataform Airflow, e fazer o upload para o bucket do ambiente.
O exemplo de código a seguir mostra um DAG do Airflow que cria um Dataform resultado da compilação e inicia uma invocação de fluxo de trabalho do Dataform:
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
Substitua:
- PROJECT_ID: ID do projeto do Google Cloud do Dataform
- REPOSITORY_ID: o nome do repositório do Dataform
- REGION: a região em que o Dataform repositório está localizado
- COMPILATION_RESULT: o nome do resultado da compilação. que você quer usar nesta invocação de fluxo de trabalho
- GIT_COMMITISH: o commitish do Git, por exemplo, uma ramificação ou um SHA Git, no repositório Git remoto da versão o código que você quer usar
O exemplo de código a seguir mostra um DAG do Airflow que:
- Cria um resultado de compilação do Dataform.
- Inicia uma invocação de fluxo de trabalho assíncrona do Dataform.
- Pesquisa o status do seu fluxo de trabalho até ele entrar no estado esperado
usando
DataformWorkflowInvocationStateSensor
.
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
Substitua:
- PROJECT_ID: ID do projeto do Google Cloud do Dataform
- REPOSITORY_ID: o nome do repositório do Dataform
- REGION: a região em que o Dataform repositório está localizado
- COMPILATION_RESULT: o nome do resultado da compilação. que você quer usar nesta invocação de fluxo de trabalho
- GIT_COMMITISH: o commitish do Git, por exemplo, uma ramificação ou um SHA do Git, no repositório Git remoto da versão do do código que você quer usar
- COMPILATION_RESULT: o nome do resultado da compilação. que você quer usar nesta invocação de fluxo de trabalho
Adicionar parâmetros de configuração de compilação
É possível adicionar outros parâmetros de configuração de compilação ao
Objeto create_compilation_result
do DAG do Airflow. Para mais informações sobre
parâmetros disponíveis, consulte a Referência da API Dataform CodeCompilationConfig
.
- Para adicionar parâmetros de configuração de compilação ao
create_compilation_result
Objeto do DAG do Airflow, adicione os parâmetros selecionados acode_compilation_config
no seguinte formato:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
"code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
},
)
Substitua:
- PROJECT_ID: ID do projeto do Google Cloud do Dataform
- REPOSITORY_ID: o nome do repositório do Dataform
- REGION pela região em que o repositório do Dataform está localizado
- GIT_COMMITISH: o commitish do Git, por exemplo, uma ramificação ou um SHA do Git, no repositório Git remoto da versão do do código que você quer usar
- PARAMETER: parâmetro
CodeCompilationConfig
selecionado. É possível adicionar vários parâmetros. - PARAMETER_VALUE: valor do parâmetro selecionado.
O exemplo de código a seguir mostra o parâmetro defaultDatabase
adicionado à
Objeto create_compilation_result
do DAG do Airflow:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
Adicionar parâmetros de configuração de invocação de fluxo de trabalho
É possível adicionar outros parâmetros de configuração de invocação de fluxo de trabalho ao
Objeto create_workflow_invocation
do DAG do Airflow. Para mais informações sobre
parâmetros disponíveis, consulte a Referência da API Dataform InvocationConfig
.
- Para adicionar parâmetros de configuração de invocação de fluxo de trabalho ao
create_workflow_invocation
objeto DAG do Airflow, adicione os parâmetros selecionados comoinvocation_config
no seguinte formato:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "PARAMETER": PARAMETER_VALUE }
},
)
Substitua:
- PROJECT_ID: ID do projeto do Google Cloud do Dataform
- REPOSITORY_ID: o nome do repositório do Dataform
- REGION: a região em que o repositório do Dataform está localizado
- PARAMETER: parâmetro
InvocationConfig
selecionado. É possível adicionar vários parâmetros. - PARAMETER_VALUE: valor do parâmetro selecionado.
O exemplo de código a seguir mostra os métodos includedTags[]
e
Parâmetros transitiveDependenciesIncluded
adicionados ao
create_workflow_invocation
Objeto do DAG do Airflow:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
},
)
A seguir
- Para saber como configurar substituições de compilação para execuções de fluxo de trabalho, consulte Configurar substituições de compilação.
- Para saber mais sobre a API Dataform, consulte API Dataform.
- Para saber mais sobre os ambientes do Cloud Composer, consulte Visão geral do Cloud Composer.
- Para saber como programar execuções com os Workflows e no Cloud Scheduler, consulte Programar execuções com o Workflows e o Cloud Scheduler.