Execute um DAG do Apache Airflow no Cloud Composer 1 (CLI do Google Cloud)

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Este guia de início rápido mostra como criar um ambiente do Cloud Composer e executar um DAG do Apache Airflow no Cloud Composer 1.

Antes de começar

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. Se estiver a usar um fornecedor de identidade (IdP) externo, primeiro, tem de iniciar sessão na CLI gcloud com a sua identidade federada.

  4. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Install the Google Cloud CLI.

  8. Se estiver a usar um fornecedor de identidade (IdP) externo, primeiro, tem de iniciar sessão na CLI gcloud com a sua identidade federada.

  9. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  10. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  11. Verify that billing is enabled for your Google Cloud project.

  12. Enable the Cloud Composer API:

    gcloud services enable composer.googleapis.com
  13. Para receber as autorizações de que precisa para concluir este início rápido, peça ao seu administrador que lhe conceda as seguintes funções da IAM no seu projeto:

    Para mais informações sobre a atribuição de funções, consulte o artigo Faça a gestão do acesso a projetos, pastas e organizações.

    Também pode conseguir as autorizações necessárias através de funções personalizadas ou outras funções predefinidas.

  14. Crie uma conta de serviço do ambiente

    Quando cria um ambiente, especifica uma conta de serviço. Esta conta de serviço chama-se conta de serviço do ambiente. O seu ambiente usa esta conta de serviço para realizar a maioria das operações.

    A conta de serviço do seu ambiente não é uma conta de utilizador. Uma conta de serviço é um tipo especial de conta usada por uma aplicação ou uma instância de máquina virtual (VM) e não por uma pessoa.

    Para criar uma conta de serviço para o seu ambiente:

    1. Crie uma nova conta de serviço, conforme descrito na documentação de gestão de identidade e de acesso.

    2. Atribuir-lhe uma função, conforme descrito na documentação da gestão de identidade e de acesso. A função necessária é Composer Worker (composer.worker).

    Crie um ambiente

    Crie um novo ambiente denominado example-environment na região us-central1 com a versão mais recente do Cloud Composer 1.

    gcloud composer environments create example-environment \
        --location us-central1 \
        --image-version composer-1.20.12-airflow-1.10.15
    

    Crie um ficheiro DAG

    Um DAG do Airflow é uma coleção de tarefas organizadas que quer agendar e executar. Os DAGs são definidos em ficheiros Python padrão.

    Este guia usa um exemplo de DAG do Airflow definido no ficheiro quickstart.py. O código Python neste ficheiro faz o seguinte:

    1. Cria um DAG, composer_sample_dag. Este DAG é executado todos os dias.
    2. Executa uma tarefa, print_dag_run_conf. A tarefa imprime a configuração da execução do DAG usando o operador bash.

    Guarde uma cópia do ficheiro quickstart.py no seu computador local:

    import datetime
    
    from airflow import models
    from airflow.operators import bash
    
    # If you are running Airflow in more than one time zone
    # see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
    # for best practices
    YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
    
    default_args = {
        "owner": "Composer Example",
        "depends_on_past": False,
        "email": [""],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": datetime.timedelta(minutes=5),
        "start_date": YESTERDAY,
    }
    
    with models.DAG(
        "composer_quickstart",
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1),
    ) as dag:
        # Print the dag_run id from the Airflow logs
        print_dag_run_conf = bash.BashOperator(
            task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
        )

    Carregue o ficheiro DAG para o contentor do seu ambiente

    Todos os ambientes do Cloud Composer têm um contentor do Cloud Storage associado. O Airflow no Cloud Composer agenda apenas DAGs localizados na pasta /dags neste contentor.

    Para agendar o seu DAG, carregue o ficheiro quickstart.py a partir da sua máquina local para a pasta /dags do ambiente:

    Para carregar quickstart.py com a CLI gcloud, execute o seguinte comando na pasta onde o ficheiro quickstart.py está localizado:

    gcloud composer environments storage dags import \
    --environment example-environment --location us-central1 \
    --source quickstart.py
    

    Veja o DAG

    Depois de carregar o ficheiro DAG, o Airflow faz o seguinte:

    1. Analisa o ficheiro DAG que carregou. Pode demorar alguns minutos até que o DAG fique disponível para o Airflow.
    2. Adiciona o DAG à lista de DAGs disponíveis.
    3. Executa o DAG de acordo com a agenda que forneceu no ficheiro DAG.

    Verifique se o DAG é processado sem erros e está disponível no Airflow através da visualização na IU do DAG. A IU do DAG é a interface do Cloud Composer para ver informações do DAG na Google Cloud consola. O Cloud Composer também oferece acesso à IU do Airflow, que é uma interface Web do Airflow nativa.

    1. Aguarde cerca de cinco minutos para dar tempo ao Airflow de processar o ficheiro DAG que carregou anteriormente e concluir a primeira execução do DAG (explicada mais tarde).

    2. Execute o seguinte comando na CLI Google Cloud. Este comando executa o comando da CLI do Airflow que lista os DAGs no seu ambiente.dags list

      gcloud composer environments run example-environment \
      --location us-central1 \
      dags list
      
    3. Verifique se o composer_quickstart DAG está listado na saída do comando.

      Exemplo de resultado:

      Executing the command: [ airflow dags list ]...
      Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
      Use ctrl-c to interrupt the command
      dag_id              | filepath              | owner            | paused
      ====================+=======================+==================+=======
      airflow_monitoring  | airflow_monitoring.py | airflow          | False
      composer_quickstart | dag-quickstart-af2.py | Composer Example | False
      
      airflow_monitoring

    Veja detalhes da execução do DAG

    Uma única execução de um DAG é denominada execução de DAG. O Airflow executa imediatamente uma execução de DAG para o DAG de exemplo porque a data de início no ficheiro DAG está definida para ontem. Desta forma, o Airflow atualiza-se de acordo com a programação do DAG especificado.

    O DAG de exemplo contém uma tarefa, print_dag_run_conf, que executa o comando echo na consola. Este comando produz metainformações sobre o DAG (identificador numérico da execução do DAG).

    Execute o seguinte comando na CLI Google Cloud. Este comando lista as execuções de DAG para o DAG composer_quickstart:

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list-runs -- --dag-id composer_quickstart
    

    Exemplo de resultado:

    dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
    ====================+=============================================+=========+==================================+==================================+=================================
    composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
    

    A CLI do Airflow não fornece um comando para ver os registos de tarefas. Pode usar outros métodos para ver registos de tarefas do Airflow: IU de DAGs do Cloud Composer, IU do Airflow ou Cloud Logging. Este guia mostra uma forma de consultar o Cloud Logging para ver registos de uma execução de DAG específica.

    Execute o seguinte comando na CLI Google Cloud. Este comando lê registos do Cloud Logging para uma execução de DAG específica do DAG composer_quickstart. O argumento --format formata o resultado para que apenas o texto da mensagem de registo seja apresentado.

    gcloud logging read \
    --format="value(textPayload)" \
    --order=asc \
    "resource.type=cloud_composer_environment \
    resource.labels.location=us-central1 \
    resource.labels.environment_name=example-environment \
    labels.workflow=composer_quickstart \
    (labels.\"execution-date\"=\"RUN_ID\")"
    

    Substituir:

    • RUN_ID com o valor run_id da saída do comando tasks states-for-dag-run que executou anteriormente. Por exemplo, 2024-02-17T15:38:38.969307+00:00.

    Exemplo de resultado:

    ...
    
    Starting attempt 1 of 2
    Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
    15:38:38.969307+00:00
    Started process 22544 to run task
    
    ...
    
    Running command: ['/usr/bin/bash', '-c', 'echo 115746']
    Output:
    115746
    
    ...
    
    Command exited with return code 0
    Marking task as SUCCESS. dag_id=composer_quickstart,
    task_id=print_dag_run_conf, execution_date=20240217T153838,
    start_date=20240218T153841, end_date=20240218T153841
    Task exited with return code 0
    0 downstream tasks scheduled from follow-on schedule check
    

    Limpar

    Para evitar incorrer em custos na sua Google Cloud conta pelos recursos usados nesta página, elimine o Google Cloud projeto com os recursos.

    Elimine os recursos usados neste tutorial:

    1. Elimine o ambiente do Cloud Composer:

      1. Na Google Cloud consola, aceda à página Ambientes.

        Aceder a Ambientes

      2. Selecione example-environment e clique em Eliminar.

      3. Aguarde até que o ambiente seja eliminado.

    2. Elimine o contentor do seu ambiente. A eliminação do ambiente do Cloud Composer não elimina o respetivo contentor.

      1. Na Google Cloud consola, aceda à página Armazenamento > Navegador.

        Aceda a Armazenamento > Navegador

      2. Selecione o contentor do ambiente e clique em Eliminar. Por exemplo, este contentor pode ter o nome us-central1-example-environ-c1616fe8-bucket.

    O que se segue?