Acessar a API REST do Airflow

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

O Apache Airflow tem uma interface da API REST que pode ser usada para executar tarefas como ver informações sobre execuções e tarefas do DAG, atualizar DAGs, configurar o Airflow, adicionar e excluir conexões e listar usuários.

Para um exemplo de como usar a API REST do Airflow com funções do Cloud Run, consulte Como acionar DAGs com o Cloud Functions.

Versões da API REST do Airflow

As seguintes versões da API REST do Airflow estão disponíveis no Cloud Composer 2:

  • O Airflow 2 usa a API REST estável. A API REST experimental foi interrompida pelo Airflow.

  • Ainda será possível usar a API REST experimental no Airflow 2 se você ativá-la por meio de uma substituição da configuração do Airflow, como descrito em mais detalhes.

Antes de começar

Enable the Cloud Composer API.

Enable the API

Ativar a API REST estável do Airflow

A API REST estável já está ativada por padrão no Airflow 2.

O Cloud Composer usa o próprio back-end de autenticação da API.

A autorização funciona da maneira padrão fornecida pelo Airflow. Quando um novo usuário autoriza por meio da API, a conta do usuário recebe o papel Op por padrão.

É possível ativar ou desativar a API REST estável ou alterar o papel de usuário padrão substituindo as seguintes opções de configuração do Airflow:

Seção Chave Valor Observações
api (Airflow 2.2.5 e versões anteriores) auth_backend
(Airflow 2.3.0 e versões mais recentes) auth_backends
airflow.composer.api.backend.composer_auth Para desativar a API REST estável, mude para airflow.api.auth.backend.deny_all
api composer_auth_user_registration_role Op É possível especificar qualquer outro papel.

Ativar a API REST experimental do Airflow

Por padrão, o recurso de autenticação da API está desativado na API experimental. O servidor da Web do Airflow nega todas as solicitações feitas.

Para ativar o recurso de autenticação da API e a API experimental do Airflow 2, substitua a seguinte opção de configuração do Airflow:

Seção Chave Valor Observações
api (Airflow 2.2.5 e versões anteriores) auth_backend
(Airflow 2.3.0 e versões mais recentes) auth_backends
airflow.api.auth.backend.default O padrão é airflow.composer.api.backend.composer_auth.
api enable_experimental_api True O padrão é False.

Depois de definir essa opção de configuração como airflow.api.auth.backend.default, o servidor da Web do Airflow aceita todas as solicitações de API sem autenticação. Mesmo que o próprio servidor da Web do Airflow não exija autenticação, ele ainda está protegido pelo Identity-Aware Proxy, que fornece a própria camada de autenticação.

Permitir chamadas de API para a API REST do Airflow usando o controle de acesso do servidor da Web

Dependendo do método usado para chamar a API REST do Airflow, o método do autor da chamada podem estar usando o endereço IPv4 ou IPv6. Desbloqueie o tráfego de IP para a API REST do Airflow usando o Controle de acesso do servidor da Web.

Use a opção de configuração padrão, que é All IP addresses have access (default), se você não tiver certeza de quais endereços IP suas chamadas para a API REST do Airflow serão enviadas.

Fazer chamadas para a API REST do Airflow

Nesta seção, fornecemos um exemplo de script Python que pode ser usado para acionar DAGs com a API REST estável do Airflow.

Coloque o conteúdo do exemplo a seguir em um arquivo chamado composer2_airflow_rest_api.py e forneça o URL da IU do Airflow, o nome do DAG e a configuração de execução do DAG nos valores de variáveis.

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests


# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])


def make_composer2_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)


def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text




if __name__ == "__main__":
    # TODO(developer): replace with your values
    dag_id = "your-dag-id"  # Replace with the ID of the DAG that you want to run.
    dag_config = {
        "your-key": "your-value"
    }  # Replace with configuration parameters for the DAG run.
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )

    response_text = trigger_dag(
        web_server_url=web_server_url, dag_id=dag_id, data=dag_config
    )

    print(response_text)

Por exemplo, a configuração abaixo não está correta

  web_server_url = (
    "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com/"
  )

Acessar a API REST do Airflow usando uma conta de serviço

O banco de dados do Airflow limita o comprimento do campo de e-mail a 64 caracteres. sometimess vezes, as contas de serviço têm endereços de e-mail com mais de 64 caracteres. No momento, não é possível criar usuários do Airflow para essas contas de serviço. Se não houver um usuário do Airflow para essa conta de serviço, o acesso à API REST do Airflow resultará em erros HTTP 401 e 403.

Como solução alternativa, você pode fazer o pré-registro de um usuário do Airflow para uma conta de serviço. Para fazer isso, use accounts.google.com:NUMERIC_USER_ID como o nome de usuário e qualquer string exclusiva como o e-mail.

  1. Para acessar NUMERIC_USER_ID em uma conta de serviço, execute:

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    Substitua:

    • SA_NAME pelo nome da conta de serviço.
    • PROJECT_ID pelo ID do projeto.
  2. Crie um usuário do Airflow com o papel Op para a conta de serviço:

    IU do Airflow

    1. Acesse a IU do Airflow.

    2. Acesse Segurança > Listar usuários e clique em Adicionar um novo registro. Seu usuário do Airflow precisa ter o papel Admin para abrir esta página.

    3. Especifique accounts.google.com:NUMERIC_USER_ID como o nome do usuário. Substitua NUMERIC_USER_ID pelo ID do usuário da etapa anterior.

    4. Especifique um identificador exclusivo como o e-mail. Você pode usar qualquer string exclusiva.

    5. Especifique o papel do usuário. Por exemplo, Op

    6. Verifique se a caixa de seleção Is Active? está marcada.

    7. Especifique o nome e o sobrenome do usuário. Você pode usar qualquer string.

    8. Clique em Save.

    gcloud

    Execute o seguinte comando da CLI do Airflow:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    Substitua:

    • ENVIRONMENT_NAME pelo nome do ambiente
    • LOCATION pela região em que o ambiente está localizado.
    • NUMERIC_USER_ID pelo ID do usuário da etapa anterior.
    • UNIQUE_ID pelo identificador do usuário do Airflow. Você pode usar qualquer string exclusiva.
  3. Depois de criar um usuário do Airflow para uma conta de serviço, o autor da chamada autenticado porque a conta de serviço é reconhecida como um usuário pré-registrado, e fez login no Airflow.

Como escalonar o componente da API REST do Airflow

Os endpoints da API REST e da interface do Airflow do Airflow são executados no componente. por exemplo, o servidor da Web Airflow. Se você usa a API REST de forma intensiva, considere aumentar os parâmetros de CPU e memória para ajustar os recursos do servidor da Web do Airflow à carga esperada.

A seguir