Aceda à API REST do Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

O Apache Airflow tem uma interface de API REST que pode usar para realizar tarefas como obter informações sobre execuções e tarefas de DAGs, atualizar DAGs, obter a configuração do Airflow, adicionar e eliminar associações e listar utilizadores.

Para ver um exemplo de utilização da API REST do Airflow com funções do Cloud Run, consulte o artigo Acionar DAGs com funções do Cloud Run.

Versões da API REST do Airflow

  • O Airflow 2 usa a API REST estável.
  • A API REST experimental foi descontinuada pelo Airflow.

Configure a API REST do Airflow estável

A API REST estável está ativada por predefinição no Airflow 2. O Cloud Composer usa o seu próprio backend de autenticação de APIs.

A autorização funciona da forma padrão fornecida pelo Airflow. Quando um novo utilizador autoriza através da API, a conta do utilizador recebe a função Op por predefinição.

Pode ativar ou desativar a API REST estável ou alterar a função do utilizador predefinida substituindo as seguintes opções de configuração do Airflow:

Secção Chave Valor Notas
api auth_backends airflow.composer.api.backend.composer_auth Para desativar a API REST estável, altere para airflow.api.auth.backend.deny_all
api composer_auth_user_registration_role Op Pode especificar qualquer outra função.

Permitir chamadas à API REST do Airflow através do controlo de acesso do servidor Web

Consoante o método usado para chamar a API REST do Airflow, o método de chamada pode usar um endereço IPv4 ou IPv6. Lembre-se de desbloquear o tráfego de IP para a API REST do Airflow através do controlo de acesso do servidor Web.

Use a opção de configuração predefinida, que é All IP addresses have access (default), se não souber a partir de que endereços IP as suas chamadas para a API REST do Airflow vão ser enviadas.

Faça chamadas para a API REST do Airflow

Esta secção fornece um exemplo de script Python que pode usar para acionar DAGs com a API REST estável do Airflow.

Coloque o conteúdo do exemplo seguinte num ficheiro com o nome composer2_airflow_rest_api.py e, em seguida, defina as seguintes variáveis:

  • dag_id: nome de um DAG, conforme definido no ficheiro de origem do DAG.
  • dag_config: configuração da execução do DAG.
  • web_server_url: o URL do servidor Web do Airflow. O formato é https://<web-server-id>.composer.googleusercontent.com.

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests


# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])


def make_composer2_web_server_request(
    url: str, method: str = "GET">;, **kwargs: Any
) - google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout">] = 90

    return authed_session.request(method, url, **kwargs)


def trigger_dag(web_server_url: str, dag_id: str, data: dict) - str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text




if __name__ == "__main__":
    # TODO(developer): replace with your values
    dag_id = "your-dag-id"  # Replace with the ID of the DAG that you want to run.
    dag_config = {
        "your-key": "your-value"
    }  # Replace with configuration parameters for the DAG run.
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )

    response_text = trigger_dag(
        web_server_url=web_server_url, dag_id=dag_id, data=dag_config
    )

    print(response_text)

Aceda à API REST do Airflow através de uma conta de serviço

A base de dados do Airflow nas versões do Airflow anteriores à 2.3.0 limita o comprimento do campo de email a 64 carateres. Por vezes, as contas de serviço têm endereços de email com mais de 64 carateres. Não é possível criar utilizadores do Airflow para essas contas de serviço da forma habitual. Se não existir um utilizador do Airflow para essa conta de serviço, o acesso à API REST do Airflow resulta em erros HTTP 401 e 403.

Como solução alternativa, pode pré-registar um utilizador do Airflow para uma conta de serviço. Para isso, use accounts.google.com:NUMERIC_USER_ID como nome de utilizador e qualquer string única como email.

  1. Para obter o NUMERIC_USER_ID para uma conta de serviço, execute o seguinte comando:

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    Substituir:

    • SA_NAME com o nome da conta de serviço.
    • PROJECT_ID com o ID do projeto.
  2. Crie um utilizador do Airflow com a função Op para a conta de serviço:

    IU do Airflow

    1. Aceda à IU do Airflow.

    2. Aceda a Segurança > Listar utilizadores e clique em Adicionar novo registo. O utilizador do Airflow tem de ter a função Admin para abrir esta página.

    3. Especifique accounts.google.com:NUMERIC_USER_ID como nome de utilizador. Substitua NUMERIC_USER_ID pelo ID de utilizador obtido no passo anterior.

    4. Especifique um identificador exclusivo como o email. Pode usar qualquer string única.

    5. Especifique a função do utilizador. Por exemplo, Op.

    6. Certifique-se de que a caixa de verificação Está ativo? está selecionada.

    7. Especifique o nome próprio e o apelido do utilizador. Pode usar qualquer string.

    8. Clique em Guardar.

    gcloud

    Execute o seguinte comando da CLI do Airflow:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    Substituir:

    • ENVIRONMENT_NAME com o nome do ambiente.
    • LOCATION com a região onde o ambiente está localizado.
    • NUMERIC_USER_ID com o ID do utilizador obtido no passo anterior.
    • UNIQUE_ID com o identificador do utilizador do Airflow. Pode usar qualquer string única.
  3. Depois de criar um utilizador do Airflow para uma conta de serviço, um autor da chamada autenticado como a conta de serviço é reconhecido como um utilizador pré-registado e tem sessão iniciada no Airflow.

Dimensionar o componente da API REST do Airflow

A API REST Airflow e os pontos finais da IU do Airflow são executados no servidor Web do Airflow. Se usar a API REST de forma intensiva, pondere aumentar a quantidade de CPU e memória disponíveis para o servidor Web do Airflow, com base na carga esperada.

O que se segue?