Aceda à API REST do Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

O Apache Airflow tem uma interface de API REST que pode usar para realizar tarefas como obter informações sobre execuções e tarefas de DAGs, atualizar DAGs, obter a configuração do Airflow, adicionar e eliminar associações e listar utilizadores.

Para ver um exemplo de utilização da API REST do Airflow com funções do Cloud Run, consulte o artigo Acionar DAGs com funções do Cloud Run.

Versões da API REST do Airflow

  • O Airflow 1 usa a API REST experimental.
  • O Airflow 2 usa a API REST estável. A API REST experimental foi descontinuada pelo Airflow.
  • Pode continuar a usar a API REST experimental no Airflow 2 se a ativar através de uma substituição da configuração do Airflow, conforme descrito mais detalhadamente.

Configure a API REST do Airflow estável

Airflow 2

A API REST estável está ativada por predefinição no Airflow 2. O Cloud Composer usa o seu próprio backend de autenticação da API, que está integrado com o Identity-Aware Proxy.

A autorização funciona da forma padrão fornecida pelo Airflow. Quando um novo utilizador autoriza através da API, a conta do utilizador recebe a função Op por predefinição.

Pode ativar ou desativar a API REST estável ou alterar a função do utilizador predefinida substituindo as seguintes opções de configuração do Airflow:

Secção Chave Valor Notas
api (Airflow 2.2.5 e anterior) auth_backend
(Airflow 2.3.0 e posterior) auth_backends
airflow.composer.api.backend.composer_auth Para desativar a API REST estável, altere para airflow.api.auth.backend.deny_all
api composer_auth_user_registration_role Op Pode especificar qualquer outra função.

Fluxo de ar 1

A API REST estável não está disponível no Airflow 1. Em alternativa, pode usar a API REST experimental.

Configure a API REST do Airflow experimental

Airflow 2

Por predefinição, a funcionalidade de autenticação da API está desativada na API experimental. O servidor Web do Airflow nega todos os pedidos. Para ativar a funcionalidade de autenticação da API e a API experimental do Airflow 2, substitua a seguinte opção de configuração do Airflow:

Secção Chave Valor Notas
api (Airflow 2.2.5 e anterior) auth_backend
(Airflow 2.3.0 e posterior) auth_backends
airflow.api.auth.backend.default A predefinição é airflow.composer.api.backend.composer_auth.
api enable_experimental_api True A predefinição é False.

Fluxo de ar 1

Por predefinição, a funcionalidade de autenticação da API está desativada no Airflow 1.10.11 e versões posteriores. O servidor Web do Airflow nega todos os pedidos que faz. Usa pedidos para acionar DAGs, por isso, ative esta funcionalidade.

Para ativar a funcionalidade de autenticação da API no Airflow 1, substitua a seguinte opção de configuração do Airflow:

Secção Chave Valor Notas
api auth_backend airflow.api.auth.backend.default A predefinição é airflow.api.auth.backend.deny_all

Depois de definir esta opção de configuração como airflow.api.auth.backend.default, o servidor Web do Airflow aceita todos os pedidos da API sem autenticação.

Embora o servidor Web do Airflow em si não exija autenticação, o Cloud Composer usa a sua própria camada de autenticação para o proteger, que está integrada com o Identity-Aware Proxy.

Permitir chamadas à API REST do Airflow através do controlo de acesso do servidor Web

Consoante o método usado para chamar a API REST do Airflow, o método de chamada pode usar um endereço IPv4 ou IPv6. Lembre-se de desbloquear o tráfego de IP para a API REST do Airflow através do controlo de acesso do servidor Web.

Use a opção de configuração predefinida, que é All IP addresses have access (default), se não souber a partir de que endereços IP as suas chamadas para a API REST do Airflow vão ser enviadas.

Faça chamadas para a API REST do Airflow

Obtenha o client_id do proxy IAM

Para fazer um pedido ao ponto final da API REST do Airflow, a função requer o ID de cliente do proxy da IAM que protege o servidor Web do Airflow.

O Cloud Composer não fornece estas informações diretamente. Em alternativa, faça um pedido não autenticado ao servidor Web do Airflow e capture o ID do cliente a partir do URL de redirecionamento:

cURL

curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"

Substitua AIRFLOW_URL pelo URL da interface Web do Airflow.

Na saída, pesquise a string que se segue a client_id. Por exemplo:

client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com

Python

Guarde o seguinte código num ficheiro denominado get_client_id.py. Preencha os valores de project_id, location e composer_environment e, em seguida, execute o código no Cloud Shell ou no seu ambiente local.

# This script is intended to be used with Composer 1 environments
# In Composer 2, the Airflow Webserver is not in the tenant project
# so there is no tenant client ID
# See https://cloud.google.com/composer/docs/composer-2/environment-architecture
# for more details
import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
composer_response = authed_session.request("GET", environment_url)
environment_data = composer_response.json()
composer_version = environment_data["config"]["softwareConfig"]["imageVersion"]
if "composer-1" not in composer_version:
    version_error = (
        "This script is intended to be used with Composer 1 environments. "
        "In Composer 2, the Airflow Webserver is not in the tenant project, "
        "so there is no tenant client ID. "
        "See https://cloud.google.com/composer/docs/composer-2/environment-architecture for more details."
    )
    raise (RuntimeError(version_error))
airflow_uri = environment_data["config"]["airflowUri"]

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers["location"]

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string["client_id"][0])

Chame a API REST do Airflow usando client_id

Faça as seguintes substituições:

  • Substitua o valor da variável client_id pelo valor client_id obtido no passo anterior.
  • Substitua o valor da variável webserver_id pelo ID do projeto de inquilino, que faz parte do URL da interface Web do Airflow antes de .appspot.com. Obteve o URL da interface Web do Airflow num passo anterior.
  • Especifique a versão da API REST Airflow que usa:

    • Se usar a API REST do Airflow estável, defina a variável USE_EXPERIMENTAL_API como False.
    • Se usar a API REST do Airflow experimental, não tem de fazer nenhuma alteração. A variável USE_EXPERIMENTAL_API já está definida como True.

from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests


IAM_SCOPE = "https://www.googleapis.com/auth/iam"
OAUTH_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"
# If you are using the stable API, set this value to False
# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
USE_EXPERIMENTAL_API = True


def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.

    This function is currently only compatible with Composer v1 environments.
    """

    # Fill in with your Composer info here
    # Navigate to your webserver's login page and get this from the URL
    # Or use the script found at
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/composer/rest/get_client_id.py
    client_id = "YOUR-CLIENT-ID"
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = "YOUR-TENANT-PROJECT"
    # The name of the DAG you wish to trigger
    dag_name = "composer_sample_trigger_response_dag"

    if USE_EXPERIMENTAL_API:
        endpoint = f"api/experimental/dags/{dag_name}/dag_runs"
        json_data = {"conf": data, "replace_microseconds": "false"}
    else:
        endpoint = f"api/v1/dags/{dag_name}/dagRuns"
        json_data = {"conf": data}
    webserver_url = "https://" + webserver_id + ".appspot.com/" + endpoint
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(webserver_url, client_id, method="POST", json=json_data)


# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method="GET", **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service
    # account.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method,
        url,
        headers={"Authorization": "Bearer {}".format(google_open_id_connect_token)},
        **kwargs,
    )
    if resp.status_code == 403:
        raise Exception(
            "Service account does not have permission to "
            "access the IAP-protected application."
        )
    elif resp.status_code != 200:
        raise Exception(
            "Bad response from application: {!r} / {!r} / {!r}".format(
                resp.status_code, resp.headers, resp.text
            )
        )
    else:
        return resp.text


# END COPIED IAP CODE

Aceda à API REST do Airflow através de uma conta de serviço

A base de dados do Airflow nas versões do Airflow anteriores à 2.3.0 limita o comprimento do campo de email a 64 carateres. Por vezes, as contas de serviço têm endereços de email com mais de 64 carateres. Não é possível criar utilizadores do Airflow para essas contas de serviço da forma habitual. Se não existir um utilizador do Airflow para essa conta de serviço, o acesso à API REST do Airflow resulta em erros HTTP 401 e 403.

Como solução alternativa, pode pré-registar um utilizador do Airflow para uma conta de serviço. Para isso, use accounts.google.com:NUMERIC_USER_ID como nome de utilizador e qualquer string única como email.

  1. Para obter o NUMERIC_USER_ID para uma conta de serviço, execute o seguinte comando:

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    Substituir:

    • SA_NAME com o nome da conta de serviço.
    • PROJECT_ID com o ID do projeto.
  2. Crie um utilizador do Airflow com a função Op para a conta de serviço:

    IU do Airflow

    1. Aceda à IU do Airflow.

    2. Aceda a Admin > Utilizadores e clique em Criar. O utilizador do Airflow tem de ter a função Admin para abrir esta página.

    3. Especifique accounts.google.com:NUMERIC_USER_ID como nome de utilizador. Substitua NUMERIC_USER_ID pelo ID de utilizador obtido no passo anterior.

    4. Especifique um identificador exclusivo como o email. Pode usar qualquer string única.

    5. Especifique a função do utilizador. Por exemplo, Op.

    6. Certifique-se de que a caixa de verificação Está ativo? está selecionada.

    7. Especifique o nome próprio e o apelido do utilizador. Pode usar qualquer string.

    8. Clique em Guardar.

    gcloud

    No Airflow 2, execute o seguinte comando da CLI do Airflow:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    Substituir:

    • ENVIRONMENT_NAME com o nome do ambiente.
    • LOCATION com a região onde o ambiente está localizado.
    • NUMERIC_USER_ID com o ID do utilizador obtido no passo anterior.
    • UNIQUE_ID com o identificador do utilizador do Airflow. Pode usar qualquer string única.
  3. Depois de criar um utilizador do Airflow para uma conta de serviço, um autor da chamada autenticado como a conta de serviço é reconhecido como um utilizador pré-registado e tem sessão iniciada no Airflow.

Dimensionar o componente da API REST do Airflow

A API REST Airflow e os pontos finais da IU do Airflow são executados no servidor Web do Airflow. Se usar a API REST de forma intensiva, pondere aumentar a quantidade de CPU e memória disponíveis para o servidor Web do Airflow, com base na carga esperada.

O que se segue?