Application Development

GCP developer pro-tips: How to schedule a recurring Python script on GCP

So, you find yourself executing the same Python script each day. Maybe you’re executing a query in BigQuery and dumping the results into Bigtable each morning to run an analysis. Or perhaps you need to update the data in a pivot table in Google Sheets to create a really pretty histogram to display your billing data. Regardless, no one likes doing the same thing every day if technology can do it for them. Behold the magic of Cloud Scheduler, Cloud Functions, and PubSub!

Cloud Scheduler is a managed Google Cloud Platform (GCP) product that lets you specify a frequency in order to schedule a recurring job. In a nutshell, it is a lightweight managed task scheduler. This task can be an ad hoc batch job, big data processing job, infrastructure automation tooling—you name it. The nice part is that Cloud Scheduler handles all the heavy lifting for you: it retries in the event of failure, and it even lets you run something at 4 AM so that you don’t need to wake up in the middle of the night to kick off a workload during off-peak hours.

When setting up the job, you determine what exactly you will “trigger” at runtime. This can be a PubSub topic, HTTP endpoint, or an App Engine application. In this example, we will publish a message to a PubSub topic.

Our PubSub topic exists purely to connect the two ends of our pipeline: it is an intermediary mechanism for connecting the Cloud Scheduler job and the Cloud Function, which holds the actual Python script that we will run. Essentially, the PubSub topic acts like a telephone line, letting the Cloud Scheduler job talk and the Cloud Function listen. The Cloud Scheduler job publishes a message to the topic, and the Cloud Function, which subscribes to that topic, is alerted whenever a new message is published and executes the Python script in response.

The Code

SQL
For this example, I’ll show you a simple Python script that I want to run every day at 8 AM ET and 8 PM ET. The script is basic: it executes a SQL query in BigQuery to find popular GitHub repositories. Specifically, we will be looking for which owners created the repositories with the most forks, and in which year those repositories were created. We will use data from the public dataset bigquery-public-data:samples, which holds data about repositories created between 2007 and 2012. Our SQL query looks like this:

SELECT
  SUM(repository.forks) AS sum_forks,
  repository.owner,
  EXTRACT(YEAR FROM PARSE_TIMESTAMP('%Y/%m/%d %H:%M:%S %z', repository.created_at)) AS year_created
FROM
  `bigquery-public-data.samples.github_nested`
WHERE
  repository.created_at IS NOT NULL
GROUP BY
  2,
  3
ORDER BY
  3 DESC
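
Before wiring this query into the scheduled pipeline, you may want to sanity-check it from your own machine. Here is a minimal sketch using the BigQuery Python client's dry-run mode; it assumes you have google-cloud-bigquery installed, application default credentials configured, and the query saved to github_query.sql (which we create in Step 3 below):

  from google.cloud import bigquery

  client = bigquery.Client()

  # Dry run: BigQuery validates the query and estimates the bytes it would
  # process, without actually running it.
  job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
  with open('github_query.sql') as sql_file:
      query_job = client.query(sql_file.read(), job_config=job_config)

  print('Query is valid. Estimated bytes processed:',
        query_job.total_bytes_processed)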

Python
We will soon paste this query into our github_query.sql file. That file will be read by our main.py file, whose main function executes the query in Python using the Python Client Library for BigQuery.

Step 1: Ensure that you have Python 3 installed, then install and initialize the Cloud SDK. The following steps will walk you through creating the GCP environment. If you wish to test the function locally, ensure that you have first followed the instructions for setting up Python 3 on GCP.

Step 2: Create a file called requirements.txt and copy and paste the following:
  google-cloud-bigquery

Step 3: Create a file called github_query.sql and paste in the SQL query from above.

Step 4: Create a file called config.py and fill in your own values for the following variables.

You may use an existing dataset for this, or pick an ID for a new dataset that you will create; just remember the ID, as you will need it when granting permissions later on.
  config_vars = {
      'project_id': '[ENTER YOUR PROJECT ID HERE]',
      'output_dataset_id': '[ENTER OUTPUT DATASET HERE]',
      'output_table_name': '[ENTER OUTPUT TABLE NAME HERE]',
      'sql_file_path': 'github_query.sql'
  }

Step 5: Create a file called main.py, which references the previous two files.

  """Function called by PubSub trigger to execute cron job tasks."""
import datetime
import logging
from string import Template
import config
from google.cloud import bigquery

def file_to_string(sql_path):
    """Converts a SQL file holding a SQL query to a string.
    Args:
        sql_path: String containing a file path
    Returns:
        String representation of a file's contents
    """
    with open(sql_path, 'r') as sql_file:
        return sql_file.read()


def execute_query(bq_client):
    """Executes transformation query to a new destination table.
    Args:
        bq_client: Object representing a reference to a BigQuery Client
    """
    dataset_ref = bq_client.get_dataset(bigquery.DatasetReference(
        project=config.config_vars['project_id'],
        dataset_id=config.config_vars['output_dataset_id']))
    table_ref = dataset_ref.table(config.config_vars['output_table_name'])
    job_config = bigquery.QueryJobConfig()
    job_config.destination = table_ref
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    sql = file_to_string(config.config_vars['sql_file_path'])
    logging.info('Attempting query on all dates...')
    # Execute Query
    query_job = bq_client.query(
        sql,
        job_config=job_config)

    query_job.result()  # Waits for the query to finish
    logging.info('Query complete. The table is updated.')

def main(data, context):
    """Triggered from a message on a Cloud Pub/Sub topic.
    Args:
        data (dict): Event payload.
        context (google.cloud.functions.Context): Metadata for the event.
    """
    bq_client = bigquery.Client()

    try:
        current_time = datetime.datetime.utcnow()
        log_message = Template('Cloud Function was triggered on $time')
        logging.info(log_message.safe_substitute(time=current_time))

        try:
            execute_query(bq_client)

        except Exception as error:
            log_message = Template('Query failed due to '
                                   '$message.')
            logging.error(log_message.safe_substitute(message=error))

    except Exception as error:
        log_message = Template('$error').substitute(error=error)
        logging.error(log_message)

if __name__ == '__main__':
    main('data', 'context')
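
The main('data', 'context') call at the bottom exists only so you can run the file directly as a quick local test. If you want that local call to look more like what a real Pub/Sub trigger delivers, one option (purely a sketch, assuming application default credentials are configured on your machine) is to swap the bottom of main.py for something like:

  import base64

  if __name__ == '__main__':
      # Mimic the event shape a Pub/Sub trigger delivers: a dict whose 'data'
      # field holds the base64-encoded message body.
      mock_event = {
          'data': base64.b64encode(
              b'This is a job that I run twice per day!').decode('utf-8')
      }
      main(mock_event, None)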

In order to deploy the function on GCP, you can run the following gcloud command. It specifies a Python 3.7 runtime, creates a PubSub topic with a name of your choosing, and triggers the function whenever a new message is published to that topic. I have also set the timeout to the maximum that GCP offers: 540 seconds, or nine minutes.

Make sure you first cd into the directory where the files are located before deploying, or else the following will not work.
  gcloud functions deploy [FUNCTION_NAME] --entry-point main --runtime python37 --trigger-resource [TOPIC_NAME] --trigger-event google.pubsub.topic.publish --timeout 540s

You specify how often your Cloud Function will run, in UNIX cron format, when setting up the Cloud Scheduler job with the --schedule flag. The schedule we use below tells the job to publish a message to the PubSub topic every 12 hours, in the UTC timezone:

  gcloud scheduler jobs create pubsub [JOB_NAME] --schedule [SCHEDULE] --topic [TOPIC_NAME] --message-body [MESSAGE_BODY]

where [JOB_NAME] is a unique name for a job, [SCHEDULE] is the frequency for the job in UNIX cron, such as "0 */12 * * *" to run every 12 hours, [TOPIC_NAME] is the name of the topic created in the step above when you deployed the Cloud Function, and [MESSAGE_BODY] is any string. An example command would be:

  gcloud scheduler jobs create pubsub daily_job --schedule "0 */12 * * *" --topic my-pubsub-topic --message-body "This is a job that I run twice per day!"

Our Python code does not use the actual message published to the topic (“This is a job that I run twice per day!”) because we are just executing a query in BigQuery, but it is worth noting that you could retrieve this message and act on it, for logging purposes or otherwise.
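
If you did want to act on it, here is a minimal, purely illustrative sketch of how main() could decode and log the message body; for a Pub/Sub-triggered function, the body arrives base64-encoded under the event's 'data' key:

  import base64
  import logging

  def main(data, context):
      """Logs the Pub/Sub message body before doing the real work."""
      if isinstance(data, dict) and 'data' in data:
          message_body = base64.b64decode(data['data']).decode('utf-8')
          logging.info('Received Pub/Sub message: %s', message_body)
      # ... continue with the BigQuery work shown in main.py above ...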

Grant permissions
Finally, open up the BigQuery UI and click ‘Create Dataset’ in the project that you referenced above (skip this if you decided to use an existing dataset).

Creating the Cloud Function also gave it a service account to run as, with an email in the format [PROJECT_ID]@appspot.gserviceaccount.com. Copy this email for the next step.

  1. Hover over the plus icon for this new dataset.

  2. Click "Share Dataset".

  3. In the pop-up, enter the service account email and give it the "Can Edit" permission. (If you prefer to script this step, see the sketch below.)
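
If you would rather grant this access from code than through the UI, the rough sketch below uses the BigQuery Python client. It assumes the account running it is allowed to modify the dataset's access controls; the "WRITER" role corresponds to "Can Edit":

  from google.cloud import bigquery

  client = bigquery.Client()
  dataset = client.get_dataset('[ENTER OUTPUT DATASET HERE]')

  # Append an access entry granting the function's service account edit rights.
  entries = list(dataset.access_entries)
  entries.append(bigquery.AccessEntry(
      role='WRITER',
      entity_type='userByEmail',
      entity_id='[PROJECT_ID]@appspot.gserviceaccount.com'))
  dataset.access_entries = entries
  client.update_dataset(dataset, ['access_entries'])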

Run the job
You can test the workflow above by running the job now, instead of waiting for its next scheduled run. To do this:

  1. Open up the Cloud Scheduler page in the console.

  2. Click the ‘Run Now’ button.

  3. Open up BigQuery in the console.

  4. Under your output dataset, look for your [output_table_name]; it will contain the results of the query. (You can also verify from code, as sketched below.)
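
If you prefer to verify from code, a quick check (using the placeholder names from config.py) might look like this sketch:

  from google.cloud import bigquery

  client = bigquery.Client()

  # Peek at the owners with the most forks in the freshly written output table.
  query = """
      SELECT owner, sum_forks, year_created
      FROM `[ENTER YOUR PROJECT ID HERE].[ENTER OUTPUT DATASET HERE].[ENTER OUTPUT TABLE NAME HERE]`
      ORDER BY sum_forks DESC
      LIMIT 5
  """
  for row in client.query(query).result():
      print(row.owner, row.sum_forks, row.year_created)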

To learn more, read our documentation on setting up Cloud Scheduler with a PubSub trigger, and try it out using one of our BigQuery public datasets.