Acessar a API REST do Airflow

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

O Apache Airflow tem uma interface da API REST que pode ser usada para executar tarefas como ver informações sobre execuções e tarefas do DAG, atualizar DAGs, configurar o Airflow, adicionar e excluir conexões e listar usuários.

Para um exemplo de como usar a API REST do Airflow com o Cloud Functions, consulte Como acionar DAGs com o Cloud Functions.

Versões da API REST do Airflow

As seguintes versões da API REST do Airflow estão disponíveis no Cloud Composer 1:

  • O Airflow 2 usa a API REST estável. A API REST experimental foi interrompida pelo Airflow.

  • Ainda será possível usar a API REST experimental no Airflow 2 se você ativá-la por meio de uma substituição da configuração do Airflow, como descrito em mais detalhes.

Antes de começar

Ative a API Cloud Composer.

Ative a API

Ativar a API REST estável do Airflow

Airflow 2

A API REST estável já está ativada por padrão no Airflow 2.

O Cloud Composer usa o próprio back-end de autenticação da API, que é integrado ao Identity-Aware Proxy.

A autorização funciona da maneira padrão fornecida pelo Airflow. Quando um novo usuário autoriza por meio da API, a conta do usuário recebe o papel Op por padrão.

É possível ativar ou desativar a API REST estável ou alterar o papel de usuário padrão substituindo as seguintes opções de configuração do Airflow:

Seção Chave Valor Observações
api (Airflow 2.2.5 e anteriores) auth_backend
(Airflow 2.3.0 e posterior) auth_backends
airflow.composer.api.backend.composer_auth Para desativar a API REST estável, mude para airflow.api.auth.backend.deny_all
api composer_auth_user_registration_role Op É possível especificar qualquer outro papel.

Airflow 1

A API REST estável não está disponível no Airflow 1. Em vez disso, use a API REST experimental.

Ativar a API REST experimental do Airflow

Airflow 2

Por padrão, o recurso de autenticação da API está desativado na API experimental. O servidor da Web do Airflow nega todas as solicitações feitas.

Para ativar o recurso de autenticação da API e a API experimental do Airflow 2, substitua a seguinte opção de configuração do Airflow:

Seção Chave Valor Observações
api (Airflow 2.2.5 e anteriores) auth_backend
(Airflow 2.3.0 e posterior) auth_backends
airflow.api.auth.backend.default O padrão é airflow.composer.api.backend.composer_auth.
api enable_experimental_api True O padrão é False.

Airflow 1

Por padrão, o recurso de autenticação da API está desativado no Airflow 1.10.11 e versões posteriores. O servidor da Web do Airflow nega todas as solicitações feitas. Como você usa as solicitações para acionar DAGs, ative esse recurso.

Para ativar o recurso de autenticação da API no Airflow 1, substitua a seguinte opção de configuração do Airflow:

Seção Chave Valor Observações
api auth_backend airflow.api.auth.backend.default O padrão é airflow.api.auth.backend.deny_all

Depois de definir essa opção de configuração como airflow.api.auth.backend.default, o servidor da Web do Airflow aceita todas as sem autenticação. Mesmo que o próprio servidor da Web do Airflow não exija autenticação, ele ainda está protegido pelo Identity-Aware Proxy, que fornece a própria camada de autenticação.

Permitir chamadas de API para a API REST do Airflow usando o controle de acesso do servidor da Web

Dependendo do método usado para chamar a API REST do Airflow, o método do autor da chamada podem estar usando o endereço IPv4 ou IPv6. Lembre-se de desbloquear Tráfego IP para a API REST do Airflow usando o controle de acesso do servidor da Web.

Use a opção de configuração padrão, que é All IP addresses have access (default). se você não souber de quais endereços IP vai chamar a API REST do Airflow será enviado.

Fazer chamadas para a API REST do Airflow

Acessar o client_id do proxy do IAM

Para fazer uma solicitação ao endpoint da API REST do Airflow, a função exige o ID do cliente do proxy do IAM que protege o servidor da Web do Airflow.

O Cloud Composer não fornece essas informações diretamente. Em vez disso, faça uma solicitação não autenticada no servidor da Web do Airflow e capture o ID do cliente do URL de redirecionamento:

cURL

curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"

Substitua AIRFLOW_URL pelo URL da interface da Web do Airflow.

Na saída, procure a string depois de client_id. Exemplo:

client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com

Python

Salve o código a seguir em um arquivo chamado get_client_id.py. Preencha os valores de project_id, location e composer_environment. Em seguida, execute o código no Cloud Shell ou no ambiente local.

# This script is intended to be used with Composer 1 environments
# In Composer 2, the Airflow Webserver is not in the tenant project
# so there is no tenant client ID
# See https://cloud.google.com/composer/docs/composer-2/environment-architecture
# for more details
import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
composer_response = authed_session.request("GET", environment_url)
environment_data = composer_response.json()
composer_version = environment_data["config"]["softwareConfig"]["imageVersion"]
if "composer-1" not in composer_version:
    version_error = (
        "This script is intended to be used with Composer 1 environments. "
        "In Composer 2, the Airflow Webserver is not in the tenant project, "
        "so there is no tenant client ID. "
        "See https://cloud.google.com/composer/docs/composer-2/environment-architecture for more details."
    )
    raise (RuntimeError(version_error))
airflow_uri = environment_data["config"]["airflowUri"]

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers["location"]

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string["client_id"][0])

Chamar a API REST do Airflow usando client_id

Faça as seguintes substituições:

  • Substitua o valor da variável client_id pelo valor client_id recebido na etapa anterior.
  • Substitua o valor da variável webserver_id pelo ID do projeto de locatário, que faz parte do URL da interface da Web do Airflow antes de .appspot.com. Você recebeu o URL da interface da Web do Airflow em uma etapa anterior.
  • Especifique a versão da API REST do Airflow usada:

    • Se você usa a API REST estável do Airflow, defina a variável USE_EXPERIMENTAL_API como False.
    • Se você usa a API REST experimental do Airflow, não é necessário fazer alterações. A variável USE_EXPERIMENTAL_API já está definida como True.

from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests


IAM_SCOPE = "https://www.googleapis.com/auth/iam"
OAUTH_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"
# If you are using the stable API, set this value to False
# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
USE_EXPERIMENTAL_API = True


def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.

    This function is currently only compatible with Composer v1 environments.
    """

    # Fill in with your Composer info here
    # Navigate to your webserver's login page and get this from the URL
    # Or use the script found at
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/composer/rest/get_client_id.py
    client_id = "YOUR-CLIENT-ID"
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = "YOUR-TENANT-PROJECT"
    # The name of the DAG you wish to trigger
    dag_name = "composer_sample_trigger_response_dag"

    if USE_EXPERIMENTAL_API:
        endpoint = f"api/experimental/dags/{dag_name}/dag_runs"
        json_data = {"conf": data, "replace_microseconds": "false"}
    else:
        endpoint = f"api/v1/dags/{dag_name}/dagRuns"
        json_data = {"conf": data}
    webserver_url = "https://" + webserver_id + ".appspot.com/" + endpoint
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(webserver_url, client_id, method="POST", json=json_data)


# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method="GET", **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service
    # account.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method,
        url,
        headers={"Authorization": "Bearer {}".format(google_open_id_connect_token)},
        **kwargs,
    )
    if resp.status_code == 403:
        raise Exception(
            "Service account does not have permission to "
            "access the IAP-protected application."
        )
    elif resp.status_code != 200:
        raise Exception(
            "Bad response from application: {!r} / {!r} / {!r}".format(
                resp.status_code, resp.headers, resp.text
            )
        )
    else:
        return resp.text


# END COPIED IAP CODE

Acessar a API REST do Airflow usando uma conta de serviço

O banco de dados do Airflow limita o comprimento do campo de e-mail a 64 caracteres. sometimess vezes, as contas de serviço têm endereços de e-mail com mais de 64 caracteres. No momento, não é possível criar usuários do Airflow para essas contas de serviço. Se não houver um usuário do Airflow para esse serviço conta, acessar a API REST do Airflow resulta em erros HTTP 401 e 403

Como solução alternativa, você pode fazer o pré-registro de um usuário do Airflow para uma conta de serviço. Para fazer isso, use accounts.google.com:NUMERIC_USER_ID como o nome de usuário e qualquer string exclusiva como o e-mail.

  1. Para acessar NUMERIC_USER_ID em uma conta de serviço, execute:

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    Substitua:

    • SA_NAME pelo nome da conta de serviço.
    • PROJECT_ID pelo ID do projeto.
  2. Crie um usuário do Airflow com o papel Op para a conta de serviço:

    IU do Airflow

    1. Acesse a IU do Airflow.

    2. Acesse Administrador > Usuários e clique em Criar. Seu O usuário do Airflow precisa ter o papel Admin para abrir esta página.

    3. Especifique accounts.google.com:NUMERIC_USER_ID como o nome do usuário. Substitua NUMERIC_USER_ID pelo ID do usuário da etapa anterior.

    4. Especifique um identificador exclusivo como o e-mail. Você pode usar qualquer string exclusiva.

    5. Especifique o papel do usuário. Por exemplo, Op

    6. Verifique se a caixa de seleção Está ativo? está marcada.

    7. Especifique o nome e o sobrenome do usuário. Você pode usar qualquer string.

    8. Clique em Save.

    gcloud

    No Airflow 2, execute o comando da CLI do Airflow a seguir:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    Substitua:

    • ENVIRONMENT_NAME pelo nome do ambiente
    • LOCATION pela região em que o ambiente está localizado.
    • NUMERIC_USER_ID pelo ID do usuário da etapa anterior.
    • UNIQUE_ID pelo identificador do usuário do Airflow. Você pode usar qualquer string exclusiva.
  3. Depois de criar um usuário do Airflow para uma conta de serviço, o autor da chamada autenticado porque a conta de serviço é reconhecida como um usuário pré-registrado, e fez login no Airflow.

Como escalonar o componente da API REST do Airflow

Os endpoints da API REST e da interface do Airflow do Airflow são executados no componente. por exemplo, o servidor da Web Airflow. Caso você use intensamente a API REST, Aumento dos parâmetros de CPU e memória para ajustar os recursos do servidor da Web do Airflow para a carga esperada.

A seguir