Acessar a API REST do Airflow

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

O Apache Airflow tem uma interface da API REST que pode ser usada para executar tarefas como ver informações sobre execuções e tarefas do DAG, atualizar DAGs, configurar o Airflow, adicionar e excluir conexões e listar usuários.

Para um exemplo de como usar a API REST do Airflow com funções do Cloud Run, consulte Como acionar DAGs com o Cloud Functions.

Versões da API REST do Airflow

As seguintes versões da API REST do Airflow estão disponíveis no Cloud Composer 1:

  • O Airflow 2 usa a API REST estável. A API REST experimental foi interrompida pelo Airflow.

  • Ainda será possível usar a API REST experimental no Airflow 2 se você ativá-la por meio de uma substituição da configuração do Airflow, como descrito em mais detalhes.

Antes de começar

Enable the Cloud Composer API.

Enable the API

Ativar a API REST estável do Airflow

Airflow 2

A API REST estável já está ativada por padrão no Airflow 2.

O Cloud Composer usa o próprio back-end de autenticação da API, que é integrado ao Identity-Aware Proxy.

A autorização funciona da maneira padrão fornecida pelo Airflow. Quando um novo usuário autoriza por meio da API, a conta do usuário recebe o papel Op por padrão.

É possível ativar ou desativar a API REST estável ou alterar o papel de usuário padrão substituindo as seguintes opções de configuração do Airflow:

Seção Chave Valor Observações
api (Airflow 2.2.5 e anteriores) auth_backend
(Airflow 2.3.0 e posterior) auth_backends
airflow.composer.api.backend.composer_auth Para desativar a API REST estável, mude para airflow.api.auth.backend.deny_all
api composer_auth_user_registration_role Op É possível especificar qualquer outro papel.

Airflow 1

A API REST estável não está disponível no Airflow 1. Em vez disso, use a API REST experimental.

Ativar a API REST experimental do Airflow

Airflow 2

Por padrão, o recurso de autenticação da API está desativado na API experimental. O servidor da Web do Airflow nega todas as solicitações feitas.

Para ativar o recurso de autenticação da API e a API experimental do Airflow 2, substitua a seguinte opção de configuração do Airflow:

Seção Chave Valor Observações
api (Airflow 2.2.5 e versões anteriores) auth_backend
(Airflow 2.3.0 e versões mais recentes) auth_backends
airflow.api.auth.backend.default O padrão é airflow.composer.api.backend.composer_auth.
api enable_experimental_api True O padrão é False.

Airflow 1

Por padrão, o recurso de autenticação da API está desativado no Airflow 1.10.11 e versões posteriores. O servidor da Web do Airflow nega todas as solicitações feitas. Como você usa as solicitações para acionar DAGs, ative esse recurso.

Para ativar o recurso de autenticação da API no Airflow 1, substitua a seguinte opção de configuração do Airflow:

Seção Chave Valor Observações
api auth_backend airflow.api.auth.backend.default O padrão é airflow.api.auth.backend.deny_all

Depois de definir essa opção de configuração como airflow.api.auth.backend.default, o servidor da Web do Airflow aceita todas as sem autenticação. Mesmo que o próprio servidor da Web do Airflow não exija autenticação, ele ainda está protegido pelo Identity-Aware Proxy, que fornece a própria camada de autenticação.

Permitir chamadas de API para a API REST do Airflow usando o controle de acesso do servidor da Web

Dependendo do método usado para chamar a API REST do Airflow, o método de autor da chamada pode usar um endereço IPv4 ou IPv6. Lembre-se de desbloquear Tráfego IP para a API REST do Airflow usando o controle de acesso do servidor da Web.

Use a opção de configuração padrão, que é All IP addresses have access (default), se você não tiver certeza de quais endereços IP suas chamadas para a API REST do Airflow serão enviadas.

Fazer chamadas para a API REST do Airflow

Acessar o client_id do proxy do IAM

Para fazer uma solicitação ao endpoint da API REST do Airflow, a função exige o ID do cliente do proxy do IAM que protege o servidor da Web do Airflow.

O Cloud Composer não fornece essas informações diretamente. Em vez disso, faça uma solicitação não autenticada no servidor da Web do Airflow e capture o ID do cliente do URL de redirecionamento:

cURL

curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"

Substitua AIRFLOW_URL pelo URL da interface da Web do Airflow.

Na saída, procure a string depois de client_id. Exemplo:

client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com

Python

Salve o código a seguir em um arquivo chamado get_client_id.py. Preencha os valores de project_id, location e composer_environment. Em seguida, execute o código no Cloud Shell ou no ambiente local.

# This script is intended to be used with Composer 1 environments
# In Composer 2, the Airflow Webserver is not in the tenant project
# so there is no tenant client ID
# See https://cloud.google.com/composer/docs/composer-2/environment-architecture
# for more details
import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
composer_response = authed_session.request("GET", environment_url)
environment_data = composer_response.json()
composer_version = environment_data["config"]["softwareConfig"]["imageVersion"]
if "composer-1" not in composer_version:
    version_error = (
        "This script is intended to be used with Composer 1 environments. "
        "In Composer 2, the Airflow Webserver is not in the tenant project, "
        "so there is no tenant client ID. "
        "See https://cloud.google.com/composer/docs/composer-2/environment-architecture for more details."
    )
    raise (RuntimeError(version_error))
airflow_uri = environment_data["config"]["airflowUri"]

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers["location"]

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string["client_id"][0])

Chamar a API REST do Airflow usando o client_id

Faça as seguintes substituições:

  • Substitua o valor da variável client_id pelo valor client_id recebido na etapa anterior.
  • Substitua o valor da variável webserver_id pelo ID do projeto de locatário, que faz parte do URL da interface da Web do Airflow antes de .appspot.com. Você recebeu o URL da interface da Web do Airflow em uma etapa anterior.
  • Especifique a versão da API REST do Airflow usada:

    • Se você usa a API REST estável do Airflow, defina a variável USE_EXPERIMENTAL_API como False.
    • Se você usa a API REST experimental do Airflow, não é necessário fazer alterações. A variável USE_EXPERIMENTAL_API já está definida como True.

from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests


IAM_SCOPE = "https://www.googleapis.com/auth/iam"
OAUTH_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"
# If you are using the stable API, set this value to False
# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
USE_EXPERIMENTAL_API = True


def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.

    This function is currently only compatible with Composer v1 environments.
    """

    # Fill in with your Composer info here
    # Navigate to your webserver's login page and get this from the URL
    # Or use the script found at
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/composer/rest/get_client_id.py
    client_id = "YOUR-CLIENT-ID"
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = "YOUR-TENANT-PROJECT"
    # The name of the DAG you wish to trigger
    dag_name = "composer_sample_trigger_response_dag"

    if USE_EXPERIMENTAL_API:
        endpoint = f"api/experimental/dags/{dag_name}/dag_runs"
        json_data = {"conf": data, "replace_microseconds": "false"}
    else:
        endpoint = f"api/v1/dags/{dag_name}/dagRuns"
        json_data = {"conf": data}
    webserver_url = "https://" + webserver_id + ".appspot.com/" + endpoint
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(webserver_url, client_id, method="POST", json=json_data)


# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method="GET", **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service
    # account.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method,
        url,
        headers={"Authorization": "Bearer {}".format(google_open_id_connect_token)},
        **kwargs,
    )
    if resp.status_code == 403:
        raise Exception(
            "Service account does not have permission to "
            "access the IAP-protected application."
        )
    elif resp.status_code != 200:
        raise Exception(
            "Bad response from application: {!r} / {!r} / {!r}".format(
                resp.status_code, resp.headers, resp.text
            )
        )
    else:
        return resp.text


# END COPIED IAP CODE

Acessar a API REST do Airflow usando uma conta de serviço

O banco de dados do Airflow limita o comprimento do campo de e-mail a 64 caracteres. sometimess vezes, as contas de serviço têm endereços de e-mail com mais de 64 caracteres. No momento, não é possível criar usuários do Airflow para essas contas de serviço. Se não houver um usuário do Airflow para essa conta de serviço, o acesso à API REST do Airflow resultará em erros HTTP 401 e 403.

Como solução alternativa, você pode fazer o pré-registro de um usuário do Airflow para uma conta de serviço. Para fazer isso, use accounts.google.com:NUMERIC_USER_ID como o nome de usuário e qualquer string exclusiva como o e-mail.

  1. Para acessar NUMERIC_USER_ID em uma conta de serviço, execute:

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    Substitua:

    • SA_NAME pelo nome da conta de serviço.
    • PROJECT_ID pelo ID do projeto.
  2. Crie um usuário do Airflow com o papel Op para a conta de serviço:

    IU do Airflow

    1. Acesse a IU do Airflow.

    2. Acesse Administrador > Usuários e clique em Criar. O usuário do Airflow precisa ter o papel Admin para abrir esta página.

    3. Especifique accounts.google.com:NUMERIC_USER_ID como o nome do usuário. Substitua NUMERIC_USER_ID pelo ID do usuário da etapa anterior.

    4. Especifique um identificador exclusivo como o e-mail. Você pode usar qualquer string exclusiva.

    5. Especifique o papel do usuário. Por exemplo, Op

    6. Verifique se a caixa de seleção Is Active? está marcada.

    7. Especifique o nome e o sobrenome do usuário. Você pode usar qualquer string.

    8. Clique em Save.

    gcloud

    No Airflow 2, execute o comando da CLI do Airflow a seguir:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    Substitua:

    • ENVIRONMENT_NAME pelo nome do ambiente
    • LOCATION pela região em que o ambiente está localizado.
    • NUMERIC_USER_ID pelo ID do usuário da etapa anterior.
    • UNIQUE_ID pelo identificador do usuário do Airflow. Você pode usar qualquer string exclusiva.
  3. Depois de criar um usuário do Airflow para uma conta de serviço, um autor da chamada autenticado como a conta de serviço é reconhecido como um usuário pré-registrado e conectado ao Airflow.

Como dimensionar o componente da API REST do Airflow

A API REST e os endpoints da interface do Airflow são executados no componente, ou seja, no servidor da Web do Airflow. Caso você use intensamente a API REST, Aumento dos parâmetros de CPU e memória para ajustar os recursos do servidor da Web do Airflow para a carga esperada.

A seguir