Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Este guia mostra como escrever um gráfico acíclico dirigido (DAG) do Apache Airflow que é executado num ambiente do Cloud Composer.
Uma vez que o Apache Airflow não oferece um forte isolamento de DAGs e tarefas, recomendamos que use ambientes de produção e de teste separados para evitar interferências de DAGs. Para mais informações, consulte o artigo Testar DAGs.
Estruturar um DAG do Airflow
Um DAG do Airflow é definido num ficheiro Python e é composto pelos seguintes componentes:
- Definição do DAG
- Operadores do Airflow
- Relações com operadores
Os seguintes fragmentos de código mostram exemplos de cada componente fora do contexto.
Uma definição de DAG
O exemplo seguinte demonstra uma definição de DAG do Airflow:
Operadores e tarefas
Os operadores do Airflow descrevem o trabalho a realizar. Uma tarefa task é uma instância específica de um operador.
Relações de tarefas
As relações entre tarefas descrevem a ordem pela qual o trabalho tem de ser concluído.
Exemplo de fluxo de trabalho DAG completo em Python
O fluxo de trabalho seguinte é um modelo DAG funcional completo composto por duas tarefas: uma tarefa hello_python
e uma tarefa goodbye_bash
:
Para mais informações sobre a definição de DAGs do Airflow, consulte o tutorial do Airflow e os conceitos do Airflow.
Operadores do Airflow
Os exemplos seguintes mostram alguns operadores populares do Airflow. Para uma referência autorizada dos operadores do Airflow, consulte a referência de operadores e hooks e o índice de fornecedores.
BashOperator
Use o BashOperator para executar programas de linha de comandos.
O Cloud Composer executa os comandos fornecidos num script Bash num worker do Airflow. O trabalhador é um contentor Docker baseado no Debian e inclui vários pacotes.
gcloud
, incluindo ogcloud storage
subcomando para trabalhar com contentores do Cloud Storage.bq
comandokubectl
comando
PythonOperator
Use o PythonOperator para executar código Python arbitrário.
O Cloud Composer executa o código Python num contentor que inclui pacotes para a versão da imagem do Cloud Composer usada no seu ambiente.
Para instalar pacotes Python adicionais, consulte o artigo Instalar dependências do Python.
Google Cloud Operadores
Para executar tarefas que usam produtos Google Cloud , use os operadores doGoogle Cloud Airflow. Por exemplo, os operadores do BigQuery consultam e processam dados no BigQuery.
Existem muitos mais operadores do Airflow para o Google Cloud e serviços individuais fornecidos pela Google Cloud. Consulte a secção Google Cloud Operadores para ver a lista completa.
EmailOperator
Use o EmailOperator para enviar emails a partir de um DAG. Para enviar emails a partir de um ambiente do Cloud Composer, configure o seu ambiente para usar o SendGrid.
Notificações sobre falhas do operador
Defina email_on_failure
como True
para enviar uma notificação por email quando um operador
no DAG falhar. Para enviar notificações por email a partir de um ambiente do Cloud Composer, tem de configurar o seu ambiente para usar o SendGrid.
Diretrizes do fluxo de trabalho de DAG
Coloque quaisquer bibliotecas Python personalizadas num arquivo ZIP de um DAG num diretório aninhado. Não coloque bibliotecas no nível superior do diretório DAGs.
Quando o Airflow analisa a pasta
dags/
, verifica apenas os DAGs nos módulos Python que se encontram no nível superior da pasta DAGs e no nível superior de um arquivo ZIP também localizado na pastadags/
de nível superior. Se o Airflow encontrar um módulo Python num arquivo ZIP que não contenha as subcadeiasairflow
eDAG
, o Airflow para de processar o arquivo ZIP. O Airflow devolve apenas os DAGs encontrados até esse ponto.Para tolerância a falhas, não defina vários objetos DAG no mesmo módulo Python.
Não use SubDAGs. Em alternativa, agrupe as tarefas em DAGs.
Coloque os ficheiros necessários no momento da análise DAG na pasta
dags/
e não na pastadata/
.Teste os DAGs desenvolvidos ou modificados, conforme recomendado nas instruções para testar DAGs.
Verifique se os DAGs desenvolvidos não aumentam demasiado os tempos de análise dos DAGs.
As tarefas do Airflow podem falhar por vários motivos. Para evitar falhas de execuções de DAGs completas, recomendamos que ative as novas tentativas de tarefas. Se definir o número máximo de tentativas como
0
, não são feitas tentativas.Recomendamos que substitua a opção
default_task_retries
por um valor para as novas tentativas de tarefas diferente de0
. Além disso, pode definir o parâmetroretries
ao nível da tarefa.Se quiser usar a GPU nas suas tarefas do Airflow, crie um cluster do GKE separado com base em nós que usem máquinas com GPUs. Use o GKEStartPodOperator para executar as suas tarefas.
Evite executar tarefas com utilização intensiva da CPU e da memória no conjunto de nós do cluster onde estão a ser executados outros componentes do Airflow (programadores, trabalhadores, servidores Web). Em alternativa, use KubernetesPodOperator ou GKEStartPodOperator.
Quando implementar DAGs num ambiente, carregue apenas os ficheiros que são absolutamente necessários para interpretar e executar DAGs na pasta
/dags
.Limite o número de ficheiros DAG na pasta
/dags
.O Airflow está a analisar continuamente os DAGs na pasta
/dags
. A análise é um processo que percorre a pasta DAGs e o número de ficheiros que têm de ser carregados (com as respetivas dependências) afeta o desempenho da análise DAG e do agendamento de tarefas. É muito mais eficiente usar 100 ficheiros com 100 DAGs cada do que 10 000 ficheiros com 1 DAG cada. Por isso, recomenda-se esta otimização. Esta otimização é um equilíbrio entre o tempo de análise e a eficiência da criação e gestão de DAGs.Também pode considerar, por exemplo, implementar 10 000 ficheiros DAG. Para tal, pode criar 100 ficheiros ZIP, cada um com 100 ficheiros DAG.
Além das sugestões acima, se tiver mais de 10 000 ficheiros DAG, a geração de DAGs de forma programática pode ser uma boa opção. Por exemplo, pode implementar um único ficheiro DAG Python que gera um determinado número de objetos DAG (por exemplo, 20 ou 100 objetos DAG).
Evite usar operadores do Airflow descontinuados. Em alternativa, use as respetivas alternativas atualizadas.
Perguntas frequentes sobre a escrita de DAGs
Como posso minimizar a repetição de código se quiser executar tarefas iguais ou semelhantes em vários DAGs?
Sugerimos que defina bibliotecas e wrappers para minimizar a repetição de código.
Como posso reutilizar código entre ficheiros DAG?
Coloque as funções de utilidade numa
biblioteca Python local
e importe as funções. Pode fazer referência às funções em qualquer DAG localizado na pasta dags/
no contentor do seu ambiente.
Como posso minimizar o risco de surgirem definições diferentes?
Por exemplo, tem duas equipas que querem agregar dados não processados em métricas de receita. As equipas escrevem duas tarefas ligeiramente diferentes que fazem a mesma coisa. Defina bibliotecas para trabalhar com os dados de receita, de modo que os implementadores do DAG tenham de esclarecer a definição da receita que está a ser agregada.
Como posso definir dependências entre DAGs?
Isto depende da forma como quer definir a dependência.
Se tiver dois DAGs (DAG A e DAG B) e quiser que o DAG B seja acionado após o DAG A, pode colocar um TriggerDagRunOperator
no final do DAG A.
Se o DAG B depender apenas de um artefacto gerado pelo DAG A, como uma mensagem do Pub/Sub, um sensor pode funcionar melhor.
Se o DAG B estiver estreitamente integrado com o DAG A, pode conseguir unir os dois DAGs num único DAG.
Como posso transmitir IDs de execução exclusivos a um DAG e às respetivas tarefas?
Por exemplo, quer transmitir nomes de clusters do Dataproc e caminhos de ficheiros.
Pode gerar um ID exclusivo aleatório devolvendo str(uuid.uuid4())
num PythonOperator
. Isto coloca o ID em XComs
para que possa consultar o ID noutros operadores através de campos baseados em modelos.
Antes de gerar um uuid
, pondere se um ID específico do DagRun seria mais valioso. Também pode fazer referência a estes IDs em substituições Jinja usando macros.
Como posso separar tarefas num DAG?
Cada tarefa deve ser uma unidade de trabalho idempotente. Consequentemente, deve evitar encapsular um fluxo de trabalho de vários passos numa única tarefa, como um programa complexo em execução num PythonOperator
.
Devo definir várias tarefas num único DAG para agregar dados de várias origens?
Por exemplo, tem várias tabelas com dados não processados e quer criar agregados diários para cada tabela. As tarefas não dependem umas das outras. Deve criar uma tarefa e um DAG para cada tabela ou criar um DAG geral?
Se não se importar que cada tarefa partilhe as mesmas propriedades ao nível do DAG, como
schedule_interval
, faz sentido definir várias tarefas num único
DAG. Caso contrário, para minimizar a repetição de código, podem ser gerados vários DAGs a partir de um único módulo Python, colocando-os no globals()
do módulo.
Como posso limitar o número de tarefas simultâneas em execução num DAG?
Por exemplo, quer evitar exceder os limites/quotas de utilização da API ou evitar a execução de demasiados processos simultâneos.
Pode definir pools do Airflow na IU Web do Airflow e associar tarefas a pools existentes nos seus DAGs.
Perguntas frequentes sobre a utilização de operadores
Devo usar o DockerOperator
?
Não recomendamos a utilização do comando
DockerOperator
, a menos que seja usado para iniciar
contentores numa instalação remota do Docker (não dentro do cluster de um ambiente). Num ambiente do Cloud Composer, o operador não tem acesso a daemons do Docker.
Em alternativa, use KubernetesPodOperator
ou
GKEStartPodOperator
. Estes operadores iniciam pods do Kubernetes em clusters do Kubernetes ou do GKE, respetivamente. Tenha em atenção que não
recomendamos o lançamento de pods no cluster de um ambiente, porque isto pode levar
a concorrência de recursos.
Devo usar o SubDagOperator
?
Não recomendamos a utilização do SubDagOperator
.
Use alternativas, conforme sugerido em Agrupar tarefas.
Devo executar código Python apenas em PythonOperators
para separar totalmente os operadores Python?
Consoante o seu objetivo, tem algumas opções.
Se a sua única preocupação for manter dependências Python separadas, pode usar o comando PythonVirtualenvOperator
.
Considere usar a KubernetesPodOperator
. Este operador permite-lhe definir pods do Kubernetes e executar os pods noutros clusters.
Como posso adicionar pacotes binários personalizados ou não pertencentes ao PyPI?
Pode instalar pacotes alojados em repositórios de pacotes privados.
Como posso transmitir argumentos de forma uniforme a um DAG e às respetivas tarefas?
Pode usar o suporte integrado do Airflow para a criação de modelos Jinja para transmitir argumentos que podem ser usados em campos com modelos.
Quando ocorre a substituição de modelos?
A substituição de modelos ocorre nos trabalhadores do Airflow imediatamente antes de a função pre_execute
de um operador ser chamada. Na prática, isto significa que os modelos não são substituídos até pouco antes da execução de uma tarefa.
Como posso saber que argumentos de operador suportam a substituição de modelos?
Os argumentos dos operadores que suportam a substituição de modelos Jinja2 estão explicitamente marcados como tal.
Procure o campo template_fields
na definição do operador,
que contém uma lista de nomes de argumentos que são substituídos por modelos.
Por exemplo, consulte o elemento BashOperator
, que suporta a utilização de modelos para os argumentos bash_command
e env
.
Operadores do Airflow descontinuados e removidos
Os operadores do Airflow indicados na tabela seguinte estão descontinuados:
Evite usar estes operadores nos seus DAGs. Em alternativa, use os operadores de substituição atualizados fornecidos.
Se um operador estiver listado como removido, já ficou indisponível numa das compilações do Airflow lançadas no Cloud Composer 3.
Se um operador estiver listado como planeado para remoção, significa que foi descontinuado e será removido numa compilação futura do Airflow.
Se um operador estiver listado como já removido nos fornecedores Google mais recentes, o operador é removido na versão mais recente do pacote
apache-airflow-providers-google
. Ao mesmo tempo, o Cloud Composer continua a usar a versão deste pacote em que o operador ainda não foi removido.
Operador descontinuado | Estado | Operador de substituição | Substituição disponível a partir de |
---|---|---|---|
CreateAutoMLTextTrainingJobOperator | Removido | SupervisedFineTuningTrainOperator |
composer-3-airflow-2.9.3-build.1 composer-3-airflow-2.9.1-build.8 |
GKEDeploymentHook | Removido | GKEKubernetesHook |
Todas as versões |
GKECustomResourceHook | Removido | GKEKubernetesHook |
Todas as versões |
GKEPodHook | Removido | GKEKubernetesHook |
Todas as versões |
GKEJobHook | Removido | GKEKubernetesHook |
Todas as versões |
GKEPodAsyncHook | Removido | GKEKubernetesAsyncHook |
Todas as versões |
SecretsManagerHook | Removido | GoogleCloudSecretManagerHook |
composer-3-airflow-2.7.3-build.6 |
BigQueryExecuteQueryOperator | Removido | BigQueryInsertJobOperator |
Todas as versões |
BigQueryPatchDatasetOperator | Removido | BigQueryUpdateDatasetOperator |
Todas as versões |
DataflowCreateJavaJobOperator | Removido | beam.BeamRunJavaPipelineOperator |
Todas as versões |
DataflowCreatePythonJobOperator | Removido | beam.BeamRunPythonPipelineOperator |
Todas as versões |
DataprocSubmitPigJobOperator | Removido | DataprocSubmitJobOperator |
Todas as versões |
DataprocSubmitHiveJobOperator | Removido | DataprocSubmitJobOperator |
Todas as versões |
DataprocSubmitSparkSqlJobOperator | Removido | DataprocSubmitJobOperator |
Todas as versões |
DataprocSubmitSparkJobOperator | Removido | DataprocSubmitJobOperator |
Todas as versões |
DataprocSubmitHadoopJobOperator | Removido | DataprocSubmitJobOperator |
Todas as versões |
DataprocSubmitPySparkJobOperator | Removido | DataprocSubmitJobOperator |
Todas as versões |
BigQueryTableExistenceAsyncSensor | Removido | BigQueryTableExistenceSensor |
Todas as versões |
BigQueryTableExistencePartitionAsyncSensor | Removido | BigQueryTablePartitionExistenceSensor |
Todas as versões |
CloudComposerEnvironmentSensor | Removido | CloudComposerCreateEnvironmentOperator, CloudComposerDeleteEnvironmentOperator, CloudComposerUpdateEnvironmentOperator |
Todas as versões |
GCSObjectExistenceAsyncSensor | Removido | GCSObjectExistenceSensor |
Todas as versões |
GoogleAnalyticsHook | Removido | GoogleAnalyticsAdminHook |
Todas as versões |
GoogleAnalyticsListAccountsOperator | Removido | GoogleAnalyticsAdminListAccountsOperator |
Todas as versões |
GoogleAnalyticsGetAdsLinkOperator | Removido | GoogleAnalyticsAdminGetGoogleAdsLinkOperator |
Todas as versões |
GoogleAnalyticsRetrieveAdsLinksListOperator | Removido | GoogleAnalyticsAdminListGoogleAdsLinksOperator |
Todas as versões |
GoogleAnalyticsDataImportUploadOperator | Removido | GoogleAnalyticsAdminCreateDataStreamOperator |
Todas as versões |
GoogleAnalyticsDeletePreviousDataUploadsOperator | Removido | GoogleAnalyticsAdminDeleteDataStreamOperator |
Todas as versões |
DataPipelineHook | Removido | DataflowHook |
composer-3-airflow-2.9.1-build.0 composer-3-airflow-2.7.3-build.9 |
CreateDataPipelineOperator | Removido | DataflowCreatePipelineOperator |
composer-3-airflow-2.9.1-build.0 composer-3-airflow-2.7.3-build.9 |
RunDataPipelineOperator | Removido | DataflowRunPipelineOperator |
composer-3-airflow-2.9.1-build.0 composer-3-airflow-2.7.3-build.9 |
AutoMLDatasetLink | Descontinuação, remoção planeada | TranslationLegacyDatasetLink |
composer-3-airflow-2.9.1-build.0 composer-3-airflow-2.7.3-build.9 |
AutoMLDatasetListLink | Descontinuação, remoção planeada | TranslationDatasetListLink |
composer-3-airflow-2.9.1-build.0 composer-3-airflow-2.7.3-build.9 |
AutoMLModelLink | Descontinuação, remoção planeada | TranslationLegacyModelLink |
composer-3-airflow-2.9.1-build.0 composer-3-airflow-2.7.3-build.9 |
AutoMLModelTrainLink | Descontinuação, remoção planeada | TranslationLegacyModelTrainLink |
composer-3-airflow-2.9.1-build.0 composer-3-airflow-2.7.3-build.9 |
AutoMLModelPredictLink | Descontinuação, remoção planeada | TranslationLegacyModelPredictLink |
composer-3-airflow-2.9.1-build.0 composer-3-airflow-2.7.3-build.9 |
AutoMLBatchPredictOperator | Removido | vertex_ai.batch_prediction_job |
composer-3-airflow-2.9.3-build.4 |
AutoMLPredictOperator | Descontinuação, remoção planeada | vertex_aigenerative_model. TextGenerationModelPredictOperator, translate.TranslateTextOperator |
composer-3-airflow-2.7.3-build.6 |
PromptLanguageModelOperator | Removido | TextGenerationModelPredictOperator |
composer-3-airflow-2.9.1-build.0 composer-3-airflow-2.7.3-build.9 |
GenerateTextEmbeddingsOperator | Removido | TextEmbeddingModelGetEmbeddingsOperator |
composer-3-airflow-2.9.1-build.0 composer-3-airflow-2.7.3-build.9 |
PromptMultimodalModelOperator | Removido | GenerativeModelGenerateContentOperator |
composer-3-airflow-2.9.1-build.0 composer-3-airflow-2.7.3-build.9 |
PromptMultimodalModelWithMediaOperator | Removido | GenerativeModelGenerateContentOperator |
composer-3-airflow-2.9.1-build.0 composer-3-airflow-2.7.3-build.9 |
DataflowStartSqlJobOperator | Removido | DataflowStartYamlJobOperator |
composer-3-airflow-2.9.3-build.1 composer-3-airflow-2.9.1-build.8 |
LifeSciencesHook | Descontinuação, remoção planeada | Hook dos operadores do Google Cloud Batch |
A anunciar |
DataprocScaleClusterOperator | Descontinuação, remoção planeada | DataprocUpdateClusterOperator |
A anunciar |
MLEngineStartBatchPredictionJobOperator | Descontinuação, remoção planeada | CreateBatchPredictionJobOperator |
A anunciar |
MLEngineManageModelOperator | Descontinuação, remoção planeada | MLEngineCreateModelOperator, MLEngineGetModelOperator |
A anunciar |
MLEngineGetModelOperator | Descontinuação, remoção planeada | GetModelOperator |
A anunciar |
MLEngineDeleteModelOperator | Descontinuação, remoção planeada | DeleteModelOperator |
A anunciar |
MLEngineManageVersionOperator | Descontinuação, remoção planeada | MLEngineCreateVersion, MLEngineSetDefaultVersion, MLEngineListVersions, MLEngineDeleteVersion |
A anunciar |
MLEngineCreateVersionOperator | Descontinuação, remoção planeada | Parâmetro parent_model para operadores do VertexAI |
A anunciar |
MLEngineSetDefaultVersionOperator | Descontinuação, remoção planeada | SetDefaultVersionOnModelOperator |
A anunciar |
MLEngineListVersionsOperator | Descontinuação, remoção planeada | ListModelVersionsOperator |
A anunciar |
MLEngineDeleteVersionOperator | Descontinuação, remoção planeada | DeleteModelVersionOperator |
A anunciar |
MLEngineStartTrainingJobOperator | Descontinuação, remoção planeada | CreateCustomPythonPackageTrainingJobOperator |
A anunciar |
MLEngineTrainingCancelJobOperator | Descontinuação, remoção planeada | CancelCustomTrainingJobOperator |
A anunciar |
LifeSciencesRunPipelineOperator | Descontinuação, remoção planeada | Operadores do Google Cloud Batch |
A anunciar |
MLEngineCreateModelOperator | Descontinuação, remoção planeada | operador do VertexAI correspondente |
A anunciar |
O que se segue?
- Resolução de problemas de DAGs
- Resolução de problemas do agendador
- Operadores Google
- Google Cloud Operadores
- Tutorial do Apache Airflow