Access the Airflow REST API

Cloud Composer 1 | Cloud Composer 2

Apache Airflow has a REST API interface that you can use to perform tasks such as getting information about DAG runs and tasks, updating DAGs, getting Airflow configuration, adding and deleting connections, and listing users.

For an example of using Airflow REST API with Cloud Functions, see Triggering DAGs with Cloud Functions.

Airflow REST API versions

The following Airflow REST API versions are available in Cloud Composer 2:

  • Airflow 2 uses the stable REST API. The experimental REST API is deprecated by Airflow.

  • You can still use the experimental REST API in Airflow 2 if you enable it through an Airflow configuration override, as described further.

Before you begin

Enable the Cloud Composer API.

Enable the API

Enable the stable Airflow REST API

The stable REST API is already enabled by default in Airflow 2.

Cloud Composer uses its own API authentication backend.

Authorization works in the standard way provided by Airflow. When a new user authorizes through the API, the user's account gets the Op role by default.

You can enable or disable the stable REST API, or change the default user role by overriding the following Airflow configuration options:

Section Key Value Notes
api auth_backend airflow.composer.api.backend.composer_auth To disable the stable REST API, change to airflow.api.auth.backend.deny_all.
api composer_auth_user_registration_role Op You can specify any other role.

Enable the experimental Airflow REST API

By default, the API authentication feature is disabled in the experimental API. The Airflow web server denies all requests that you make.

To enable the API authentication feature and the Airflow 2 experimental API, override the following Airflow configuration option:

Section Key Value Notes
api auth_backend airflow.api.auth.backend.default The default is airflow.composer.api.backend.composer_auth.
api enable_experimental_api True The default is False

After you set the api-auth_backend configuration option to airflow.api.auth.backend.default, the Airflow web server accepts all API requests without authentication. Even though the Airflow web server itself does not require authentication, it is still protected by Identity-Aware Proxy which provides its own authentication layer.

Make calls to Airflow REST API

This section provides an example Python script which you can use to trigger DAGs with the stable Airflow REST API.

Put the contents of the following example into a file named composer2_airflow_rest_api.py, and then provide your Airflow UI URL, the name of the DAG, and the DAG run config in the parameters.

# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Trigger a DAG in Cloud Composer 2 environment using the Airflow 2 stable REST API."""

import google.auth
from google.auth.transport.requests import AuthorizedSession

import argparse
import json

AUTH_SCOPE = 'https://www.googleapis.com/auth/cloud-platform'

def make_composer2_web_server_request(url, method='GET', **kwargs):
  """
  Make a request to Cloud Composer 2 environment's web server.
  Args:
    url: The URL to fetch.
    method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
      'PATCH', 'DELETE')
    **kwargs: Any of the parameters defined for the request function:
              https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
  """

  credentials, _ = google.auth.default(scopes=[AUTH_SCOPE])
  authed_session = AuthorizedSession(credentials)

  # Set the default timeout, if missing
  if 'timeout' not in kwargs:
    kwargs['timeout'] = 90

  return authed_session.request(
    method,
    url,
    **kwargs)


def trigger_dag(webserver_url, dag_id, data, context=None):

  endpoint = f'api/v1/dags/{dag_id}/dagRuns'
  request_url = f'{webserver_url}/{endpoint}'
  json_data = { 'conf': data }

  response = make_composer2_web_server_request(request_url,
    method='POST',
    json=json_data
    )

  if response.status_code == 403:
    raise Exception('You do not have a permission to access this resource.')
  elif response.status_code != 200:
    raise Exception(
      'Bad request: {!r} / {!r} / {!r}'.format(
        response.status_code, response.headers, response.text))
  else:
    return response.text


# Usage:
# python3 composer2_airflow_rest_api.py dag_id dag_config webserver_url

# Example:
# python3 composer2_airflow_rest_api.py composer_sample_dag \
# '{"test": "value"}' \
# https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com

if __name__ == "__main__":

  parser = argparse.ArgumentParser(
      description=__doc__,
      formatter_class=argparse.RawDescriptionHelpFormatter)

  parser.add_argument('dag_id', help='ID of the DAG to run.')
  parser.add_argument(
      'dag_config',
      help='Config for the DAG run. Example: {"test": "value"}',
      type=str)
  parser.add_argument(
      'webserver_url',
      help='Airflow web server address. To obtain this URL, \
            run the following command for your environment: \
            gcloud composer environments describe example-environment \
            --location=us-central1 \
            --format="value(config.airflowUri)"')

  args = parser.parse_args()
  dag_config_json = json.loads(args.dag_config)

  response_text = trigger_dag(
    webserver_url=args.webserver_url, dag_id=args.dag_id, data=dag_config_json)

  print(response_text)

What's next