Como adicionar e atualizar DAGs (fluxos de trabalho)

Esta página descreve como determinar o bucket de armazenamento do ambiente e como adicionar, atualizar e excluir um DAG do ambiente.

O Cloud Composer usa o Cloud Storage para armazenar DAGs do Apache Airflow, também conhecidos como fluxos de trabalho. Cada ambiente tem um bucket associado do Cloud Storage. O Cloud Composer programa somente os DAGs no bucket do Cloud Storage.

Antes de começar

  • Os DAGs não são fortemente isolados pelo Apache Airflow. Portanto, recomendamos que você tenha ambientes de produção e de teste separados para evitar a interferência de DAGs. Para mais informações, consulte Como testar DAGs.
  • As permissões a seguir são necessárias para adicionar e atualizar plug-ins no bucket do Cloud Storage do ambiente do Cloud Composer:
    • storage.objectAdmin para fazer upload de arquivos.
    • composer.environments.get para pesquisar o bucket de destino do DAG. Essa permissão não é necessária ao usar a API Cloud Storage ou gsutil.
  • As alterações no DAG levam de 3 a 5 minutos para serem feitas. É possível ver o status da tarefa na interface da Web do Airflow.

Como determinar o nome do bucket de armazenamento

Para determinar o nome do bucket de armazenamento associado ao ambiente:

Console

  1. Abra a página "Ambientes" no Console do Cloud.
    Abra a página "Ambientes"
  2. Na coluna Nome, clique no nome do ambiente para abrir a página de detalhes do ambiente.

    Na guia Configuração, o nome do bucket do Cloud Storage é mostrado à direita da pasta DAGs.

  3. (Opcional) Para ver o bucket no Cloud Storage, clique no nome do bucket.

gcloud

Digite o seguinte comando:

gcloud composer environments describe ENVIRONMENT_NAME \
  --location LOCATION \
  --format="get(config.dagGcsPrefix)"

em que:

  • ENVIRONMENT_NAME é o nome do ambiente;
  • LOCATION é a região do Compute Engine em que o ambiente está localizado.
  • --format é uma opção para especificar apenas a propriedade dagGcsPrefix, em vez de todos os detalhes do ambiente.

A propriedade dagGcsPrefix mostra o nome do bucket:

gs://region-environment_name-random_id-bucket/

rest

Para mais informações sobre credenciais, consulte Autenticação do Cloud Composer.

  1. Chame o método projects.locations.environments.get.
  2. Leia config.dagGcsPrefix da resposta Ambiente.

rpc

Para mais informações sobre credenciais, consulte Autenticação do Cloud Composer.

  1. Chame o método Environments.GetEnvironment.
  2. Leia o config.dag_gcs_prefix da resposta Ambiente.

python

Use a biblioteca google-auth para acessar as credenciais e usar a biblioteca solicitações para chamar a API REST.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])
authed_session = google.auth.transport.requests.AuthorizedSession(
    credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
    '/environments/{}').format(project_id, location, composer_environment)
response = authed_session.request('GET', environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data['config']['dagGcsPrefix'])

Como adicionar ou atualizar um DAG

Para adicionar ou atualizar um DAG, mova o arquivo Python .py do DAG para a pasta dags do ambiente no Cloud Storage.

Console

  1. Abra a página "Ambientes" no Console do Cloud.
    Abra a página "Ambientes"
  2. Na coluna Nome, clique no nome do ambiente para abrir a página de detalhes do ambiente.

    Na guia Configuração, o nome do bucket do Cloud Storage é mostrado à direita da pasta DAGs.

  3. Para ver o bucket no Cloud Storage, clique no nome do bucket.

    Por padrão, a pasta dags é aberta.

  4. Clique em Fazer upload de arquivos e selecione a cópia local do DAG que você quer enviar.

  5. Para fazer o upload do arquivo para a pasta dags, clique em Abrir.

gcloud

Para adicionar ou atualizar um DAG:

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source LOCAL_FILE_TO_UPLOAD

em que:

  • ENVIRONMENT_NAME é o nome do ambiente;
  • LOCATION é a região do Compute Engine em que o ambiente está localizado;
  • LOCAL_FILE_TO_UPLOAD é o DAG para upload.

Como excluir um DAG

Como excluir um DAG no ambiente

Para excluir um DAG, remova o arquivo .py do Python relativo ao DAG da pasta dags do ambiente no Cloud Storage. A exclusão de um DAG não remove os metadados do DAG da interface da Web do Airflow.

Console

  1. Acesse a página "Ambientes" no Console do Cloud.
    Abrir a página "Ambientes"
  2. Na coluna Nome, clique no nome do ambiente para abrir a página de detalhes do ambiente.

    Na guia Configuração, o nome do bucket do Cloud Storage é mostrado à direita da pasta DAGs.

  3. Para ver o bucket no Cloud Storage, clique no nome do bucket.

    Por padrão, a pasta dags é aberta.

  4. Clique na caixa de seleção ao lado do DAG que você quer excluir.

  5. Na parte superior do Console do Cloud, clique em Excluir.

  6. Na caixa de diálogo exibida, clique em OK.

gcloud

gcloud composer environments storage dags delete 
--environment ENVIRONMENT_NAME
--location LOCATION
DAG_NAME.py

em que:

  • ENVIRONMENT_NAME é o nome do ambiente;
  • LOCATION é a região do Compute Engine em que o ambiente está localizado;
  • DAG_NAME.py é o DAG a ser excluído.
  • Airflow 1.9.0: os metadados dos DAGs excluídos permanecem visíveis na interface da Web do Airflow.

  • Airflow 1.10.0 ou posterior: é possível usar a ferramenta gcloud para remover os metadados do DAG.

Como remover um DAG da interface da Web do Airflow

Para remover os metadados de um DAG da interface da Web do Airflow, insira:

  gcloud composer environments run --location LOCATION \
  ENVIRONMENT_NAME delete_dag -- DAG_NAME

em que:

  • ENVIRONMENT_NAME é o nome do ambiente;
  • LOCATION é a região do Compute Engine em que o ambiente está localizado;
  • DAG_NAME é o nome do DAG a ser excluído.

A seguir