Acessar a API REST do Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

O Apache Airflow tem uma interface da API REST que pode ser usada para executar tarefas como ver informações sobre execuções e tarefas do DAG, atualizar DAGs, configurar o Airflow, adicionar e excluir conexões e listar usuários.

Para um exemplo de como usar a API REST do Airflow com o Cloud Run functions, consulte Como acionar DAGs com o Cloud Run functions.

Versões da API REST do Airflow

  • O Airflow 2 usa a API REST estável.
  • A API REST experimental foi interrompida pelo Airflow.

Configurar a API REST estável do Airflow

A API REST estável está ativada por padrão no Airflow 2. O Cloud Composer usa o próprio back-end de autenticação da API.

A autorização funciona da maneira padrão fornecida pelo Airflow. Quando um novo usuário autoriza por meio da API, a conta do usuário recebe o papel Op por padrão.

É possível ativar ou desativar a API REST estável ou alterar o papel de usuário padrão substituindo as seguintes opções de configuração do Airflow:

Seção Chave Valor Observações
api auth_backends airflow.composer.api.backend.composer_auth Para desativar a API REST estável, mude para airflow.api.auth.backend.deny_all.
api composer_auth_user_registration_role Op É possível especificar qualquer outro papel.

Permitir chamadas de API para a API REST do Airflow usando o controle de acesso do servidor da Web

Dependendo do método usado para chamar a API REST do Airflow, o método de autor da chamada pode usar um endereço IPv4 ou IPv6. Desbloqueie o tráfego de IP para a API REST do Airflow usando o controle de acesso ao servidor da Web.

Use a opção de configuração padrão, que é All IP addresses have access (default), se você não tiver certeza de qual endereço IP as chamadas para a API REST do Airflow serão enviadas.

Fazer chamadas para a API REST do Airflow

Nesta seção, fornecemos um exemplo de script Python que pode ser usado para acionar DAGs com a API REST estável do Airflow.

Coloque o conteúdo do exemplo a seguir em um arquivo chamado composer2_airflow_rest_api.py e defina as seguintes variáveis:

  • dag_id: nome de um DAG, conforme definido no arquivo de origem do DAG.
  • dag_config: configuração para a execução do DAG.
  • web_server_url: o URL do servidor da Web do Airflow. O formato é https://<web-server-id>.composer.googleusercontent.com.

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests


# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])


def make_composer2_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)


def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text




if __name__ == "__main__":
    # TODO(developer): replace with your values
    dag_id = "your-dag-id"  # Replace with the ID of the DAG that you want to run.
    dag_config = {
        "your-key": "your-value"
    }  # Replace with configuration parameters for the DAG run.
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )

    response_text = trigger_dag(
        web_server_url=web_server_url, dag_id=dag_id, data=dag_config
    )

    print(response_text)

Acessar a API REST do Airflow usando uma conta de serviço

O banco de dados do Airflow limita o comprimento do campo de e-mail a 64 caracteres. sometimess vezes, as contas de serviço têm endereços de e-mail com mais de 64 caracteres. No momento, não é possível criar usuários do Airflow para essas contas de serviço. Se não houver um usuário do Airflow para essa conta de serviço, o acesso à API REST do Airflow resultará em erros HTTP 401 e 403.

Como solução alternativa, você pode fazer o pré-registro de um usuário do Airflow para uma conta de serviço. Para fazer isso, use accounts.google.com:NUMERIC_USER_ID como o nome de usuário e qualquer string exclusiva como o e-mail.

  1. Para acessar NUMERIC_USER_ID em uma conta de serviço, execute:

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    Substitua:

    • SA_NAME pelo nome da conta de serviço.
    • PROJECT_ID pelo ID do projeto;
  2. Crie um usuário do Airflow com o papel Op para a conta de serviço:

    IU do Airflow

    1. Acesse a IU do Airflow.

    2. Acesse Security > List Users e clique em Add a new record. O usuário do Airflow precisa ter o papel Admin para abrir essa página.

    3. Especifique accounts.google.com:NUMERIC_USER_ID como o nome do usuário. Substitua NUMERIC_USER_ID pelo ID do usuário da etapa anterior.

    4. Especifique um identificador exclusivo como o e-mail. Você pode usar qualquer string exclusiva.

    5. Especifique o papel do usuário. Por exemplo, Op

    6. Verifique se a caixa de seleção Is Active? está marcada.

    7. Especifique o nome e o sobrenome do usuário. Você pode usar qualquer string.

    8. Clique em Save.

    gcloud

    Execute o seguinte comando da CLI do Airflow:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    Substitua:

    • ENVIRONMENT_NAME pelo nome do ambiente
    • LOCATION pela região em que o ambiente está localizado.
    • NUMERIC_USER_ID pelo ID do usuário da etapa anterior.
    • UNIQUE_ID pelo identificador do usuário do Airflow. Você pode usar qualquer string exclusiva.
  3. Depois de criar um usuário do Airflow para uma conta de serviço, um autor da chamada autenticado como a conta de serviço é reconhecido como um usuário pré-registrado e conectado ao Airflow.

Como dimensionar o componente da API REST do Airflow

A API REST e os endpoints da interface do Airflow são executados no servidor da Web do Airflow. Se você usa a API REST de forma intensiva, considere aumentar a quantidade de CPU e memória disponível para o servidor da Web do Airflow com base na carga esperada.

A seguir