Google Cloud Big Data and Machine Learning Blog

Innovation in data processing and machine learning technology

How to aggregate data for BigQuery using Apache Airflow

Tuesday, July 11, 2017

By Tahir Fayyaz, Customer Engineer

Users of Google BigQuery, the cloud-native data warehouse service from Google Cloud Platform (GCP), have access to an ever-expanding range of public datasets for exploration. Some of these datasets are quite large, however, so connecting to them directly from a dashboard created in Google Data Studio or other tools can cause undesirable latency (or costs if you have exceeded your free trial or tier). For that reason, many users prefer to aggregate that data into a smaller table for their dashboard, instead.

Although cron jobs or DIY scripts are options for that task, using the popular Apache Airflow workflow management framework (currently incubating at Apache) is much more efficient. Airflow, which was invented and open sourced by Airbnb data engineers, lets you programmatically create, schedule and monitor workflows using simple Python syntax. Here are some key reasons why most people will prefer Airflow over manual methods for building and managing data pipelines:

To demonstrate, in this post I’ll explain how to create a pipeline for aggregating daily stats about GitHub repos by pulling data from the public datasets for Hacker News and GitHub and pushing them into a new joint table every day. We'll then use the final table to power this GitHub on Hacker News trends dashboard created using Google Data Studio.

This same method can be used for aggregating any of the BigQuery public datasets of your choosing.

Our final pipeline will look like this in the Airflow UI.

Setting up Airflow

The quickest way to get started and test the pipeline in this post is to set up Airflow locally (make sure you have the gcloud SDK installed, first).

You can install Airflow and the extra packages required for GCP using

$ pip install airflow[gcp_api]

Basics of Airflow

Airflow lets you build workflows based on directed acyclic graphs (DAGs), which mathematically model a sequence of multiple tasks. Each task has a specific job to do in your DAG and is controlled through an operator.

There are multiple operators available for GCP (GCP support in Airflow is very good), but in this example, we'll be using only the following ones:

  • BigQuery Check Operator: Runs an SQL query that is expected to return a single row; the task is successful if a row is returned and none of its values is falsy (such as 0 or null)
  • BigQuery Operator: Runs an SQL query and writes the results in a new table or table partition
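
To make the pass/fail rule of the check operator concrete, here is a minimal sketch in plain Python. It is not the operator's source code; the function name check_passes is hypothetical, and it assumes the check looks only at the first row of the result and treats each value with Python truthiness.

```python
def check_passes(first_row):
    """Return True when a check on the first result row would pass.

    Assumption: the check fails when no row comes back, or when any
    value in the first row is falsy (0, None, empty string, False).
    """
    if not first_row:  # None or an empty row: the query returned nothing
        return False
    return all(bool(value) for value in first_row)


print(check_passes(("20170601",)))  # a table id was found
print(check_passes(None))           # no row returned
print(check_passes((0,)))           # a zero count
print(check_passes((5, None)))      # one value is null
```

Only the first call passes; the other three would mark the task as failed.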

We make use of macros to dynamically insert the dates in our queries. To include the date of when the DAG is run, you simply include {{ ds }} or {{ ds_nodash }} if you want the date without any dashes. There are many other ways of using macros that we'll make use of also.
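
As a rough illustration of what the two macros expand to, the sketch below fills the placeholders with a regular expression. Airflow itself renders templates with Jinja, so the render helper here is only a toy stand-in, and the function names and the sample query text are hypothetical.

```python
import re
from datetime import date


def date_macros(execution_date):
    """Return the two date macros for one execution date."""
    return {
        "ds": execution_date.strftime("%Y-%m-%d"),
        "ds_nodash": execution_date.strftime("%Y%m%d"),
    }


def render(template, macros):
    """Replace each {{ name }} placeholder; a toy substitute for Jinja."""
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", lambda m: macros[m.group(1)], template)


macros = date_macros(date(2017, 6, 2))
rendered = render("WHERE day = '{{ ds }}' OR table_id = '{{ ds_nodash }}'", macros)
print(rendered)
```

For an execution date of June 2, 2017 the placeholders become 2017-06-02 and 20170602.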

Once you've defined your tasks you can set dependencies so that certain tasks only run once another task has successfully completed. For example, to ensure that task number 3 only runs after two independent tasks have completed, you would define an upstream dependency as:

t3.set_upstream(t1)

t3.set_upstream(t2)
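
The effect of those two calls can be sketched without Airflow. The dictionary and the ready_to_run helper below are hypothetical stand-ins for Airflow's own bookkeeping, and they assume the default behavior where a task waits for every upstream task to succeed.

```python
# Hypothetical stand-in for the dependency graph: task -> set of upstream tasks.
upstream = {"t1": set(), "t2": set(), "t3": {"t1", "t2"}}


def ready_to_run(task, succeeded):
    """A task may start only when all of its upstream tasks have succeeded."""
    return upstream[task] <= succeeded


print(ready_to_run("t3", {"t1"}))        # t2 has not finished yet
print(ready_to_run("t3", {"t1", "t2"}))  # both upstream tasks are done
print(ready_to_run("t1", set()))         # no upstream tasks at all
```

Task 3 is held back until both task 1 and task 2 appear in the set of succeeded tasks.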

Building the Airflow DAG

First, import required libraries and Airflow operators into a file named bigquery_github_trends_v1.py which should be saved in your dags folder:
from datetime import timedelta, datetime
import json

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator
Now let’s define our default DAG arguments. The start date for the DAG, for example, is very important as it allows the DAG to do a backfill from the date you define. (In our example we want to start from June 2, 2017.) You can override these arguments for each task if required:
default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': datetime(2017, 6, 2),
    'email': ['airflow@airflow.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5),
}

We then set our schedule interval using cron syntax, which means our DAG will run once a day at 21:00 (9pm). Airflow assumes that your system is using UTC (GMT) and this cannot be changed. Therefore the schedule you set will use UTC.

schedule_interval = "00 21 * * *"

An important note is that Airflow triggers a DAG run at the end of the period it covers, which is effectively the next day. Therefore, as we have set the start date in our default_args to datetime(2017, 6, 2), the DAG run for 2017-06-02 will start at 21:00 on June 3, 2017, but the macro {{ ds }}, which we'll use in our queries below to set a dynamic date, will still equal 2017-06-02.

We also make use of {{ yesterday_ds }} which will equal 2017-06-01. We're using yesterday because we need to wait a full day for all of the data to be available in BigQuery for the public datasets we're using.
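
The timing described above can be worked through with plain datetime arithmetic. This is only a sketch under the assumption of a fixed one-day interval; the describe_run function is hypothetical and not part of Airflow.

```python
from datetime import datetime, timedelta


def describe_run(execution_date, interval=timedelta(days=1)):
    """Return when a run is triggered and the date macros it sees."""
    return {
        # The run starts once the period it covers has ended.
        "triggered_at": execution_date + interval,
        "ds": execution_date.strftime("%Y-%m-%d"),
        "yesterday_ds": (execution_date - timedelta(days=1)).strftime("%Y-%m-%d"),
    }


run = describe_run(datetime(2017, 6, 2, 21, 0))
for name, value in run.items():
    print(name, value)
```

For the first run in this post, the trigger time is 21:00 on June 3, 2017, while ds is 2017-06-02 and yesterday_ds is 2017-06-01.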

Finally we create our DAG object and define a dag_id and apply the default args and the schedule interval.

dag = DAG('bigquery_github_trends_v1', default_args=default_args, schedule_interval=schedule_interval)

Tasks to check Github and Hackernews tables and data exist

To begin, we'll check that the tables and the data we want to use are available for the day before (yesterday) on the day that the DAG runs.

Task 1 will check that the GitHub Archive data has a dated table created for that date.

t1 = BigQueryCheckOperator(
    task_id='bq_check_githubarchive_day',
    sql='''
    #legacySql
    SELECT table_id
    FROM [githubarchive:day.__TABLES__]
    WHERE table_id = "{{ yesterday_ds_nodash }}"
    ''',
    dag=dag)
Task 2 will check that the hacker news table contains data for that date.
t2 = BigQueryCheckOperator(
    task_id='bq_check_hackernews_full',
    sql='''
    #legacySql
    SELECT
    STRFTIME_UTC_USEC(timestamp, "%Y%m%d") as date
    FROM
      [bigquery-public-data:hacker_news.full]
    WHERE
    type = 'story'
    AND STRFTIME_UTC_USEC(timestamp, "%Y%m%d") = "{{ yesterday_ds_nodash }}"
    LIMIT 1
    ''',
    dag=dag)
Note: For the BigQuery check operators we must use Legacy SQL (see open issue) but for our main queries we can use Standard SQL.

Test the tasks

Airflow allows you to easily test each task individually using the syntax:
$ airflow test [-h] [-sd SUBDIR] [-dr] [-tp TASK_PARAMS] dag_id task_id execution_date
To test the 2 tasks we created earlier we simply need to run the following 2 commands:
$ airflow test bigquery_github_trends_v1 bq_check_githubarchive_day 2017-06-02

$ airflow test bigquery_github_trends_v1 bq_check_hackernews_full 2017-06-02

Create our new partitioned tables

We'll be writing data into new tables with our tasks below, so first we must create all our partitioned tables using the bq CLI with the following commands (replace my-project with your own project ID):
$ bq mk --time_partitioning_type=DAY my-project:github_trends.github_daily_metrics
$ bq mk --time_partitioning_type=DAY my-project:github_trends.github_agg
$ bq mk --time_partitioning_type=DAY my-project:github_trends.hackernews_agg
$ bq mk --time_partitioning_type=DAY my-project:github_trends.hackernews_github_agg

Task to aggregate github events to daily metric table

As we'll be creating metrics for the past day, past 7 days and past 28 days in a later step, we want to avoid having to scan all the raw data for the previous dates. Therefore we'll first create a partitioned table, which holds all the aggregated metrics for the GitHub events we're interested in.

We then use the BigQueryOperator to run our query and write the results to the relevant dated partition.

t3 = BigQueryOperator(
    task_id='bq_write_to_github_daily_metrics',
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    bql='''
    #standardSQL
    SELECT
      date,
      repo,
      SUM(IF(type='WatchEvent', 1, NULL)) AS stars,
      SUM(IF(type='ForkEvent',  1, NULL)) AS forks
    FROM (
      SELECT
        FORMAT_TIMESTAMP("%Y%m%d", created_at) AS date,
        actor.id as actor_id,
        repo.name as repo,
        type
      FROM
        `githubarchive.day.{{ yesterday_ds_nodash }}`
      WHERE type IN ('WatchEvent','ForkEvent')
    )
    GROUP BY
      date,
      repo
    ''',
    destination_dataset_table='airflow-cloud-public-datasets.github_trends.github_daily_metrics${{ yesterday_ds_nodash }}',
    dag=dag)
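
Writing to a dated partition with WRITE_TRUNCATE means a re-run for the same date replaces that day's rows instead of appending duplicates. The toy model below treats a partitioned table as a dictionary keyed by partition id; the write_truncate function and the sample rows are made up for illustration.

```python
# Toy model: a partitioned table is a dict of partition id -> rows.
github_daily_metrics = {}


def write_truncate(table, partition_id, rows):
    """Replace the contents of one partition, leaving the others untouched."""
    table[partition_id] = list(rows)


# Made-up sample rows: (repo, stars, forks).
write_truncate(github_daily_metrics, "20170531", [("example/alpha", 3, 1)])
write_truncate(github_daily_metrics, "20170601", [("example/alpha", 5, 2)])
# Re-running the task for the same date overwrites that partition only.
write_truncate(github_daily_metrics, "20170601", [("example/alpha", 5, 2)])

print(sorted(github_daily_metrics))
print(len(github_daily_metrics["20170601"]))
```

This is what makes it safe to clear and re-run a task, or to backfill a date that was already processed.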

Run a Backfill for bq_write_to_github_daily_metrics task

Our final table will have a start date of June 1, 2017. However, as you'll see in the next GitHub aggregate query, we need data from the previous 28 days from the github_trends.github_daily_metrics table to create our previous_28_days and previous_7_days metrics. To do this, we'll run a backfill for all of May 2017 just for this specific task. Airflow has the CLI command airflow backfill, which allows us to do this easily using the following syntax:

$ airflow backfill [-h] [-t TASK_REGEX] [-s START_DATE] [-e END_DATE] dag_id

We'll run the command as follows, which runs the DAG and any downstream DAGs from May 2, 2017 to June 1, 2017. Notice our end date needs to be 2017-06-02.

$ airflow backfill -t bq_write_to_github_daily_metrics -s 2017-05-02 -e 2017-06-02 bigquery_github_trends_v1
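
To see which runs and partitions that date range covers, here is a small sketch. It rests on an assumption about how the backfill picks its runs: it takes the 21:00 schedule points that fall between the start and end dates, both read as midnight. The backfill_runs function is hypothetical.

```python
from datetime import datetime, timedelta


def backfill_runs(start, end, hour=21):
    """List the daily execution dates at `hour`:00 that fall within [start, end]."""
    runs = []
    current = start.replace(hour=hour, minute=0, second=0, microsecond=0)
    if current < start:
        current += timedelta(days=1)
    while current <= end:
        runs.append(current)
        current += timedelta(days=1)
    return runs


runs = backfill_runs(datetime(2017, 5, 2), datetime(2017, 6, 2))
# Each run writes the partition for the day before its execution date.
partitions = [(run - timedelta(days=1)).strftime("%Y%m%d") for run in runs]

print(len(runs), runs[0], runs[-1])
print(partitions[0], partitions[-1])
```

Under that assumption the last run is for June 1, and the partitions written span all of May 2017.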

You can then visit the Airflow interface and see the backfill running for the dates you defined.

Tasks to aggregate Github and Hackernews data

Next, we'll aggregate the Github data and Hackernews data and write the data to two separate aggregate tables.

Write to Github aggregate table

t4 = BigQueryOperator(
    task_id='bq_write_to_github_agg',
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    bql='''
    #standardSQL
    SELECT
      "{{ yesterday_ds_nodash }}" as date,
      repo,
      SUM(stars) as stars_last_28_days,
      SUM(IF(_PARTITIONTIME BETWEEN TIMESTAMP("{{ macros.ds_add(ds, -7) }}")
        AND TIMESTAMP("{{ yesterday_ds }}") ,
        stars, null)) as stars_last_7_days,
      SUM(IF(_PARTITIONTIME BETWEEN TIMESTAMP("{{ yesterday_ds }}")
        AND TIMESTAMP("{{ yesterday_ds }}") ,
        stars, null)) as stars_last_1_day,
      SUM(forks) as forks_last_28_days,
      SUM(IF(_PARTITIONTIME BETWEEN TIMESTAMP("{{ macros.ds_add(ds, -7) }}")
        AND TIMESTAMP("{{ yesterday_ds }}") ,
        forks, null)) as forks_last_7_days,
      SUM(IF(_PARTITIONTIME BETWEEN TIMESTAMP("{{ yesterday_ds }}")
        AND TIMESTAMP("{{ yesterday_ds }}") ,
        forks, null)) as forks_last_1_day
    FROM
      `airflow-cloud-public-datasets.github_trends.github_daily_metrics`
    WHERE _PARTITIONTIME BETWEEN TIMESTAMP("{{ macros.ds_add(ds, -28) }}")
    AND TIMESTAMP("{{ yesterday_ds }}")
    GROUP BY
      date,
      repo
    ''',
    destination_dataset_table='airflow-cloud-public-datasets.github_trends.github_agg${{ yesterday_ds_nodash }}',
    dag=dag)
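
Because BETWEEN is inclusive at both ends, it helps to count the partitions a window actually covers. The sketch below does that with plain dates; the partitions_between function is hypothetical and simply mirrors a window that starts at ds plus an offset and ends at yesterday_ds.

```python
from datetime import date, timedelta


def partitions_between(ds, start_offset):
    """Dates matched by an inclusive window from ds + start_offset to yesterday."""
    first = ds + timedelta(days=start_offset)
    last = ds - timedelta(days=1)
    return [first + timedelta(days=i) for i in range((last - first).days + 1)]


ds = date(2017, 6, 2)
window_sizes = {}
for offset in (-1, -7, -28):
    days = partitions_between(ds, offset)
    window_sizes[offset] = len(days)
    print(offset, len(days), days[0], days[-1])
```

A window that starts N days before ds and ends at yesterday_ds spans exactly N daily partitions.
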
Write to Hacker News aggregate table
t5 = BigQueryOperator(
    task_id='bq_write_to_hackernews_agg',
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    bql='''
    #standardSQL
    SELECT
      FORMAT_TIMESTAMP("%Y%m%d", timestamp) AS date,
      `by` AS submitter,
      id as story_id,
      REGEXP_EXTRACT(url, "(https?://github.com/[^/]*/[^/#?]*)") as url,
      SUM(score) as score
    FROM
      `bigquery-public-data.hacker_news.full`
    WHERE
      type = 'story'
      AND timestamp>'{{ yesterday_ds }}'
      AND timestamp<'{{ ds }}'
      AND url LIKE '%https://github.com%'
      AND url NOT LIKE '%github.com/blog/%'
    GROUP BY
      date,
      submitter,
      story_id,
      url
    ''',
    destination_dataset_table='airflow-cloud-public-datasets.github_trends.hackernews_agg${{ yesterday_ds_nodash }}',
    dag=dag)
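
The REGEXP_EXTRACT pattern in this query trims a story URL down to its owner and repo part. The same pattern can be tried in Python's re module, as sketched below; the repo_url function and the sample URLs are only for illustration.

```python
import re

# Same pattern as in the query above.
GITHUB_REPO_URL = re.compile(r"(https?://github.com/[^/]*/[^/#?]*)")


def repo_url(url):
    """Return the owner/repo part of a GitHub URL, or None when nothing matches."""
    match = GITHUB_REPO_URL.search(url)
    return match.group(1) if match else None


print(repo_url("https://github.com/apache/incubator-airflow/issues/1?x=1"))
print(repo_url("https://github.com/apache/incubator-airflow#readme"))
print(repo_url("https://example.com/page"))
```

The first two calls both give https://github.com/apache/incubator-airflow, which is the form that a later step can match against a repo name.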

Task to join the aggregate tables

In this query, we'll look at each GitHub link that was shared on Hacker News on a given date and how many stars and forks the repo had in the last 1 day, last 7 days and last 28 days. As we have aggregated the previous tables daily, we no longer need to scan the full Github and Hackernews tables.
t6 = BigQueryOperator(
    task_id='bq_write_to_hackernews_github_agg',
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    bql='''
    #standardSQL
    SELECT
    a.date as date,
    a.url as github_url,
    b.repo as github_repo,
    a.score as hn_score,
    a.story_id as hn_story_id,
    b.stars_last_28_days as stars_last_28_days,
    b.stars_last_7_days as stars_last_7_days,
    b.stars_last_1_day as stars_last_1_day,
    b.forks_last_28_days as forks_last_28_days,
    b.forks_last_7_days as forks_last_7_days,
    b.forks_last_1_day as forks_last_1_day
    FROM
    (SELECT
      *
    FROM
      `airflow-cloud-public-datasets.github_trends.hackernews_agg`
      WHERE _PARTITIONTIME BETWEEN TIMESTAMP("{{ yesterday_ds }}") AND TIMESTAMP("{{ yesterday_ds }}")
      )as a
    LEFT JOIN
      (
      SELECT
      repo,
      CONCAT('https://github.com/', repo) as url,
      stars_last_28_days,
      stars_last_7_days,
      stars_last_1_day,
      forks_last_28_days,
      forks_last_7_days,
      forks_last_1_day
      FROM
      `airflow-cloud-public-datasets.github_trends.github_agg`
      WHERE _PARTITIONTIME BETWEEN TIMESTAMP("{{ yesterday_ds }}") AND TIMESTAMP("{{ yesterday_ds }}")
      ) as b
    ON a.url = b.url
    ''',
    destination_dataset_table='airflow-cloud-public-datasets.github_trends.hackernews_github_agg${{ yesterday_ds_nodash }}',
    dag=dag)
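
The LEFT JOIN keeps every Hacker News story, even when no matching repo row exists for that day. Here is a small sketch of the same idea with Python dictionaries; the sample rows are made up and the left_join function is hypothetical.

```python
# Made-up sample rows standing in for the two aggregate tables.
hackernews_rows = [
    {"url": "https://github.com/example/alpha", "score": 120},
    {"url": "https://github.com/example/missing", "score": 15},
]
github_rows = [
    {"repo": "example/alpha", "stars_last_7_days": 42},
]


def left_join(stories, repos):
    """Keep every story; attach repo metrics when the URLs match."""
    by_url = {"https://github.com/" + repo["repo"]: repo for repo in repos}
    joined = []
    for story in stories:
        repo = by_url.get(story["url"], {})
        joined.append({
            "github_url": story["url"],
            "hn_score": story["score"],
            "github_repo": repo.get("repo"),
            "stars_last_7_days": repo.get("stars_last_7_days"),
        })
    return joined


joined = left_join(hackernews_rows, github_rows)
for row in joined:
    print(row)
```

The second story has no repo row, so its repo and star columns come back empty, just as they would be NULL in the query result.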

Task to check the data was written to final table

Our final query will check that the data for the date of the dag run was written successfully to our hackernews_github_agg table.
t7 = BigQueryCheckOperator(
    task_id='bq_check_hackernews_github_agg',
    sql='''
    #legacySql
    SELECT
    partition_id
    FROM
    [airflow-cloud-public-datasets:github_trends.hackernews_github_agg$__PARTITIONS_SUMMARY__]
    WHERE partition_id = "{{ yesterday_ds_nodash }}"
    ''',
    dag=dag)

Set our dependencies for each task

We can now define when each task should run after another by setting upstream dependencies. This set of rules is what generates our DAG tree shown earlier in the post.

t3.set_upstream(t1)

t4.set_upstream(t3)

t5.set_upstream(t2)

t6.set_upstream(t4)
t6.set_upstream(t5)

t7.set_upstream(t6)
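
To check which tasks can run side by side under these rules, the dependencies can be fed to the topological sorter in Python's standard library (Python 3.9 or later). This is only a sketch of the ordering, not of how the Airflow scheduler works internally; the execution_stages function is hypothetical.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each task mapped to the tasks it waits for, mirroring the set_upstream calls.
upstream = {
    "t3": {"t1"},
    "t4": {"t3"},
    "t5": {"t2"},
    "t6": {"t4", "t5"},
    "t7": {"t6"},
}


def execution_stages(dependencies):
    """Group tasks into stages; tasks in one stage do not depend on each other."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        stages.append(ready)
        sorter.done(*ready)
    return stages


stages = execution_stages(upstream)
for number, tasks in enumerate(stages, start=1):
    print(number, tasks)
```

The two check tasks form the first stage, and the join in t6 has to wait for both aggregate tasks.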

Turn on the DAG

Now that the DAG is complete we can turn it on in the Airflow UI, and it will run from the start date we defined and will continue to run daily to ensure our dashboard is always up to date. You can then monitor the progress to check there are no issues causing any of the tasks to fail.

Dealing with task failures

If any of the tasks fail, they'll be highlighted in red in the DAG graph and tree view. You can also set up the DAG, through default_args, to send an email and/or call a function when a task fails.
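
As a sketch of the callback option, the default_args below add an on_failure_callback next to the email settings. The notify_failure function is hypothetical, and the fake context only imitates the task_instance and ds entries that I assume Airflow passes to the callback, so that the function can be tried outside Airflow.

```python
from datetime import timedelta
from types import SimpleNamespace


def notify_failure(context):
    """Hypothetical callback: build a short message from the task context."""
    task_instance = context["task_instance"]
    message = "Task {} in DAG {} failed for {}".format(
        task_instance.task_id, task_instance.dag_id, context["ds"])
    print(message)
    return message


default_args = {
    "email": ["airflow@airflow.com"],
    "email_on_failure": True,
    "on_failure_callback": notify_failure,  # called when a task fails
    "retries": 5,
    "retry_delay": timedelta(minutes=5),
}

# Fake context for trying the callback without a running Airflow instance.
fake_context = {
    "task_instance": SimpleNamespace(
        task_id="bq_check_hackernews_full",
        dag_id="bigquery_github_trends_v1"),
    "ds": "2017-06-02",
}
message = default_args["on_failure_callback"](fake_context)
```

In a real DAG the function could post to a chat channel or open a ticket instead of printing.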

You can then choose to deal with the task that failed in various ways in the Airflow UI. By clicking on the task node highlighted in red we are presented with a screen of options.

This error occurred because the data arrived late in the table we were checking, so we can click on clear downstream and the task will run again; it will now pass because the data is in the table.

See reports for your DAG runs

Airflow has a few great reports that allow you to check the performance of your pipeline over time, and these can be used to see where there might be bottlenecks and therefore make improvements.

The following task duration report is useful to see if the duration time for each task is steady. We can see there are some days it takes longer than expected, but these are not recurring issues in this example.

Our final Airflow DAG

Here’s the final result saved as a file named bigquery_github_trends_v1.py available to re-use on GitHub.

Next steps

You should now have all the info you need to aggregate BigQuery public datasets in interesting ways that feed your dashboard more efficiently.

A lot of inspiration for these queries was taken from the examples shared by Felipe Hoffa for Hacker News and Github Archive.
For more example DAGs, see how others have used Airflow for GCP.

There are a few open source examples for setting up Apache Airflow on GCP once you go into production, including:
