Transfer data from other services with Google Transfer Operators

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

This page demonstrates how to transfer data from other services with Google Transfer Operators in your DAGs.

About Google Transfer Operators

Google Transfer Operators are a set of Airflow operators that you can use to pull data from other services into Google Cloud.

This guide shows operators for Azure FileShare Storage and Amazon S3 that work with Cloud Storage. There are many more transfer operators that work with services within Google Cloud and with services other than Google Cloud.

Amazon S3 to Cloud Storage

This section demonstrates how to synchronize data from Amazon S3 to a Cloud Storage bucket.

Install the Amazon provider package

The apache-airflow-providers-amazon package contains the connection types and functionality that interacts with Amazon S3. Install this PyPI package in your environment.

Configure a connection to Amazon S3

The Amazon provider package provides a connection type for Amazon S3. You create a connection of this type. The connection for Cloud Storage, named google_cloud_default is already set up in your environment.

Set up a connection to Amazon S3 in the following way:

  1. In Airflow UI, go to Admin > Connections.
  2. Create a new connection.
  3. Select Amazon S3 as the connection type.
  4. The following example uses a connection named aws_s3. You can use this name, or any other name for the connection.
  5. Specify connection parameters as described in the Airflow documentation for Amazon Web Services Connection. For example, to set up a connection with AWS access keys, you generate an access key for your account on AWS, then provide the AWS access key ID as a login the AWS secret access key as a password for the connection.

Transfer data from Amazon S3

If you want to operate on the synchronized data later in another DAG or task, pull it to the /data folder of your environment's bucket. This folder is synchronized to other Airflow workers, so that tasks in your DAG can operate on it.

The following example DAG does the following:

  • Synchronizes contents of the /data-for-gcs directory from an S3 bucket to the /data/from-s3/data-for-gcs/ folder in your environment's bucket.
  • Waits for two minutes, for the data to synchronize to all Airflow workers in your environment.
  • Outputs the list of files in this directory using the ls command. Replace this task with other Airflow operators that work with your data.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_aws_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_s3 = S3ToGCSOperator(
        task_id='transfer_dir_from_s3',
        aws_conn_id='aws_s3',
        prefix='data-for-gcs',
        bucket='example-s3-bucket-transfer-operators',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')


    transfer_dir_from_s3 >> sleep_2min >> print_dir_files

Azure FileShare to Cloud Storage

This section demonstrates how to synchronize data from Azure FileShare to a Cloud Storage bucket.

Install the Microsoft Azure provider package

The apache-airflow-providers-microsoft-azure package contains the connection types and functionality that interacts with Microsoft Azure. Install this PyPI package in your environment.

Configure a connection to Azure FileShare

The Microsoft Azure provider package provides a connection type for Azure File Share. You create a connection of this type. The connection for Cloud Storage, named google_cloud_default is already set up in your environment.

Set up a connection to Azure FileShare in the following way:

  1. In Airflow UI, go to Admin > Connections.
  2. Create a new connection.
  3. Select Azure FileShare as the connection type.
  4. The following example uses a connection named azure_fileshare. You can use this name, or any other name for the connection.
  5. Specify connection parameters as described in the Airflow documentation for Microsoft Azure File Share Connection. For example, you can specify a connection string for your storage account access key.

Transfer data from Azure FileShare

If you want to operate on the synchronized data later in another DAG or task, pull it to the /data folder of your environment's bucket. This folder is synchronized to other Airflow workers, so that tasks in your DAG can operate on it.

The following DAG does the following:

The following example DAG does the following:

  • Synchronizes contents of the /data-for-gcs directory from Azure File Share to the /data/from-azure folder in your environment's bucket.
  • Waits for two minutes, for the data to synchronize to all Airflow workers in your environment.
  • Outputs the list of files in this directory using the ls command. Replace this task with other Airflow operators that work with your data.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_azure_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_azure = AzureFileShareToGCSOperator(
        task_id='transfer_dir_from_azure',
        azure_fileshare_conn_id='azure_fileshare',
        share_name='example-file-share',
        directory_name='data-for-gcs',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-azure/')


    transfer_dir_from_azure >> sleep_2min >> print_dir_files

What's next