Execute um DAG do Apache Airflow no Cloud Composer 1 (CLI do Google Cloud)
Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Este guia de início rápido mostra como criar um ambiente do Cloud Composer e executar um DAG do Apache Airflow no Cloud Composer 1.
Se não conhece o Airflow, consulte o tutorial de conceitos do Airflow na documentação do Apache Airflow para obter mais informações sobre os conceitos, os objetos e a respetiva utilização do Airflow.
Se quiser usar Google Cloud a consola, consulte o artigo Execute um DAG do Apache Airflow no Cloud Composer.
Se quiser criar um ambiente com o Terraform, consulte o artigo Criar ambientes (Terraform).
Antes de começar
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
Se estiver a usar um fornecedor de identidade (IdP) externo, primeiro, tem de iniciar sessão na CLI gcloud com a sua identidade federada.
-
Para inicializar a CLI gcloud, execute o seguinte comando:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Verify that billing is enabled for your Google Cloud project.
-
Install the Google Cloud CLI.
-
Se estiver a usar um fornecedor de identidade (IdP) externo, primeiro, tem de iniciar sessão na CLI gcloud com a sua identidade federada.
-
Para inicializar a CLI gcloud, execute o seguinte comando:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Cloud Composer API:
gcloud services enable composer.googleapis.com
-
Para receber as autorizações de que precisa para concluir este início rápido, peça ao seu administrador que lhe conceda as seguintes funções da IAM no seu projeto:
-
Para ver, criar e gerir o ambiente do Cloud Composer:
-
Administrador de objetos de ambiente e armazenamento (
roles/composer.environmentAndStorageObjectAdmin
) -
Utilizador da conta de serviço (
roles/iam.serviceAccountUser
)
-
Administrador de objetos de ambiente e armazenamento (
-
Para ver registos:
Visualizador de registos (
roles/logging.viewer
)
Para mais informações sobre a atribuição de funções, consulte o artigo Faça a gestão do acesso a projetos, pastas e organizações.
Também pode conseguir as autorizações necessárias através de funções personalizadas ou outras funções predefinidas.
-
Para ver, criar e gerir o ambiente do Cloud Composer:
Crie uma nova conta de serviço, conforme descrito na documentação de gestão de identidade e de acesso.
Atribuir-lhe uma função, conforme descrito na documentação da gestão de identidade e de acesso. A função necessária é Composer Worker (
composer.worker
).- Cria um DAG,
composer_sample_dag
. Este DAG é executado todos os dias. - Executa uma tarefa,
print_dag_run_conf
. A tarefa imprime a configuração da execução do DAG usando o operador bash. - Analisa o ficheiro DAG que carregou. Pode demorar alguns minutos até que o DAG fique disponível para o Airflow.
- Adiciona o DAG à lista de DAGs disponíveis.
- Executa o DAG de acordo com a agenda que forneceu no ficheiro DAG.
Aguarde cerca de cinco minutos para dar tempo ao Airflow de processar o ficheiro DAG que carregou anteriormente e concluir a primeira execução do DAG (explicada mais tarde).
Execute o seguinte comando na CLI Google Cloud. Este comando executa o comando da CLI do Airflow que lista os DAGs no seu ambiente.
dags list
gcloud composer environments run example-environment \ --location us-central1 \ dags list
Verifique se o
composer_quickstart
DAG está listado na saída do comando.Exemplo de resultado:
Executing the command: [ airflow dags list ]... Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb Use ctrl-c to interrupt the command dag_id | filepath | owner | paused ====================+=======================+==================+======= airflow_monitoring | airflow_monitoring.py | airflow | False composer_quickstart | dag-quickstart-af2.py | Composer Example | False
airflow_monitoring
RUN_ID
com o valorrun_id
da saída do comandotasks states-for-dag-run
que executou anteriormente. Por exemplo,2024-02-17T15:38:38.969307+00:00
.Elimine o ambiente do Cloud Composer:
Na Google Cloud consola, aceda à página Ambientes.
Selecione
example-environment
e clique em Eliminar.Aguarde até que o ambiente seja eliminado.
Elimine o contentor do seu ambiente. A eliminação do ambiente do Cloud Composer não elimina o respetivo contentor.
Na Google Cloud consola, aceda à página Armazenamento > Navegador.
Selecione o contentor do ambiente e clique em Eliminar. Por exemplo, este contentor pode ter o nome
us-central1-example-environ-c1616fe8-bucket
.
Crie uma conta de serviço do ambiente
Quando cria um ambiente, especifica uma conta de serviço. Esta conta de serviço chama-se conta de serviço do ambiente. O seu ambiente usa esta conta de serviço para realizar a maioria das operações.
A conta de serviço do seu ambiente não é uma conta de utilizador. Uma conta de serviço é um tipo especial de conta usada por uma aplicação ou uma instância de máquina virtual (VM) e não por uma pessoa.
Para criar uma conta de serviço para o seu ambiente:
Crie um ambiente
Crie um novo ambiente denominado example-environment
na região us-central1
com a versão mais recente do Cloud Composer 1.
gcloud composer environments create example-environment \
--location us-central1 \
--image-version composer-1.20.12-airflow-1.10.15
Crie um ficheiro DAG
Um DAG do Airflow é uma coleção de tarefas organizadas que quer agendar e executar. Os DAGs são definidos em ficheiros Python padrão.
Este guia usa um exemplo de DAG do Airflow definido no ficheiro quickstart.py
.
O código Python neste ficheiro faz o seguinte:
Guarde uma cópia do ficheiro quickstart.py
no seu computador local:
Carregue o ficheiro DAG para o contentor do seu ambiente
Todos os ambientes do Cloud Composer têm um contentor do Cloud Storage associado. O Airflow no Cloud Composer agenda apenas DAGs localizados na pasta /dags
neste contentor.
Para agendar o seu DAG, carregue o ficheiro quickstart.py
a partir da sua máquina local para a pasta /dags
do ambiente:
Para carregar quickstart.py
com a CLI gcloud, execute o seguinte comando na pasta onde o ficheiro quickstart.py
está localizado:
gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py
Veja o DAG
Depois de carregar o ficheiro DAG, o Airflow faz o seguinte:
Verifique se o DAG é processado sem erros e está disponível no Airflow através da visualização na IU do DAG. A IU do DAG é a interface do Cloud Composer para ver informações do DAG na Google Cloud consola. O Cloud Composer também oferece acesso à IU do Airflow, que é uma interface Web do Airflow nativa.
Veja detalhes da execução do DAG
Uma única execução de um DAG é denominada execução de DAG. O Airflow executa imediatamente uma execução de DAG para o DAG de exemplo porque a data de início no ficheiro DAG está definida para ontem. Desta forma, o Airflow atualiza-se de acordo com a programação do DAG especificado.
O DAG de exemplo contém uma tarefa, print_dag_run_conf
, que executa o comando echo
na consola. Este comando produz metainformações sobre o DAG (identificador numérico da execução do DAG).
Execute o seguinte comando na CLI Google Cloud. Este comando lista as execuções de DAG
para o DAG composer_quickstart
:
gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart
Exemplo de resultado:
dag_id | run_id | state | execution_date | start_date | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
A CLI do Airflow não fornece um comando para ver os registos de tarefas. Pode usar outros métodos para ver registos de tarefas do Airflow: IU de DAGs do Cloud Composer, IU do Airflow ou Cloud Logging. Este guia mostra uma forma de consultar o Cloud Logging para ver registos de uma execução de DAG específica.
Execute o seguinte comando na CLI Google Cloud. Este comando lê registos do Cloud Logging para uma execução de DAG específica do DAG composer_quickstart
. O argumento
--format
formata o resultado para que apenas o texto da mensagem de registo
seja apresentado.
gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"
Substituir:
Exemplo de resultado:
...
Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task
...
Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746
...
Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check
Limpar
Para evitar incorrer em custos na sua Google Cloud conta pelos recursos usados nesta página, elimine o Google Cloud projeto com os recursos.
Elimine os recursos usados neste tutorial:
O que se segue?