[{
"type": "thumb-down",
"id": "hardToUnderstand",
"label":"Hard to understand"
},{
"type": "thumb-down",
"id": "incorrectInformationOrSampleCode",
"label":"Incorrect information or sample code"
},{
"type": "thumb-down",
"id": "missingTheInformationSamplesINeed",
"label":"Missing the information/samples I need"
},{
"type": "thumb-down",
"id": "translationIssue",
"label":"Translation issue"
},{
"type": "thumb-down",
"id": "otherDown",
"label":"Other"
}]
[{
"type": "thumb-up",
"id": "easyToUnderstand",
"label":"Easy to understand"
},{
"type": "thumb-up",
"id": "solvedMyProblem",
"label":"Solved my problem"
},{
"type": "thumb-up",
"id": "otherUp",
"label":"Other"
}]
Como escrever DAGs (fluxos de trabalho)
Neste guia, você aprende a escrever um gráfico acíclico direcionado (DAG, na sigla em inglês) do Apache Airflow, que é executado em um ambiente do Cloud Composer.
Como estruturar um DAG
Um DAG do Airflow é definido em um arquivo Python e composto pelos seguintes elementos: definição, operadores e relações de operador. Nos snippets de código a seguir, você vê exemplos de cada elemento fora de contexto:
import datetime
from airflow import models
default_dag_args = {
# The start_date describes when a DAG is valid / can be run. Set this to a
# fixed point in time rather than dynamically, since it is evaluated every
# time a DAG is parsed. See:
# https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
'start_date': datetime.datetime(2018, 1, 1),
}
# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
'composer_sample_simple_greeting',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
Os operadores, para descrever o trabalho a ser feito. A instanciação de um operador é chamada de tarefa (em inglês).
from airflow.operators import bash_operator
from airflow.operators import python_operator
def greeting():
import logging
logging.info('Hello World!')
# An instance of an operator is called a task. In this case, the
# hello_python task calls the "greeting" Python function.
hello_python = python_operator.PythonOperator(
task_id='hello',
python_callable=greeting)
# Likewise, the goodbye_bash task calls a Bash script.
goodbye_bash = bash_operator.BashOperator(
task_id='bye',
bash_command='echo Goodbye.')
As relações de operador, para descrever a ordem em que o trabalho será concluído.
# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash
O fluxo de trabalho a seguir é um exemplo completo de trabalho e é composto por duas tarefas: uma tarefa hello_python e uma goodbye_bash:
from __future__ import print_function
import datetime
from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator
default_dag_args = {
# The start_date describes when a DAG is valid / can be run. Set this to a
# fixed point in time rather than dynamically, since it is evaluated every
# time a DAG is parsed. See:
# https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
'start_date': datetime.datetime(2018, 1, 1),
}
# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
'composer_sample_simple_greeting',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
def greeting():
import logging
logging.info('Hello World!')
# An instance of an operator is called a task. In this case, the
# hello_python task calls the "greeting" Python function.
hello_python = python_operator.PythonOperator(
task_id='hello',
python_callable=greeting)
# Likewise, the goodbye_bash task calls a Bash script.
goodbye_bash = bash_operator.BashOperator(
task_id='bye',
bash_command='echo Goodbye.')
# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash
Nos exemplos a seguir, você vê alguns operadores conhecidos do Airflow. Para ter uma referência autoritativa deles, consulte a referência da API Apache Airflow ou veja o código-fonte dos operadores core e contrib (links em inglês).
BashOperator
Use o BashOperator para executar programas da linha de comando.
from airflow.operators import bash_operator
# Create BigQuery output dataset.
make_bq_dataset = bash_operator.BashOperator(
task_id='make_bq_dataset',
# Executing 'bq' command requires Google Cloud SDK which comes
# preinstalled in Cloud Composer.
bash_command='bq ls {} || bq mk {}'.format(
bq_dataset_name, bq_dataset_name))
O Cloud Composer executa os comandos fornecidos em um script "Bash" em um worker.
O worker é um contêiner do Docker baseado no Debian e inclui vários pacotes.
Use o PythonOperator para executar o código arbitrário do Python. O Cloud Composer executa o código Python em um contêiner que inclui os seguintes pacotes:
from airflow.operators import email_operator
# Send email confirmation
email_summary = email_operator.EmailOperator(
task_id='email_summary',
to=models.Variable.get('email'),
subject='Sample BigQuery notify data ready',
html_content="""
Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
12AM. The most popular question was '{question_title}' with
{view_count} views. Top 100 questions asked are now available at:
{export_location}.
""".format(
min_date=min_query_date,
max_date=max_query_date,
question_title=(
'{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
'key=\'return_value\')[0][0] }}'
),
view_count=(
'{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
'key=\'return_value\')[0][1] }}'
),
export_location=output_file))
Notificações
Defina email_on_failure como True para enviar uma notificação por e-mail quando um operador no DAG falhar. Para enviar notificações por e-mail de um ambiente do Cloud Composer, você precisa configurar o ambiente para usar o SendGrid.
from airflow import models
default_dag_args = {
'start_date': yesterday,
# Email whenever an Operator in the DAG fails.
'email': models.Variable.get('email'),
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
with models.DAG(
'composer_sample_bq_notify',
schedule_interval=datetime.timedelta(weeks=4),
default_args=default_dag_args) as dag:
Diretrizes
Coloque as bibliotecas Python personalizadas em um arquivo ZIP do DAG em um diretório aninhado. Não inclua bibliotecas no nível superior do diretório de DAGs.
Quando o Airflow verifica a pasta dags/, o Airflow verifica somente DAGs nos módulos Python que estão no nível superior da pasta DAGs e no nível superior de um arquivo ZIP localizado também na pasta de nível superior dags/. Se o Airflow encontrar um módulo Python em um arquivo ZIP que não contenha substrings airflow e DAG, o Airflow interromperá o processamento do arquivo ZIP. O Airflow retorna apenas os DAGs encontrados até esse ponto.
Para ter tolerância a falhas, não defina vários objetos de DAG no mesmo módulo do Python.
Não defina subDAGs como objetos de nível superior.
Em geral, o Airflow pega objetos DAG no namespace global de um módulo no diretório dags/ como DAGs de nível superior. Todos os subDAGs definidos como objetos de nível superior são executados de acordo com as próprias programações, além dos agendamentos de outros DAGs que incorporam os subDAGs.
Coloque arquivos que são necessários no tempo de análise do DAG no diretório dags/ que não está no diretório data/. O diretório data/ não está montado no servidor da Web.
FAQs
Como criar DAGs
Como diminuo a repetição de código se eu quiser executar tarefas iguais ou semelhantes em vários DAGs?
Recomendamos que você defina bibliotecas e wrappers para diminuir a repetição de código.
Como faço para reutilizar código entre arquivos de DAGs?
Coloque as funções de utilitário em uma biblioteca local do Python e as importe. É possível referenciar as funções em qualquer DAG na pasta dags/.
Como diminuo o risco de surgirem diferentes definições?
Por exemplo, tenho duas equipes que querem agregar dados brutos em métricas de receita. Elas escrevem duas tarefas ligeiramente diferentes que fazem a mesma coisa.
Defina bibliotecas para trabalhar com os dados de receita. Assim, os implementadores de DAGs precisam esclarecer a definição da receita que está sendo agregada.
Como faço para definir dependências entre DAGs?
Isso depende de como você quer definir a dependência.
Se você tem dois DAGs (DAG A e DAG B) e quer que o DAG B seja acionado após o DAG A, é possível colocar um TriggerDagRunOperator no final do Dag A.
Se o DAG B depender apenas de um artefato gerado pelo A, como uma mensagem do Pub/Sub, um sensor funcionará melhor.
Se o DAG B estiver muito integrado ao A, talvez seja possível mesclar os dois em um único DAG.
Como transmito códigos exclusivos de execução para um DAG e às tarefas dele?
Por exemplo, quero transmitir caminhos de arquivo e nomes de cluster do Dataproc.
É possível gerar um ID exclusivo aleatório retornando str(uuid.uuid4()) em PythonOperator. Isso coloca o ID em xcoms para que você possa consultá-lo em outros operadores por meio de campos com modelo.
Antes de gerar um uuid, considere se um ID específico do DagRun seria mais importante. Também é possível mencionar esses IDs nas substituições do Jinja por meio de macros.
Que limites estabeleço entre as tarefas em um DAG?
Cada tarefa precisa ser uma unidade de trabalho idempotente. Consequentemente, evite encapsular um fluxo de trabalho de várias etapas em uma única tarefa, como um programa complexo em execução em um PythonOperator.
O mecanismo nativo do fluxo de ar para reutilizar o código de definição de fluxo de trabalho é SubDagOperator. No entanto, há restrições. Para informações, consulte Devo usar SubDagOperator.
É recomendado definir várias tarefas em um único DAG para agregar dados de várias origens?
Por exemplo, tenho várias tabelas com dados brutos e quero criar agregações diárias para cada uma delas. As tarefas não são dependentes entre si. Preciso criar uma tarefa e um DAG para cada tabela ou gerar um DAG geral?
Se você concorda que cada tarefa compartilhe as mesmas propriedades no nível do DAG, como schedule_interval, defina várias tarefas em um único DAG. Caso contrário, para minimizar a repetição de código, vários DAGs podem ser gerados de um único módulo Python, colocando-os nos globais do módulo (em inglês).
Como limitar o número de tarefas simultâneas em execução em um DAG?
Por exemplo, eu quero evitar exceder os limites de uso da API/cotas ou evitar a execução de muitos processos simultâneos.
É possível definir pools do Airflow na interface de usuário da Web do Airflow e associar tarefas a pools atuais nos DAGs.
Há algum exemplo de uso de modelos para gerar DAGs?
Não recomendamos usar o DockerOperator, a menos que ele seja usado para iniciar contêineres em uma instalação remota do Docker (ou seja, em nenhum lugar do cluster do GKE de um ambiente). O Composer não monta cada daemon Docker de cada servidor GKE em cada worker do Airflow, de modo que o operador não terá acesso aos daemons do Docker, a menos que um usuário os instale manualmente.
Como alternativa, recomendamos o uso de KubernetesPodOperator ou GKEPodOperator, que podem iniciar os pods do Kubernetes nos clusters do Kubernetes ou do GKE, respectivamente. Não recomendamos o lançamento de pods no cluster do GKE de um ambiente, já que isso pode levar à concorrência de recursos.
Não recomendamos o uso de SubDagOperator.
Ainda que o SubDagOperator possa fornecer o encapsulamento, as tarefas SubDag precisam de um espaço de tarefas. Se o worker que estiver executando essa tarefa falhar, todas as tarefas no SubDag falharão, gerando fluxos de trabalho não confiáveis.
É recomendado executar o código do Python somente em DockerOperators para separar completamente os operadores Python?
Dependendo do seu objetivo, você tem algumas opções.
Se sua única preocupação é manter dependências separadas do Python, use o PythonVirtualenvOperator.
Embora você possa usar DockerOperator, considere também usar KubernetesPodOperator, que permite que você defina pods do Kubernetes e execute os pods em outros clusters.
Como usar o KubernetesPodOperator fora do Google Cloud?
É possível montar um arquivo de configuração que especifique como autenticar com o cluster do GKE e colocar o arquivo na pasta /data.
Essa pasta é ativada no ambiente do Cloud Composer.
Como adiciono pacotes personalizados não PyPI ou binários?
A substituição de modelo ocorre nos workers pouco antes da função pre_execute de um operador ser chamada. Na prática, isso significa que os modelos não são substituídos até um pouco antes de uma tarefa ser executada.
Como sei quais argumentos do operador são compatíveis com a substituição do modelo?
Os argumentos que são compatíveis com a substituição do modelo Jinja2 são explicitamente marcados como tal no código-fonte do operador.
Procure o campo template_fields na definição "Operator", que contém uma lista de nomes de argumentos que passarão por substituição de modelo. Por exemplo, consulte o BashOperator, que oferece suporte aos modelos para os argumentos bash_command e env.