Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Neste guia, mostramos como criar um gráfico acíclico dirigido do Apache Airflow (DAG) executado em um ambiente do Cloud Composer.
Como o Apache Airflow não fornece um forte isolamento de DAG e tarefas, use ambientes de produção e teste separados para evitar a interferência de DAGs. Para mais informações, consulte Como testar DAGs.
Como estruturar um DAG do Airflow
Um DAG do Airflow é definido em um arquivo Python e é composto pelo seguinte: componentes:
- Definição de DAG
- Operadores do Airflow
- Relacionamentos entre operadores
Os snippets de código a seguir mostram exemplos de cada componente fora do contexto.
Uma definição de DAG
O exemplo a seguir demonstra um DAG do Airflow definição:
Airflow 2
Airflow 1
Operadores e tarefas
Os operadores do Airflow descrevem o trabalho a ser feito. Uma tarefa task é uma instância específica de um operador.
Airflow 2
Airflow 1
Relações de tarefas
Relacionamentos de tarefas descrevem a ordem em quais o trabalho deve ser concluído.
Airflow 2
Airflow 1
Exemplo completo de fluxo de trabalho do DAG em Python
O fluxo de trabalho a seguir é um modelo de DAG funcional completo, composto de
duas tarefas: uma hello_python
e uma goodbye_bash
:
Airflow 2
Airflow 1
Para mais informações sobre como definir DAGs do Airflow, consulte a Tutorial do Airflow e Conceitos do Airflow.
Operadores do Airflow
Nos exemplos a seguir, você vê alguns operadores conhecidos do Airflow. Para um autoritativa de operadores do Airflow, consulte Referência de operadores e hooks e o Índice de provedores.
BashOperator
Use o BashOperator para executar programas de linha de comando.
Airflow 2
Airflow 1
O Cloud Composer executa os comandos fornecidos em um script Bash em uma Worker do Airflow. O worker é um contêiner do Docker baseado no Debian e inclui vários pacotes.
- Comando
gcloud
, incluindo o subcomandogcloud storage
para trabalhar com buckets do Cloud Storage. - comando
bq
- comando
kubectl
PythonOperator
Use o PythonOperator para executar código arbitrário do Python.
O Cloud Composer executa o código Python em um contêiner que inclui pacotes da versão da imagem do Cloud Composer usada no seu ambiente.
Para instalar mais pacotes Python, consulte Como instalar dependências do Python.
Operadores do Google Cloud
Para executar tarefas que usam produtos do Google Cloud, use os operadores do Google Cloud Airflow. Por exemplo, os operadores do BigQuery consultam e processam dados no BigQuery.
Há muitos outros operadores do Airflow para o Google Cloud e serviços individuais fornecidos por ele. Consulte a lista completa em Operadores do Google Cloud.
Airflow 2
Airflow 1
EmailOperator
Use o EmailOperator para enviar e-mails de um DAG. Para enviar e-mails de um ambiente do Cloud Composer, configure o ambiente para usar o SendGrid.
Airflow 2
Airflow 1
Notificações sobre falha do operador
Defina email_on_failure
como True
para enviar uma notificação por e-mail quando um operador no DAG falhar. Para enviar notificações por e-mail de um ambiente do Cloud Composer, você precisa configurar o ambiente para usar o SendGrid.
Airflow 2
Airflow 1
Diretrizes do fluxo de trabalho de DAG
Coloque as bibliotecas Python personalizadas em um arquivo ZIP do DAG em um diretório aninhado. Não inclua bibliotecas no nível superior do diretório de DAGs.
Quando o Airflow verifica a pasta
dags/
, ele só verifica os DAGs em Módulos do Python que estão no nível superior da pasta de DAGs e no topo de um arquivo ZIP também localizado na pastadags/
de nível superior. Se O Airflow encontra um módulo Python em um arquivo ZIP que não contém as substringsairflow
eDAG
, o Airflow deixará de processar o ZIP arquivado. O Airflow retorna apenas os DAGs encontrados até esse ponto.Use o Airflow 2 em vez do Airflow 1.
A comunidade do Airflow não publica novas versões secundárias ou de patch para o Airflow 1.
Para ter tolerância a falhas, não defina vários objetos de DAG no mesmo módulo do Python.
Não use subDAGs. Em vez disso, agrupe tarefas dentro de DAGs.
Coloque os arquivos necessários no tempo de análise do DAG na pasta
dags/
, e não na pastadata/
.Teste os DAGs desenvolvidos ou modificados conforme recomendado em instruções para testar DAGs.
Verificar se os DAGs desenvolvidos não aumentam muito tempos de análise do DAG.
As tarefas do Airflow podem falhar por vários motivos. Para evitar falhas de execuções de DAG inteiras, recomendamos ativar as novas tentativas de tarefas. Definir o máximo de tentativas como
0
significa que nenhuma nova tentativa será realizada.Recomendamos substituir o
default_task_retries
com um valor para o novas tentativas de tarefa diferentes de0
. Além disso, você pode definir o parâmetroretries
no nível da tarefa.Se você quiser usar a GPU nas tarefas do Airflow, crie um cluster do GKE separado com base em nós que usam máquinas com GPUs. Usar GKEStartPodOperator para executar as tarefas.
Evite executar tarefas com uso intenso de CPU e memória no pool de nós do cluster em que outros componentes do Airflow (programadores, workers, servidores da Web) estão em execução. Em vez disso, use KubernetesPodOperator ou GKEStartPodOperator.
Ao implantar DAGs em um ambiente, faça upload apenas dos arquivos que são absolutamente necessários para interpretar e executar DAGs na pasta
/dags
.Limite o número de arquivos DAG na pasta
/dags
.O Airflow está analisando continuamente os DAGs na pasta
/dags
. A análise é uma que passa pela pasta de DAGs e pelo número de arquivos precisam ser carregados (com suas dependências) afeta o desempenho da análise de DAGs e da programação de tarefas. É muito mais eficiente usar 100 arquivos com 100 DAGs cada do que 10.000 arquivos com 1 DAG cada. Portanto, essa otimização é recomendada. Essa otimização é um equilíbrio entre o tempo de análise e a eficiência da criação e do gerenciamento de DAGs.Você também pode considerar, por exemplo, a implantação de 10.000 arquivos DAG criar 100 arquivos ZIP, cada um contendo 100 arquivos DAG.
Além das dicas acima, se você tem mais de 10.000 arquivos DAG, gerar DAGs de maneira programática pode ser uma boa opção. Por exemplo: é possível implementar um único arquivo DAG em Python que gere algumas Objetos do DAG (por exemplo, 20.100 objetos do DAG).
Evite usar operadores do Airflow descontinuados
Os operadores listados na tabela a seguir foram descontinuados. Alguns desses operadores eram compatíveis com versões anteriores do Cloud Composer 1. Evite usá-los em seus DAGs. Em vez disso, use alternativas atualizadas fornecidas.
Operador descontinuado | Operador a ser usado |
---|---|
BigQueryExecuteQueryOperator | BigQueryInsertJobOperator |
BigQueryPatchDatasetOperator | BigQueryUpdateTableOperator |
DataflowCreateJavaJobOperator | BeamRunJavaPipelineOperator |
DataflowCreatePythonJobOperator | BeamRunPythonPipelineOperator |
DataprocScaleClusterOperator | DataprocUpdateClusterOperator |
DataprocSubmitPigJobOperator | DataprocSubmitJobOperator |
DataprocSubmitSparkSqlJobOperator | DataprocSubmitJobOperator |
DataprocSubmitSparkJobOperator | DataprocSubmitJobOperator |
DataprocSubmitHadoopJobOperator | DataprocSubmitJobOperator |
DataprocSubmitPySparkJobOperator | DataprocSubmitJobOperator |
MLEngineManageModelOperator | MLEngineCreateModelOperator e o MLEngineGetModelOperator |
MLEngineManageVersionOperator | MLEngineCreateVersion, MLEngineSetDefaultVersion, MLEngineListVersions e o MLEngineDeleteVersion. |
GCSObjectsWtihPrefixExistenceSensor | GCSObjectsWithPrefixExistenceSensor |
Perguntas frequentes sobre como escrever DAGs
Como diminuir a repetição de código para executar tarefas iguais ou semelhantes em vários DAGs?
Recomendamos definir bibliotecas e wrappers para minimizar a repetição de código.
Como faço para reutilizar código entre arquivos de DAGs?
Coloque suas funções utilitárias em um
Biblioteca local do Python
e importar as funções. É possível referenciar as funções em qualquer DAG localizado na pasta dags/
no bucket do ambiente.
Como diminuo o risco de surgirem diferentes definições?
Por exemplo, tenho duas equipes que querem agregar dados brutos em métricas de receita. Elas escrevem duas tarefas ligeiramente diferentes que fazem a mesma coisa. Defina bibliotecas para trabalhar com os dados de receita. Assim, os implementadores de DAGs precisam esclarecer a definição da receita que está sendo agregada.
Como faço para definir dependências entre DAGs?
Isso depende de como você quer definir a dependência.
Se você tiver dois DAGs (DAG A e DAG B) e quiser que o DAG B seja acionado após o DAG
A, é possível colocar
TriggerDagRunOperator
no final do DAG A.
Se o DAG B depender somente de um artefato gerado pelo DAG A, como uma mensagem do Pub/Sub, um sensor funcionará melhor.
Se o DAG B estiver muito integrado ao DAG A, talvez seja possível mesclar os dois em um único DAG.
Como transmito códigos exclusivos de execução para um DAG e às tarefas dele?
Por exemplo, quero transmitir caminhos de arquivo e nomes de cluster do Dataproc.
É possível retornar str(uuid.uuid4())
em PythonOperator
para gerar um ID exclusivo aleatório. Isso coloca o ID
XComs
para que você possa se referir ao ID em outros operadores
por meio de campos com modelo.
Antes de gerar um uuid
, considere se um ID específico do DagRun seria mais importante. Também é possível referenciar esses IDs nas substituições por meio de macros.
Como separar tarefas em um DAG?
Cada tarefa precisa ser uma unidade de trabalho idempotente. Consequentemente, evite encapsular um fluxo de trabalho de várias etapas em uma única tarefa, como um programa complexo em execução em um PythonOperator
.
É recomendado definir várias tarefas em um único DAG para agregar dados de várias origens?
Por exemplo, tenho várias tabelas com dados brutos e quero criar agregações diárias para cada uma delas. As tarefas não são dependentes entre si. Preciso criar uma tarefa e um DAG para cada tabela ou gerar um DAG geral?
Se estiver tudo bem para você que cada tarefa tenha as mesmas propriedades no nível do DAG, como schedule_interval
, defina várias tarefas em um único DAG. Caso contrário, para diminuir a repetição de código, é possível gerar muitos DAGs em um único módulo Python. Basta colocá-los nos globals()
do módulo.
Como limitar o número de tarefas simultâneas em execução em um DAG?
Por exemplo, quero evitar exceder os limites/cotas de uso da API ou impedir a execução de muitos processos simultâneos.
É possível definir pools do Airflow na interface da Web do Airflow e associar tarefas a pools atuais nos DAGs.
Perguntas frequentes sobre o uso de operadores
Devo usar DockerOperator
?
Não recomendamos o uso de
o DockerOperator
, a menos que seja usado para iniciar
contêineres em uma instalação remota do Docker (não em um ambiente
cluster). Em um ambiente do Cloud Composer, o operador não tem acesso aos daemons do Docker.
Em vez disso, use KubernetesPodOperator
ou
GKEStartPodOperator
. Esses operadores iniciam pods do Kubernetes em
clusters do Kubernetes ou do GKE, respectivamente. Não recomendamos
iniciar pods no cluster de um ambiente, porque isso pode levar
à concorrência de recursos.
Devo usar SubDagOperator
?
Não recomendamos o uso de SubDagOperator
.
Use alternativas como sugerido em Como agrupar tarefas.
Posso executar o código do Python somente em PythonOperators
para separar completamente os operadores Python?
Dependendo do seu objetivo, você tem algumas opções.
Se a única preocupação for manter dependências separadas do Python, use PythonVirtualenvOperator
.
Considere usar KubernetesPodOperator
. Com esse operador,
para definir pods do Kubernetes e executá-los em outros clusters.
Como adiciono pacotes personalizados não PyPI ou binários?
É possível instalar pacotes hospedados em repositórios de pacotes particulares.
Como faço para transmitir argumentos uniformemente para um DAG e as tarefas dele?
Use o suporte integrado do Airflow para modelos Jinja para transmitir argumentos que podem ser usados em campos com modelo.
Quando ocorre a substituição do modelo?
A substituição de modelo ocorre nos workers do Airflow pouco antes da função pre_execute
de um operador ser chamada. Na prática, isso significa que os modelos são
não substituído até pouco antes da execução de uma tarefa.
Como sei quais argumentos do operador são compatíveis com a substituição do modelo?
Os argumentos do operador que são compatíveis com a substituição do modelo Jinja2 são explicitamente marcado como tal.
Procure o campo template_fields
na definição do operador,
que contém uma lista de nomes de argumentos que passam por substituição de modelo.
Por exemplo, consulte
o BashOperator
, que é compatível com modelos para
os argumentos bash_command
e env
.
A seguir
- Como resolver problemas de DAGs
- Solução de problemas do Scheduler
- Operadores do Google
- Operadores do Google Cloud
- Tutorial do Apache Airflow (em inglês)