Data Analytics

How to transfer BigQuery tables between locations with Cloud Composer

BigQuery is a fast, highly scalable, cost-effective, and fully managed enterprise data warehouse for analytics at any scale. As BigQuery has grown in popularity, one question that often arises is how to copy tables across locations in an efficient and scalable manner. BigQuery has some limitations for data management, one being that the destination dataset must reside in the same location as the source dataset that contains the table being copied. For example, you cannot copy a table from a US-based dataset and write it to an EU-based dataset. Luckily, you can use Cloud Composer to implement a data pipeline that transfers a list of tables in an efficient and scalable way.

Cloud Composer is a fully managed workflow orchestration service that empowers you to author, schedule, and monitor pipelines that span across clouds and on-premises data centers. Built on the open source Apache Airflow and operated using the Python programming language, Cloud Composer is free from lock-in and easy to use. Plus, it’s deeply integrated within Google Cloud Platform, giving users the ability to orchestrate their full pipeline. Cloud Composer has robust, built-in integration with many products, including Google BigQuery, Cloud Dataflow, Cloud Dataproc, Cloud Datastore, Cloud Storage, Cloud Pub/Sub, and Cloud ML Engine.

Overview

In this blog you will learn how to create and run an Apache Airflow workflow in Cloud Composer that completes the following tasks:

  1. Reads from a configuration file the list of tables to copy

  2. Exports the list of tables from a BigQuery dataset located in a US region to Cloud Storage

  3. Copies the exported tables from US to EU Cloud Storage buckets

  4. Imports the list of tables into the target BigQuery Dataset in an EU region

bigtable_across_locations.png

In this post you will learn:

  1. How to access your Cloud Composer environment through the Google Cloud Platform Console, Cloud SDK, and Airflow web interface.

  2. How to install a plugin that provides a Cloud Storage-to-Cloud Storage operator, which is required but not included in Composer's current Airflow version (1.9)

  3. How to use the Stackdriver Logging client Python library for logging

  4. How to dynamically generate DAGs (directed acyclic graphs) based on a config file

Create Cloud Composer environment

Create a Cloud Composer environment, enabling the Composer API if asked to do so. Set the following parameters for your environment, then click Create:

Name: composer-advanced-lab

Location: us-central1

Zone: us-central1-a

Leave all other settings as default.

The environment creation process is completed when the green checkmark displays to the left of the environment name on the Environments page in the GCP Console.

It can take up to 20 minutes for the environment to complete the setup process. Move on to the next sections to create your Cloud Storage buckets and a new BigQuery dataset.

Create Cloud Storage buckets

Create two Cloud Storage multi-regional buckets in your project, one located in the US as source and the other in EU as destination. These buckets will be used to copy the exported tables across locations, i.e., US to EU.

Note: You could select regional buckets in the same locations to minimize cost and avoid unnecessary replication; for simplicity, this post uses multi-regional buckets so it can keep referring to the two locations (US and EU).

Go to Navigation menu > Storage > Browser and then click Create bucket.

Give your two buckets a universally unique name including the location as a suffix (e.g., 6552634-us, 6552634-eu).

BigQuery destination dataset

Create the destination BigQuery Dataset in EU from the BigQuery new web UI:

Navigation menu > Big Data > BigQuery

Then select your project ID.

Big_Query.png

Finally click Create Data Set, use the name “nyc_tlc_EU” and Data location EU.

Create_Data_Set.png
nyc_tlc_EU.png

Confirm the dataset has been created and is empty.

Defining the workflow

Cloud Composer workflows are made up of DAGs (Directed Acyclic Graphs). The code shown in bq_copy_across_locations.py is the workflow code, also referred to as the DAG. You can find sample code here.

To orchestrate all the workflow tasks, the DAG imports the following operators:

  1. DummyOperator: Creates Start and End dummy tasks for better visual representation of the DAG.

  2. BigQueryToCloudStorageOperator: Exports BigQuery tables to Cloud Storage buckets using the Avro format.

  3. GoogleCloudStorageToGoogleCloudStorageOperator: Copies files across Cloud Storage buckets.

  4. GoogleCloudStorageToBigQueryOperator: Imports tables into BigQuery from Avro files in a Cloud Storage bucket.

We first define the function read_table_list() to read the config file and build the list of tables to copy.
# --------------------------------------------------------------------------------
# Functions
# --------------------------------------------------------------------------------

import csv
import io
import logging

# The sample DAG configures its logger elsewhere in the file; the standard
# library logger shown here keeps this snippet self-contained.
logger = logging.getLogger(__name__)


def read_table_list(table_list_file):
    """
    Reads the table list file that will help in creating Airflow tasks in
    the DAG dynamically.
    :param table_list_file: (String) The file location of the table list file,
    e.g. '/home/airflow/gcs/dags/table_list.csv'
    :return table_list: (List) List of dictionaries containing the source and
    target tables.
    """
    table_list = []
    logger.info('Reading table_list_file from: %s', table_list_file)
    try:
        with io.open(table_list_file, 'rt', encoding='utf-8') as csv_file:
            csv_reader = csv.reader(csv_file)
            next(csv_reader)  # Skip the header row.
            for row in csv_reader:
                logger.info(row)
                table_tuple = {
                    'table_source': row[0],
                    'table_dest': row[1]
                }
                table_list.append(table_tuple)
            return table_list
    except IOError as e:
        logger.error('Error opening table_list_file %s: %s', table_list_file, e)

You can find the above code on GitHub.
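For reference, the table list file is a two-column CSV with a header row, which read_table_list() skips. A hypothetical file (the header text and table names here are illustrative, not the sample's exact contents) might look like this:

```csv
Source, Target
nyc_tlc.trips, nyc_tlc_EU.trips
nyc_tlc.fares, nyc_tlc_EU.fares
```

Each row becomes one dictionary with 'table_source' and 'table_dest' keys in the returned list.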

The name of the DAG is bq_copy_us_to_eu_01; the DAG is not scheduled by default, so it needs to be triggered manually.

composer/workflows/bq_copy_across_locations.py

VIEW ON GITHUB

  default_args = {
      'owner': 'airflow',
      'start_date': datetime.today(),
      'depends_on_past': False,
      'email': [''],
      'email_on_failure': False,
      'email_on_retry': False,
      'retries': 1,
      'retry_delay': timedelta(minutes=5),
  }

  # DAG object.
  with models.DAG('bq_copy_us_to_eu_01',
                  default_args=default_args,
                  schedule_interval=None) as dag:
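Inside this DAG block, the workflow loops over the list returned by read_table_list() to create one export / copy / import task chain per table. The Airflow operators themselves require a Composer environment to run, but the looping pattern can be sketched with plain Python (the task-id naming scheme below is illustrative, not the sample's exact one):

```python
# Sketch: generate per-table task ids the way the DAG generates per-table
# operators. Each table pair yields an export -> copy -> import chain that
# would be wired between the start and end dummy tasks.
def build_task_chains(table_list):
    chains = []
    for record in table_list:
        source = record['table_source'].replace('.', '_')
        dest = record['table_dest'].replace('.', '_')
        chains.append([
            'export_%s_to_gcs' % source,   # BigQueryToCloudStorageOperator
            'copy_%s_us_to_eu' % source,   # GCS-to-GCS operator (plugin)
            'import_gcs_to_%s' % dest,     # GoogleCloudStorageToBigQueryOperator
        ])
    return chains

tables = [{'table_source': 'nyc_tlc.trips', 'table_dest': 'nyc_tlc_EU.trips'}]
print(build_task_chains(tables))
```

This is the "dynamically generated DAG" idea from the overview: the shape of the graph is driven entirely by the config file, so adding a table to the CSV adds a chain of tasks without touching the DAG code.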

To define the Cloud Storage plugin, we define a class GCSPlugin(AirflowPlugin), mapping the hook and operator downloaded from the Airflow 1.10-stable branch.

third_party/apache-airflow/plugins/__init__.py

VIEW ON GITHUB

  """
    GCS Plugin
    This plugin provides an interface to GCS operator from Airflow Master.
"""

from airflow.plugins_manager import AirflowPlugin

from gcs_plugin.hooks.gcs_hook import GoogleCloudStorageHook
from gcs_plugin.operators.gcs_to_gcs import \
    GoogleCloudStorageToGoogleCloudStorageOperator


class GCSPlugin(AirflowPlugin):
    name = "gcs_plugin"
    operators = [GoogleCloudStorageToGoogleCloudStorageOperator]
    hooks = [GoogleCloudStorageHook]

Viewing environment information

Go back to Composer to check on the status of your environment.

Once your environment has been created, click the name of the environment to see its details.

The Environment details page provides information such as the Airflow web UI URL, the Kubernetes Engine cluster ID, and the name of the Cloud Storage bucket connected to the /dags folder.


Environment details.png

Note: Cloud Composer uses Cloud Storage to store Apache Airflow DAGs, also known as workflows. Each environment has an associated Cloud Storage bucket. Cloud Composer schedules only the DAGs in the Cloud Storage bucket.

Setting your DAGs’ Cloud Storage bucket

Set a variable in Cloud Shell to refer to the DAGs Cloud Storage bucket; we will use this variable a few times during the post. For example, with the DAGs folder shown above:

  DAGS_BUCKET=us-central1-composer-advanc-46bb6186-bucket

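As an aside, if you copy the DAGs folder path (the gs://.../dags value) from the Environment details page, the bucket name is simply its first path component. A quick sketch, using this post's example bucket:

```python
# Extract the bucket name from the DAGs folder path shown on the
# Environment details page.
dags_folder = 'gs://us-central1-composer-advanc-46bb6186-bucket/dags'
bucket = dags_folder[len('gs://'):].split('/')[0]
print(bucket)  # us-central1-composer-advanc-46bb6186-bucket
```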

Airflow variables are an Airflow-specific concept that is distinct from environment variables. In this step, you'll set the following three Airflow variables used by the DAG we will deploy: table_list_file_path, gcs_source_bucket, and gcs_dest_bucket.

table_list_file_path: /home/airflow/gcs/dags/bq_copy_eu_to_us_sample.csv

gcs_source_bucket: 6552634-us

gcs_dest_bucket: 6552634-eu

Set the Airflow variables using gcloud commands in Cloud Shell; alternatively, they can be set from the Airflow UI. To set the three variables, run the gcloud composer command once for each row of the above table. The gcloud composer command executes the Airflow CLI sub-command variables, passing the Airflow-specific arguments through to it.

  gcloud composer environments run ENVIRONMENT_NAME \
     --location LOCATION variables -- \
     --set KEY VALUE

For example, the gcs_source_bucket variable would be set like this:

  gcloud composer environments run composer-advanced-lab \
    --location us-central1 variables -- \
    --set gcs_source_bucket 6552634-us

  • ENVIRONMENT_NAME is the name of the environment.

  • LOCATION is the Compute Engine region where the environment is located. The gcloud composer command requires including the --location flag or setting the default location before running the gcloud command.

  • KEY and VALUE specify the variable and its value. Include a space, two dashes, and a space ( -- ) between the left-side gcloud-related arguments and the right-side Airflow sub-command arguments, and include a space between the KEY and VALUE arguments.
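Rather than typing the command three times, you can generate all three invocations. The sketch below uses this post's example values; the table_list_file_path value is an assumption based on the sample CSV being uploaded to the DAGs folder, which maps to /home/airflow/gcs/dags inside the environment:

```python
# Print one "gcloud composer ... variables -- --set KEY VALUE" command per
# Airflow variable. Values are this post's examples; adjust for your project.
variables = {
    'table_list_file_path': '/home/airflow/gcs/dags/bq_copy_eu_to_us_sample.csv',
    'gcs_source_bucket': '6552634-us',
    'gcs_dest_bucket': '6552634-eu',
}
for key, value in variables.items():
    print('gcloud composer environments run composer-advanced-lab '
          '--location us-central1 variables -- --set %s %s' % (key, value))
```

Paste the printed lines into Cloud Shell to set all three variables in one go.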

To see the value of a variable, run the Airflow CLI sub-command variables with the get argument or use the Airflow UI.

For example, run the following:

  gcloud composer environments run composer-advanced-lab \
    --location us-central1 variables -- --get gcs_source_bucket

Uploading the DAG and dependencies to Cloud Storage

To upload the DAG and its dependencies:

  1. Clone the GCP Python docs samples repository on your Cloud shell.

  cd ~
  git clone https://github.com/GoogleCloudPlatform/python-docs-samples

  2. Upload a copy of the third-party hook and operator to the plugins folder of your Composer DAGs Cloud Storage bucket, e.g.:

  gsutil cp -r python-docs-samples/third_party/apache-airflow/plugins/* gs://$DAGS_BUCKET/plugins

  3. Next, upload the DAG and config file to the DAGs Cloud Storage bucket of your environment, e.g.:

  gsutil cp python-docs-samples/composer/workflows/bq_copy_across_locations.py gs://$DAGS_BUCKET/dags
  gsutil cp python-docs-samples/composer/workflows/bq_copy_eu_to_us_sample.csv gs://$DAGS_BUCKET/dags

Cloud Composer registers the DAG in your Airflow environment automatically; DAG changes typically propagate within 3-5 minutes. You can see task status in the Airflow web interface and confirm that, per the settings, the DAG is not scheduled.

Using the Airflow UI

To access the Airflow web interface using the GCP Console:

  1. Go back to the Environments page.

  2. In the Airflow webserver column for the environment, click the new window icon. The Airflow web UI opens in a new browser window.

  3. Sign in with your GCP credentials.

  4. You’ll now be in the Airflow UI.

Airflow webserver.png

For information about the Airflow UI, see Accessing the web interface.

Viewing variables

The variables you set earlier persist in your environment. You can view the variables by selecting Admin > Variables from the Airflow menu bar.

Exploring DAG runs

When you upload your DAG file to the dags folder in Cloud Storage, Cloud Composer parses the file. If no errors are found, the name of the workflow appears in the DAG listing, and the workflow is queued to run immediately if its schedule conditions are met. In our case the schedule is None, as set above, so the workflow runs only when triggered manually.
exploring_DAG.png

Trigger the DAG run manually

To trigger the DAG manually click the play button:

trigger_DAG.png

Click bq_copy_us_to_eu_01 to open the DAG details page. This page includes a graphical representation of workflow tasks and dependencies.

DAG_details.png

To run the workflow again from the Graph View:

  1. In the Airflow UI Graph View, click the start graphic.

  2. Click Clear to reset all the tasks, then click OK to confirm.
DAG_graph_view.png

Finally, check the status and results of the bq_copy_us_to_eu_01 workflow by going to the following Console pages:

  • BigQuery web UI to validate the tables are accessible from the dataset created in EU.

BQ_Web_UI.png

You’ve successfully copied two tables programmatically from the US region to the EU region!

Feel free to reuse this DAG to copy as many tables as you require across your locations.

Next steps

  • You can learn more about using Airflow at the Airflow website or the Airflow GitHub project. There are lots of other resources available for Airflow, including a discussion group.

  • Sign up for the Apache dev and commits mailing lists (send emails to dev-subscribe@airflow.incubator.apache.org and commits-subscribe@airflow.incubator.apache.org to subscribe to each).

  • Sign up for an Apache JIRA account and re-open any issues that you care about in the Apache Airflow JIRA project