Managing Airflow connections

This page describes how to use Airflow connections.

Airflow connections store the connection information that a Cloud Composer environment needs to communicate with other APIs, such as Google Cloud Platform projects, other cloud providers, or third-party services. A connection can hold details such as credentials, hostnames, or additional API parameters.

Each connection has an associated connection ID that your workflow tasks reference to use the stored details. Airflow connections are the recommended way to store secrets and credentials used in workflow tasks.

The Google Cloud Platform connection type enables GCP Integrations. For more information about this connection type, see Managing Connection Types.

Fernet key and secured connections

When you create a new environment, Cloud Composer generates a unique, permanent fernet key for the environment and secures connection extras by default. You can view the fernet_key in the Airflow Configuration. For information about how connections are secured, see Securing Connections.
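
The following is a conceptual sketch, not Airflow's internal code: it only illustrates that Fernet is symmetric encryption, so connection extras encrypted with the environment's fernet key can be read back only with that same key.

from cryptography.fernet import Fernet

# Cloud Composer generates a key like this once per environment.
fernet_key = Fernet.generate_key()
fernet = Fernet(fernet_key)

# Sensitive connection extras are stored encrypted...
token = fernet.encrypt(b'{"key_path": "/home/airflow/gcs/data/keyfile.json"}')

# ...and can be decrypted only by a process that holds the same key.
print(fernet.decrypt(token))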

Using the default connections

By default, Cloud Composer configures the following Airflow connections for Google Cloud Platform:

  • bigquery_default
  • google_cloud_default
  • google_cloud_datastore_default
  • google_cloud_storage_default

You can use these connections from your DAGs by using the default connection ID. The following example uses the BigQueryOperator with the default connection.

from airflow.contrib.operators import bigquery_operator

# No connection ID is set, so the operator uses its default, 'bigquery_default'.
task_default = bigquery_operator.BigQueryOperator(
    task_id='task_default_connection',
    bql='SELECT 1', use_legacy_sql=False)

You can also specify the connection ID explicitly when you create the operator.

task_explicit = bigquery_operator.BigQueryOperator(
    task_id='task_explicit_connection',
    bql='SELECT 1', use_legacy_sql=False,
    # Composer creates a 'google_cloud_default' connection by default.
    bigquery_conn_id='google_cloud_default')

Accessing resources in another project

The recommended way to allow your Cloud Composer environment to access resources in GCP projects is by using the default connections and by assigning the appropriate Cloud Identity and Access Management permissions to the service account associated with your environment.

The following sections provide examples of how to allow reads and writes to Cloud Storage buckets in your-storage-project for a Cloud Composer environment deployed in the project your-composer-project.

Determining the service account associated with your environment

Console

  1. In the GCP Console, open the Environments page.

  2. In the Name column, click the name of the environment to open its Environment details page.
  3. Note the Service account. This value is an email address, such as service-account-name@your-composer-project.iam.gserviceaccount.com.

gcloud

Enter the following command and replace the VARIABLES with appropriate values:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.nodeConfig.serviceAccount)" 

The output shows an address, such as service-account-name@your-composer-project.iam.gserviceaccount.com.

Granting the appropriate IAM permissions to the service account

To allow reads and writes to Cloud Storage buckets in your-storage-project, grant the roles/storage.objectAdmin role to the service account associated with your Cloud Composer environment.

Console

  1. In the GCP Console, open the IAM & Admin page for your storage project.

  2. Click Add members.

  3. In the Add members dialog, specify the full email address of the service account associated with your Cloud Composer environment.

  4. In the Select a role drop-down list, select the appropriate role. For this example, select the Storage > Object Admin role.

  5. Click Add.

gcloud

Use the gcloud projects add-iam-policy-binding command to add project-level IAM permissions. Replace the VARIABLES with appropriate values:

gcloud projects add-iam-policy-binding YOUR_STORAGE_PROJECT \
    --member=serviceAccount:SERVICE_ACCOUNT_EMAIL \
    --role=roles/storage.objectAdmin 

After the appropriate permissions are granted, you can access resources in the your-storage-project project with the same default Airflow connections that you use to access resources in the your-composer-project project.
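
For example, after the environment's service account has the Storage Object Admin role in your-storage-project, a task can read an object from a bucket in that project with the default Cloud Storage connection. The following is a minimal sketch; the bucket and object names are placeholders, not values from this guide.

from airflow.contrib.operators import gcs_download_operator

# Reads gs://your-storage-bucket/path/to/file.csv (a bucket in
# your-storage-project) using the environment's service account.
download_file = gcs_download_operator.GoogleCloudStorageDownloadOperator(
    task_id='download_from_other_project',
    bucket='your-storage-bucket',
    object='path/to/file.csv',
    filename='/home/airflow/gcs/data/file.csv',
    google_cloud_storage_conn_id='google_cloud_storage_default')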

Creating new Airflow connections

Before you begin

Whenever possible, grant the appropriate Cloud IAM permissions to the service account associated with your Cloud Composer environment and use the default connections in your DAG definitions. Follow the steps in this section only if you are unable to do so.

Creating a connection to another project

The following steps provide an example of how to allow reads and writes to Cloud Storage buckets in your-storage-project for a Cloud Composer environment deployed in the project your-composer-project.

  1. Create a service account in your-storage-project and download a JSON key:

    1. In the GCP Console, open the Service Accounts page.

    2. Click Select a project.

    3. Select your project and click Open.

    4. Click Create Service Account.

    5. Enter a service account name, and select a role to grant to the service account, such as Storage > Object Admin.

    6. Check Furnish a new private key and click Save.

    7. Open the JSON file in a plain text editor. The contents should look like the following:

      { "type": "service_account", "project_id": "your-storage-project", ... }

  2. Create a new connection:

    Airflow UI

    1. Access the Airflow web interface for your Cloud Composer environment.

    2. In the Airflow web interface, open the Admin > Connections page.

    3. To open the new connection form, click the Create tab.

    4. Create a new connection:

      1. In the Conn Id field, enter a connection ID, such as my_gcp_connection. Use this ID in your DAG definition files.
      2. In the Conn Type field, select the Google Cloud Platform option.
      3. In the Project Id field, enter the ID of the project that your service account belongs to.
      4. Do one of the following:

        1. Copy the service account JSON key file that you downloaded into the data/ directory of your environment's Cloud Storage bucket. Then, in Keyfile Path, enter the local file path of the JSON keyfile on the Airflow worker, such as /home/airflow/gcs/data/keyfile.json.
        2. In Keyfile JSON, copy the contents of the service account JSON key file that you downloaded.

        Users with access to Airflow connections through the CLI or Web UI can read credentials stored in keyfile_dict. To secure these credentials, we recommend that you use Keyfile Path and use a Cloud Storage ACL to restrict access to the key file.

      5. Enter a value in the Scopes field. We recommend that you use https://www.googleapis.com/auth/cloud-platform as the scope and use Cloud IAM permissions on the service account to limit access to GCP resources.

      6. To create the connection, click Save.

    gcloud

    Enter the following command:

    gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION connections -- --add \
    --conn_id=CONNECTION_ID --conn_type=google_cloud_platform \
    --conn_extra '{"extra__google_cloud_platform__CMD_ARGS": "...",
    "extra__google_cloud_platform__CMD_ARGS": "...", ...}'
    

    where:

    • ENVIRONMENT_NAME is the name of the environment.
    • LOCATION is the Compute Engine region where the environment is located.
    • CONNECTION_ID is the identifier for the connection. Use lower-case characters, and separate words with underscores.
    • CMD_ARGS are the following:
      • project is a project ID. Only extra__google_cloud_platform__project is required.
      • key_path is a local file path on the Airflow worker to a JSON keyfile, such as /home/airflow/gcs/data/keyfile.json. If provided, also requires scope. Use key_path or keyfile_dict, not both.
      • keyfile_dict is a JSON object that specifies the contents of the JSON keyfile that you downloaded. If provided, also requires scope. Use keyfile_dict or key_path, not both. Users with access to Airflow connections through the CLI or Web UI can read credentials stored in keyfile_dict. To secure these credentials, we recommend that you use key_path and apply a Cloud Storage ACL to restrict access to the key file.
      • scope is a comma-separated list of OAuth scopes.

    For example:

    gcloud composer environments run test-environment \
     --location us-central1 connections -- --add \
     --conn_id=my_gcp_connection --conn_type=google_cloud_platform \
     --conn_extra '{"extra__google_cloud_platform__project": "your-storage-project",
     "extra__google_cloud_platform__key_path": "/home/airflow/gcs/data/keyfile.json",
     "extra__google_cloud_platform__scope": "https://www.googleapis.com/auth/cloud-platform"}'

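If you manage connections from code rather than from the Airflow UI or gcloud, you can also write the same connection to the Airflow metadata database. The following is a minimal sketch, not a method from this guide; it assumes it runs inside the environment (for example, from a one-off script) and reuses the placeholder values shown above.

import json

from airflow import models, settings

extra = {
    'extra__google_cloud_platform__project': 'your-storage-project',
    'extra__google_cloud_platform__key_path': '/home/airflow/gcs/data/keyfile.json',
    'extra__google_cloud_platform__scope': 'https://www.googleapis.com/auth/cloud-platform',
}

# Insert the connection into the Airflow metadata database.
connection = models.Connection(
    conn_id='my_gcp_connection',
    conn_type='google_cloud_platform',
    extra=json.dumps(extra))

session = settings.Session()
session.add(connection)
session.commit()
session.close()
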
Using a new Airflow connection

To use the connection you created, set it as the corresponding connection ID argument when you construct a GCP Airflow operator.

task_custom = bigquery_operator.BigQueryOperator(
    task_id='task_custom_connection',
    bql='SELECT 1', use_legacy_sql=False,
    # Set a connection ID to use a connection that you have created.
    bigquery_conn_id='my_gcp_connection')
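
The same connection ID also works with other GCP operators. Because my_gcp_connection stores credentials for a service account in your-storage-project, a sketch like the following could read from that project's bucket; the bucket and object names are placeholders, not values from this guide.

from airflow.contrib.operators import gcs_download_operator

download_file = gcs_download_operator.GoogleCloudStorageDownloadOperator(
    task_id='download_with_custom_connection',
    bucket='your-storage-bucket',
    object='path/to/file.csv',
    filename='/home/airflow/gcs/data/file.csv',
    # Use the connection that stores the your-storage-project keyfile.
    google_cloud_storage_conn_id='my_gcp_connection')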

Configuring a connection to an external database

Cloud Composer provides the default Cloud SQL proxy in your environment to authorize remote access to the environment's Cloud SQL database from an application, client, or other GCP service.

To connect Cloud Composer to an external database, such as a SQL database or a private IP Cloud SQL instance, you must deploy a new SQL proxy pod (yaml) into your environment's GKE cluster.

After you deploy the new SQL proxy service, connections to your external database originate from your environment's GKE cluster. For the Airflow web server to reach the external database, for example through an Airflow connection, the SQL proxy service must also be accessible from the web server. You can make it accessible in the following ways:

  • Create a LoadBalancer service to expose this additional SQL proxy service. You might also want to restrict access to this LoadBalancer service. Alternatively, you can deploy a self-managed Airflow webserver.
  • Create a private IP Cloud Composer environment and expose the SQL proxy service by using a ClusterIP service. When private IP mode is enabled in Cloud Composer, the web server has direct access to Kubernetes services.
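
For example, once the proxy service is reachable and you have created an Airflow connection for the external database, a task can query it as in the following sketch. The connection ID and database details are placeholder assumptions, not values from this guide.

from airflow.operators import mysql_operator

# 'external_db_connection' is assumed to be an Airflow connection whose
# host points at the SQL proxy Service exposed in the cluster.
query_external_db = mysql_operator.MySqlOperator(
    task_id='query_external_db',
    sql='SELECT 1',
    mysql_conn_id='external_db_connection')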