Como adicionar e atualizar DAGs (fluxos de trabalho)

Esta página descreve como determinar o intervalo de armazenamento para o ambiente e como adicionar, atualizar e excluir um DAG do ambiente.

O Cloud Composer usa o Cloud Storage para armazenar DAGs do Apache Airflow, também conhecidos como fluxos de trabalho. Cada ambiente tem um intervalo associado do Cloud Storage. O Cloud Composer programa somente os DAGs no intervalo do Cloud Storage.

Antes de começar

  • Os DAGs não são fortemente isolados pelo Apache Airflow. Portanto, recomendamos que você tenha ambientes de produção e teste separados para evitar a interferência de DAGs. Para mais informações, consulte Como testar DAGs.
  • As permissões a seguir são necessárias para adicionar e atualizar plug-ins no intervalo do Cloud Storage do ambiente do Cloud Composer:
    • storage.objectAdmin para fazer upload de arquivos.
    • composer.environments.get para pesquisar o intervalo de destino do DAG. Essa permissão não é necessária ao usar a API Cloud Storage ou gsutil.
  • As alterações no DAG levam de 3 a 5 minutos para serem feitas. É possível ver o status da tarefa na interface da Web do Airflow.

Como determinar o nome do intervalo de armazenamento

Para determinar o nome do intervalo de armazenamento associado ao ambiente:

Console

  1. Abra a página "Ambientes" no Console do Cloud.
    Abra a página "Ambientes"
  2. Na coluna Nome, clique no nome do ambiente para abrir a página de detalhes do ambiente.

    Na guia Configuração, o nome do intervalo do Cloud Storage é mostrado à direita da pasta DAGs.

  3. (Opcional) Para ver o intervalo no Cloud Storage, clique no nome do intervalo.

gcloud

Digite o seguinte comando:

    gcloud composer environments describe ENVIRONMENT_NAME \
      --location LOCATION \
      --format="get(config.dagGcsPrefix)"
    

em que:

  • ENVIRONMENT_NAME é o nome do ambiente;
  • LOCATION é a região do Compute Engine em que o ambiente está localizado;
  • --format é uma opção para especificar apenas a propriedade dagGcsPrefix, em vez de todos os detalhes do ambiente.

A propriedade dagGcsPrefix mostra o nome do intervalo:

gs://region-environment_name-random_id-bucket/
    

rest

Para mais informações sobre credenciais, consulte Autenticação do Cloud Composer.

  1. Chame o método projects.locations.environments.get.
  2. Leia config.dagGcsPrefix na resposta Ambiente.

rpc

Para mais informações sobre credenciais, consulte Autenticação do Cloud Composer.

  1. Chame o método Environments.GetEnvironment.
  2. Leia o config.dag_gcs_prefix na resposta Ambiente.

python

Use a biblioteca google-auth para acessar as credenciais e usar a biblioteca requests para chamar a API REST.

import google.auth
    import google.auth.transport.requests

    # Authenticate with Google Cloud.
    # See: https://cloud.google.com/docs/authentication/getting-started
    credentials, _ = google.auth.default(
        scopes=['https://www.googleapis.com/auth/cloud-platform'])
    authed_session = google.auth.transport.requests.AuthorizedSession(
        credentials)

    # project_id = 'YOUR_PROJECT_ID'
    # location = 'us-central1'
    # composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

    environment_url = (
        'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
        '/environments/{}').format(project_id, location, composer_environment)
    response = authed_session.request('GET', environment_url)
    environment_data = response.json()

    # Print the bucket name from the response body.
    print(environment_data['config']['dagGcsPrefix'])

Como adicionar ou atualizar um DAG

Para adicionar ou atualizar um DAG, mova o arquivo .py Python para o DAG para a pasta dags do ambiente no Cloud Storage.

Console

  1. Abra a página "Ambientes" no Console do Cloud.
    Abra a página "Ambientes"
  2. Na coluna Nome, clique no nome do ambiente para abrir a página de detalhes do ambiente.

    Na guia Configuração, o nome do intervalo do Cloud Storage é mostrado à direita da pasta DAGs.

  3. Para ver o intervalo no Cloud Storage, clique no nome do intervalo.

    Por padrão, a pasta dags é aberta.

  4. Clique em Fazer upload de arquivos e selecione a cópia local do DAG que você quer enviar.

  5. Para fazer o upload do arquivo para a pasta dags, clique em Abrir.

gcloud

Para adicionar ou atualizar um DAG:

    gcloud composer environments storage dags import \
        --environment ENVIRONMENT_NAME \
        --location LOCATION \
        --source LOCAL_FILE_TO_UPLOAD
    

em que:

  • ENVIRONMENT_NAME é o nome do ambiente;
  • LOCATION é a região do Compute Engine em que o ambiente está localizado;
  • LOCAL_FILE_TO_UPLOAD é o DAG para upload.

Como excluir um DAG

Como excluir um DAG no ambiente

Para excluir um DAG, remova o arquivo .py do Python relativo ao DAG da pasta dags do ambiente no Cloud Storage. A exclusão de um DAG não remove os metadados do DAG da interface da Web do Airflow.

Console

  1. Acesse a página "Ambientes" no Console do Cloud.
    Abra a página "Ambientes"
  2. Na coluna Nome, clique no nome do ambiente para abrir a página de detalhes do ambiente.

    Na guia Configuração, o nome do intervalo do Cloud Storage é mostrado à direita da pasta DAGs.

  3. Para ver o intervalo no Cloud Storage, clique no nome do intervalo.

    Por padrão, a pasta dags é aberta.

  4. Clique na caixa de seleção ao lado do DAG que você quer excluir.

  5. Na parte superior do Console do Cloud, clique em Excluir.

  6. Na caixa de diálogo exibida, clique em OK.

gcloud

    gcloud composer environments storage dags delete 
--environment ENVIRONMENT_NAME
--location LOCATION
DAG_NAME.py

em que:

  • ENVIRONMENT_NAME é o nome do ambiente;
  • LOCATION é a região do Compute Engine em que o ambiente está localizado;
  • DAG_NAME.py é o DAG a ser excluído.
  • Airflow 1.9.0: os metadados dos DAGs excluídos permanecem visíveis na interface da Web do Airflow.

  • Airflow 1.10.0 ou posterior: é possível usar a ferramenta gcloud para remover os metadados do DAG.

Como remover um DAG da interface da Web do Airflow

Para remover os metadados de um DAG da interface da Web do Airflow, insira:

      gcloud composer environments run --location LOCATION \
      ENVIRONMENT_NAME delete_dag -- DAG_NAME
    

em que:

  • ENVIRONMENT_NAME é o nome do ambiente;
  • LOCATION é a região do Compute Engine em que o ambiente está localizado;
  • DAG_NAME é o nome do DAG a ser excluído.

A seguir