Access the Airflow REST API


Apache Airflow has a REST API that you can use to perform tasks such as getting information about DAG runs and tasks, updating DAGs, getting Airflow configuration, adding and deleting connections, and listing users.

For an example of using the Airflow REST API with Cloud Functions, see Triggering DAGs with Cloud Functions.

Airflow REST API versions

The following Airflow REST API versions are available in Cloud Composer 1:

  • Airflow 2 uses the stable REST API. The experimental REST API is deprecated by Airflow.

  • You can still use the experimental REST API in Airflow 2 if you enable it through an Airflow configuration override, as described later on this page.

Before you begin

Enable the Cloud Composer API.


Enable the stable Airflow REST API

Airflow 2

The stable REST API is already enabled by default in Airflow 2.

Cloud Composer uses its own API authentication backend, which is integrated with Identity-Aware Proxy.

Authorization works in the standard way provided by Airflow. When a new user authorizes through the API, the user's account gets the Op role by default.

You can enable or disable the stable REST API, or change the default user role, by overriding the following Airflow configuration options:

Section | Key | Value | Notes
api | auth_backend | airflow.composer.api.backend.composer_auth | To disable the stable REST API, change to airflow.api.auth.backend.deny_all.
api | composer_auth_user_registration_role | Op | You can specify any other role.
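
As a rough illustration, the overrides in this table can be written in the section-key form that this page uses for option names (for example, api-auth_backend). The sketch below is not part of Cloud Composer: format_overrides is a hypothetical helper that renders a mapping of (section, key) pairs as a comma-separated KEY=VALUE string, the shape accepted by the --update-airflow-configs flag of gcloud composer environments update. It assumes that no value contains a comma.

```python
def format_overrides(overrides):
    """Render {(section, key): value} as 'section-key=value,...'.

    Hypothetical helper for illustration only. Entries are sorted so
    that the output is deterministic.
    """
    parts = []
    for (section, key), value in sorted(overrides.items()):
        parts.append(f"{section}-{key}={value}")
    return ",".join(parts)


# Values taken from the table above (the defaults for the stable REST API).
stable_api_overrides = {
    ("api", "auth_backend"): "airflow.composer.api.backend.composer_auth",
    ("api", "composer_auth_user_registration_role"): "Op",
}

print(format_overrides(stable_api_overrides))
```

To disable the stable REST API with the same helper, the auth_backend value would be airflow.api.auth.backend.deny_all, as noted in the table.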

Airflow 1

The stable REST API is not available in Airflow 1. You can use the experimental REST API instead.

Enable the experimental Airflow REST API

Airflow 2

By default, the API authentication feature is disabled in the experimental API. The Airflow web server denies all requests that you make.

To enable the API authentication feature and the Airflow 2 experimental API, override the following Airflow configuration options:

Section | Key | Value | Notes
api | auth_backend | airflow.api.auth.backend.default | The default is airflow.composer.api.backend.composer_auth.
api | enable_experimental_api | True | The default is False.

Airflow 1

By default, the API authentication feature is disabled in Airflow 1.10.11 and later versions, so the Airflow web server denies all requests that you make. Because DAGs are triggered through API requests, you must enable this feature.

To enable the API authentication feature in Airflow 1, override the following Airflow configuration option:

Section | Key | Value | Notes
api | auth_backend | airflow.api.auth.backend.default | The default is airflow.api.auth.backend.deny_all.

After you set the api-auth_backend configuration option to airflow.api.auth.backend.default, the Airflow web server accepts all API requests without authentication. Even though the Airflow web server itself does not require authentication, it is still protected by Identity-Aware Proxy, which provides its own authentication layer.

Make calls to the Airflow REST API

In the Python sample in this section, make the following replacements:

  • Replace the value of the client_id variable with the client_id value that you obtained in a previous step.
  • Replace the value of the webserver_id variable with your tenant project ID, which is the part of the Airflow web interface URL before .appspot.com. You obtained the Airflow web interface URL in a previous step.
  • Specify the Airflow REST API version that you use:

    • If you use the stable Airflow REST API, set the USE_EXPERIMENTAL_API variable to False.
    • If you use the experimental Airflow REST API, no changes are needed. The USE_EXPERIMENTAL_API variable is already set to True.
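
The webserver_id value described above can also be derived from the Airflow web interface URL in code. The following is a minimal sketch, not part of the official sample: tenant_project_id is a hypothetical helper, and the URL in the usage line is a made-up placeholder rather than a real environment.

```python
from urllib.parse import urlparse

APPSPOT_SUFFIX = ".appspot.com"


def tenant_project_id(web_ui_url):
    """Return the part of the web interface host name before .appspot.com.

    Hypothetical helper for illustration only. Raises ValueError if the
    URL does not point at an appspot.com host.
    """
    host = urlparse(web_ui_url).hostname or ""
    prefix = host[: -len(APPSPOT_SUFFIX)]
    if not host.endswith(APPSPOT_SUFFIX) or not prefix:
        raise ValueError(f"Not an Airflow web interface URL: {web_ui_url!r}")
    return prefix


# Placeholder URL; substitute the Airflow web interface URL of your environment.
webserver_id = tenant_project_id("https://example-tenant-tp.appspot.com")
print(webserver_id)
```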

from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests


IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'
# If you are using the stable API, set this value to False
# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
USE_EXPERIMENTAL_API = True


def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.
    """

    # Fill in with your Composer info here
    # Navigate to your webserver's login page and get this from the URL
    # Or use the script found at
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/composer/rest/get_client_id.py
    client_id = 'YOUR-CLIENT-ID'
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = 'YOUR-TENANT-PROJECT'
    # The name of the DAG you wish to trigger
    dag_name = 'composer_sample_trigger_response_dag'

    if USE_EXPERIMENTAL_API:
        endpoint = f'api/experimental/dags/{dag_name}/dag_runs'
        json_data = {'conf': data, 'replace_microseconds': 'false'}
    else:
        endpoint = f'api/v1/dags/{dag_name}/dagRuns'
        json_data = {'conf': data}
    webserver_url = (
        'https://'
        + webserver_id
        + '.appspot.com/'
        + endpoint
    )
    # Make a POST request to IAP, which then triggers the DAG
    make_iap_request(
        webserver_url, client_id, method='POST', json=json_data)


# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method='GET', **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if 'timeout' not in kwargs:
        kwargs['timeout'] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service
    # account.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method, url,
        headers={'Authorization': 'Bearer {}'.format(
            google_open_id_connect_token)}, **kwargs)
    if resp.status_code == 403:
        raise Exception('Service account does not have permission to '
                        'access the IAP-protected application.')
    elif resp.status_code != 200:
        raise Exception(
            'Bad response from application: {!r} / {!r} / {!r}'.format(
                resp.status_code, resp.headers, resp.text))
    else:
        return resp.text
# END COPIED IAP CODE
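
To inspect the URL and payload that trigger_dag sends without making a request through IAP, the endpoint selection can be factored into a pure function. This is a sketch under that assumption: build_trigger_request is a hypothetical helper that mirrors the two branches of the sample above, and the webserver ID in the usage lines is a placeholder.

```python
def build_trigger_request(webserver_id, dag_name, data, use_experimental_api):
    """Return the (url, json_body) pair for a DAG trigger request.

    Hypothetical helper for illustration only. It reproduces the endpoint
    and payload choices made in trigger_dag for each API version.
    """
    if use_experimental_api:
        endpoint = f"api/experimental/dags/{dag_name}/dag_runs"
        json_data = {"conf": data, "replace_microseconds": "false"}
    else:
        endpoint = f"api/v1/dags/{dag_name}/dagRuns"
        json_data = {"conf": data}
    url = f"https://{webserver_id}.appspot.com/{endpoint}"
    return url, json_data


# Placeholder webserver ID; the DAG name is the one used in the sample.
url, body = build_trigger_request(
    "YOUR-TENANT-PROJECT",
    "composer_sample_trigger_response_dag",
    {"message": "hello"},
    use_experimental_api=False,
)
print(url)
print(body)
```

The resulting pair could then be passed to the sample's make_iap_request function as make_iap_request(url, client_id, method='POST', json=body).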

What's next