Acessar a API REST do Airflow

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

O Apache Airflow tem uma interface da API REST que pode ser usada para executar tarefas como ver informações sobre execuções e tarefas do DAG, atualizar DAGs, configurar o Airflow, adicionar e excluir conexões e listar usuários.

Para um exemplo de como usar a API REST do Airflow com funções do Cloud Run, consulte Como acionar DAGs com o Cloud Functions.

Versões da API REST do Airflow

As seguintes versões da API REST do Airflow estão disponíveis no Cloud Composer 2:

  • O Airflow 2 usa a API REST estável. A API REST experimental foi interrompida pelo Airflow.

  • Ainda será possível usar a API REST experimental no Airflow 2 se você ativá-la por meio de uma substituição da configuração do Airflow, como descrito em mais detalhes.

Antes de começar

Enable the Cloud Composer API.

Enable the API

Ativar a API REST estável do Airflow

A API REST estável já está ativada por padrão no Airflow 2.

O Cloud Composer usa o próprio back-end de autenticação da API.

A autorização funciona da maneira padrão fornecida pelo Airflow. Quando um novo usuário autoriza por meio da API, a conta do usuário recebe o papel Op por padrão.

É possível ativar ou desativar a API REST estável ou alterar o papel de usuário padrão substituindo as seguintes opções de configuração do Airflow:

Seção Chave Valor Observações
api (Airflow 2.2.5 e anteriores) auth_backend
(Airflow 2.3.0 e posterior) auth_backends
airflow.composer.api.backend.composer_auth Para desativar a API REST estável, mude para airflow.api.auth.backend.deny_all
api composer_auth_user_registration_role Op É possível especificar qualquer outro papel.

Ativar a API REST experimental do Airflow

Por padrão, o recurso de autenticação da API está desativado na API experimental. O servidor da Web do Airflow nega todas as solicitações feitas.

Para ativar o recurso de autenticação da API e a API experimental do Airflow 2, substitua a seguinte opção de configuração do Airflow:

Seção Chave Valor Observações
api (Airflow 2.2.5 e versões anteriores) auth_backend
(Airflow 2.3.0 e versões mais recentes) auth_backends
airflow.api.auth.backend.default O padrão é airflow.composer.api.backend.composer_auth.
api enable_experimental_api True O padrão é False.

Depois de definir essa opção de configuração como airflow.api.auth.backend.default, o servidor da Web do Airflow aceita todas as sem autenticação. Mesmo que o próprio servidor da Web do Airflow não exija autenticação, ele ainda está protegido pelo Identity-Aware Proxy, que fornece a própria camada de autenticação.

Permitir chamadas de API para a API REST do Airflow usando o controle de acesso do servidor da Web

Dependendo do método usado para chamar a API REST do Airflow, o método de autor da chamada pode usar um endereço IPv4 ou IPv6. Lembre-se de desbloquear Tráfego IP para a API REST do Airflow usando o controle de acesso do servidor da Web.

Use a opção de configuração padrão, que é All IP addresses have access (default), se você não tiver certeza de quais endereços IP suas chamadas para a API REST do Airflow serão enviadas.

Fazer chamadas para a API REST do Airflow

Nesta seção, fornecemos um exemplo de script Python que pode ser usado para acionar DAGs com a API REST estável do Airflow.

Coloque o conteúdo do exemplo a seguir em um arquivo chamado composer2_airflow_rest_api.py e forneça o URL da IU do Airflow, o nome do DAG e a configuração de execução do DAG nos valores de variáveis.

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests


# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])


def make_composer2_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)


def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text




if __name__ == "__main__":
    # TODO(developer): replace with your values
    dag_id = "your-dag-id"  # Replace with the ID of the DAG that you want to run.
    dag_config = {
        "your-key": "your-value"
    }  # Replace with configuration parameters for the DAG run.
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )

    response_text = trigger_dag(
        web_server_url=web_server_url, dag_id=dag_id, data=dag_config
    )

    print(response_text)

Por exemplo, a configuração abaixo não está correta

  web_server_url = (
    "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com/"
  )

Acessar a API REST do Airflow usando uma conta de serviço

O banco de dados do Airflow limita o comprimento do campo de e-mail a 64 caracteres. sometimess vezes, as contas de serviço têm endereços de e-mail com mais de 64 caracteres. No momento, não é possível criar usuários do Airflow para essas contas de serviço. Se não houver um usuário do Airflow para esse serviço conta, acessar a API REST do Airflow resulta em erros HTTP 401 e 403

Como solução alternativa, você pode fazer o pré-registro de um usuário do Airflow para uma conta de serviço. Para fazer isso, use accounts.google.com:NUMERIC_USER_ID como o nome de usuário e qualquer string exclusiva como o e-mail.

  1. Para acessar NUMERIC_USER_ID em uma conta de serviço, execute:

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    Substitua:

    • SA_NAME pelo nome da conta de serviço.
    • PROJECT_ID pelo ID do projeto.
  2. Crie um usuário do Airflow com o papel Op para a conta de serviço:

    IU do Airflow

    1. Acesse a IU do Airflow.

    2. Acesse Segurança > Listar usuários e clique em Adicionar um novo registro. Seu usuário do Airflow precisa ter o papel Admin para abrir esta página.

    3. Especifique accounts.google.com:NUMERIC_USER_ID como o nome do usuário. Substitua NUMERIC_USER_ID pelo ID do usuário da etapa anterior.

    4. Especifique um identificador exclusivo como o e-mail. Você pode usar qualquer string exclusiva.

    5. Especifique o papel do usuário. Por exemplo, Op

    6. Verifique se a caixa de seleção Está ativo? está marcada.

    7. Especifique o nome e o sobrenome do usuário. Você pode usar qualquer string.

    8. Clique em Save.

    gcloud

    Execute o seguinte comando da CLI do Airflow:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    Substitua:

    • ENVIRONMENT_NAME pelo nome do ambiente
    • LOCATION pela região em que o ambiente está localizado.
    • NUMERIC_USER_ID pelo ID do usuário da etapa anterior.
    • UNIQUE_ID pelo identificador do usuário do Airflow. Você pode usar qualquer string exclusiva.
  3. Depois de criar um usuário do Airflow para uma conta de serviço, um autor da chamada autenticado como a conta de serviço é reconhecido como um usuário pré-registrado e conectado ao Airflow.

Como escalonar o componente da API REST do Airflow

Os endpoints da API REST e da interface do Airflow do Airflow são executados no componente. por exemplo, o servidor da Web Airflow. Se você usa a API REST de forma intensiva, aumente os parâmetros de CPU e memória para ajustar os recursos do servidor da Web do Airflow à carga esperada.

A seguir