Data Analytics

Using upstream Apache Airflow Hooks and Operators in Cloud Composer

For engineers or developers in charge of integrating, transforming, and loading a variety of data from an ever-growing collection of sources and systems, Cloud Composer has dramatically reduced the number of cycles spent on workflow logistics. Built on Apache Airflow, Cloud Composer makes it easy to author, schedule, and monitor data pipelines across multiple clouds and on-premises data centers.

Let’s walk through an example of how Cloud Composer makes building a pipeline across public clouds easier. As you design your new workflow that’s going to bring data from another cloud (Microsoft Azure’s ADLS, for example) into Google Cloud, you notice that upstream Apache Airflow already has an ADLS hook that you can use to copy data. You insert an import statement into your DAG file, save, and attempt to test your workflow: “ImportError: No module named x.” Now what?

As it turns out, functionality that has been committed upstream—such as brand new Hooks and Operators—might not have made its way into Cloud Composer just yet. Don’t worry, though: you can still use these upstream additions by leveraging the Apache Airflow Plugin interface.

Using the upstream AzureDataLakeHook as an example, all you have to do is the following:

  1. Copy the code into a separate file (ensuring adherence to the Apache License)

  2. Import the AirflowPlugin module (from airflow.plugins_manager import AirflowPlugin)

  3. Add the following snippet to the bottom of the file:

Language: Python

  class AdlsPlugin(AirflowPlugin):
    name = "adls_plugin"
    operators = []
    # A list of class(es) derived from BaseHook
    hooks = [AzureDataLakeHook]
    # A list of class(es) derived from BaseExecutor
    executors = []
    # A list of references to inject into the macros namespace
    macros = []
    # A list of objects created from a class derived
    # from flask_admin.BaseView
    admin_views = []
    # A list of Blueprint object created from flask.Blueprint
    flask_blueprints = []
    # A list of menu links (flask_admin.base.MenuLink)
    menu_links = []

Once you have completed the above steps, you need to ensure that all other dependencies required by the functionality you added are included in your Cloud Composer environment. In this example, we need the azure-datalake-store package. To install it, you can use the Cloud Console: navigate to Cloud Composer, click your environment, select PyPI Packages, and then click “Edit” to add the package. The update may take a few moments to complete, but once it succeeds, you should see a view similar to the screenshot below:

[Screenshot: PyPI Packages view for the Cloud Composer environment]
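If you prefer the command line, the same dependency can also be added with the gcloud CLI. Here, ENVIRONMENT_NAME and LOCATION are placeholders for your own environment, and the version specifier is illustrative; check the current gcloud reference for the exact flag syntax:

```shell
gcloud composer environments update ENVIRONMENT_NAME \
  --location LOCATION \
  --update-pypi-package "azure-datalake-store>=0.0.1"
```

As with the Console flow, the environment update can take several minutes while Cloud Composer rebuilds its workers.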

Next, we need to make the plugin available to the Cloud Composer environment by copying it to the environment’s plugins folder, following the instructions here. The command will look something like this:

  gcloud composer environments storage plugins import --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source PATH_TO_LOCAL_FILE \
    --destination PATH_IN_SUBFOLDER
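For instance, with a hypothetical environment named my-composer-env in us-central1 and the plugin saved locally as adls_plugin.py, the filled-in command might look like this (the --destination flag is optional and omitted here, so the file lands at the top of the plugins folder):

```shell
gcloud composer environments storage plugins import \
  --environment my-composer-env \
  --location us-central1 \
  --source ./adls_plugin.py
```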

Once the plugin has been imported, you can use it. This simple example snippet shows how to import the plugin and leverage the AzureDataLakeHook functionality that the plugin now provides, in conjunction with the GoogleCloudStorageHook, to copy data from ADLS to Cloud Storage:

  from os import listdir
  from os.path import isfile, join

  from airflow.hooks.adls_plugin import AzureDataLakeHook
  from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

  # Uploads every regular file in local_path to the given Cloud Storage bucket
  def upload_to_gcs(local_path, bucket, destination):
    gcs = GoogleCloudStorageHook()
    files_and_dirs = listdir(local_path)
    files_to_upload = [f for f in files_and_dirs if isfile(join(local_path, f))]
    for file_name in files_to_upload:
      gcs.upload(bucket, join(destination, file_name), join(local_path, file_name),
                 mime_type='application/octet-stream')
      # Then delete the file locally

  # Downloads files from ADLS using the ADLS Hook
  def download_from_adls(local_path, remote_path):
    adls = AzureDataLakeHook()
    # check for file logic goes here

  # Copies files from ADLS to Cloud Storage via the local filesystem
  def download_from_adls_to_gcs(local_path, adls_path, gcs_bucket, gcs_dest):
    download_from_adls(local_path, adls_path)
    upload_to_gcs(local_path, gcs_bucket, gcs_dest)
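
The local file-selection step in upload_to_gcs can be exercised without any cloud credentials or Airflow installed. Here is a minimal standalone sketch of the same listdir/isfile filtering (the helper name list_files_to_upload is ours, for illustration only):

```python
import os
import tempfile
from os.path import isfile, join

def list_files_to_upload(local_path):
    # Keep only regular files, skipping subdirectories, exactly as
    # upload_to_gcs does before handing each file to the GCS hook.
    return sorted(f for f in os.listdir(local_path)
                  if isfile(join(local_path, f)))

with tempfile.TemporaryDirectory() as tmp:
    for name in ("a.csv", "b.csv"):
        with open(join(tmp, name), "w") as fh:
            fh.write("data")
    os.mkdir(join(tmp, "nested"))  # a subdirectory that must be skipped
    print(list_files_to_upload(tmp))  # prints ['a.csv', 'b.csv']
```

Testing this filtering locally first makes it easier to debug the workflow before it ever runs inside the Cloud Composer environment.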

You could easily extend this example into a more robust Operator that provides this functionality, and use the same plugin mechanism to make it available to your workflows.

In summary, you can use features from the upstream Apache Airflow codebase, including newer connectors to external data sources, even with Cloud Composer, Google’s managed Airflow service. For more on working with upstream components, check out the Airflow documentation here.