Processing Media Using Cloud Pub/Sub and Compute Engine

This tutorial demonstrates how to use Google Cloud Pub/Sub as a queuing system for orchestrating potentially long-running tasks. In this tutorial, the processing task is to analyze uploaded audio files to create a transcript of spoken words, and then save the transcripts to Google BigQuery for later analysis.

The following diagram provides an overview of the system:

Overview of file upload and processing. Steps are described in following text.

In this system:

  1. Users upload audio files to a Google Cloud Storage bucket.
  2. When a new file is added, Cloud Storage sends an object-changed notification to a watcher application running on Google App Engine.
  3. The watcher application adds a task for the new media file to a queue, which is managed by a Cloud Pub/Sub subscription.
  4. Worker applications running on Google Compute Engine pull tasks from the queue and then use the Google Speech API to transcribe the audio.
  5. After the audio file is transcribed, the transcript and its confidence value are added to a BigQuery table.

For a full description of the architecture used in this tutorial, see Using Cloud Pub/Sub for Long-running Tasks.

Objectives

This tutorial demonstrates how to:

  • Create a BigQuery dataset and table to record data.
  • Create a Cloud Storage bucket.
  • Configure a topic and subscription in Cloud Pub/Sub to manage a task queue.
  • Deploy a watcher app on App Engine that uses object-changed notifications to detect when new media files are added to the input bucket, and then adds a task to the queue.
  • Launch worker apps on Compute Engine that pull tasks from the queue and process the media file by using the Speech API.

Costs

This tutorial uses billable components of Google Cloud Platform, including:

  • BigQuery
  • Cloud Storage standard storage
  • Cloud Pub/Sub
  • App Engine
  • Compute Engine

Use the Pricing Calculator to generate a cost estimate based on your projected usage of these products.

Before you begin

  1. Sign in to your Google account.

    If you don't already have one, sign up for a new account.

  2. Set up a Cloud Platform Console project:

    • Create or select a project.
    • Enable the Cloud Pub/Sub API for that project.

    You can view and manage these resources at any time in the Cloud Platform Console.

  3. Install and initialize the Cloud SDK.

Cloning the sample code

Use the following command to clone the sample code from GitHub.

git clone https://github.com/GoogleCloudPlatform/pubsub-media-processing/

Creating the BigQuery table

  1. Make sure that the bq tool is installed.
  2. In a terminal window, create a dataset named media_processing.

    bq mk media_processing
    
  3. Create a table named speech with two columns, named transcript and confidence:

    bq mk --schema transcript:string,confidence:float \
      -t media_processing.speech
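
If you want to confirm the schema programmatically rather than in the BigQuery web UI, you can fetch the table metadata with the discovery-based BigQuery client, in the same style as the rest of the sample code. This is an illustrative sketch, not part of the sample; it assumes Application Default Credentials are available.

import google.auth
from googleapiclient import discovery

credentials, project_id = google.auth.default()
bigquery = discovery.build('bigquery', 'v2', credentials=credentials)

# Fetch the table metadata to confirm the schema created by bq mk.
table = bigquery.tables().get(
    projectId=project_id,
    datasetId='media_processing',
    tableId='speech').execute()

for field in table['schema']['fields']:
    print(field['name'], field['type'])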

Creating the Cloud Storage bucket

This tutorial requires a Cloud Storage bucket that receives the uploaded audio files.

When you name your bucket, use a name like [PREFIX]-mediap-dropzone, but replace [PREFIX] with a value of your choosing.

Use the following instructions to create the bucket.

  1. In the Cloud Platform Console, go to the Cloud Storage browser.

    Go to the Cloud Storage browser

  2. Click Create bucket.
  3. In the Create bucket dialog, specify a Name of [PREFIX]-mediap-dropzone, and leave the other attributes, such as the storage class and location, at their default values.
  4. Click Create.
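
If you prefer to script this step instead of using the console, the bucket can also be created through the Cloud Storage JSON API. The following sketch is not part of the sample code; it assumes Application Default Credentials and uses your own prefix in place of [PREFIX].

import google.auth
from googleapiclient import discovery

credentials, project_id = google.auth.default()
storage = discovery.build('storage', 'v1', credentials=credentials)

# Create the dropzone bucket with default settings.
bucket = storage.buckets().insert(
    project=project_id,
    body={'name': '[PREFIX]-mediap-dropzone'}).execute()
print(bucket['selfLink'])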

Creating the Cloud Pub/Sub topic and subscription

A Cloud Pub/Sub topic and subscription provide programmatic access to the queue that orchestrates the processing of incoming requests. Follow these steps to create the topic and subscription:

  1. In the Google Cloud Platform Console, go to the Pub/Sub page.

    Go to the Pub/Sub page

  2. Select Create topic and, in Name, add mediap-topic to the end of the pre-filled text.

  3. Click Create.

  4. In the Topic list, click the arrow to the right of the new topic.

  5. Select New subscription.

  6. In Subscription name, add mediap-sub to the end of the pre-filled text.
  7. Set Delivery Type to Pull.
  8. Click More options.
  9. Set Acknowledgement Deadline to 60 seconds.

  10. Click Create.
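
As an alternative to the console steps above, you can create the same topic and subscription programmatically with the Pub/Sub API, using the discovery-based client style that the sample code uses. This is an illustrative sketch, not part of the sample; it assumes Application Default Credentials.

import google.auth
from googleapiclient import discovery

credentials, project_id = google.auth.default()
pubsub = discovery.build('pubsub', 'v1', credentials=credentials)

topic = 'projects/{}/topics/mediap-topic'.format(project_id)
subscription = 'projects/{}/subscriptions/mediap-sub'.format(project_id)

# Create the topic; the create call takes the full resource name and an empty body.
pubsub.projects().topics().create(name=topic, body={}).execute()

# Create a pull subscription with a 60-second acknowledgement deadline.
pubsub.projects().subscriptions().create(
    name=subscription,
    body={'topic': topic, 'ackDeadlineSeconds': 60}).execute()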

Setting up the notification

Every time a new media file is uploaded to the [PREFIX]-mediap-dropzone bucket, a new task is added to the queue in Cloud Pub/Sub. This setup requires the following steps:

  1. Create a service account in the Cloud Platform project that contains the topic and bucket.
  2. Configure the gsutil command-line tool to use the service account.
  3. Set up the notification.

Creating a service account

  1. Open the Service Accounts page in the Cloud Platform Console.

    Go to the Service Accounts page

  2. Select Create service account.

  3. In Service account name, enter mediap-service-account.
  4. For Role, select Project > Owner.

  5. Make a note of the Service account ID. You will use this value when you configure gsutil. The service account ID follows the form [SERVICE_ACCOUNT_NAME]@[PROJECT_ID].iam.gserviceaccount.com, where [SERVICE_ACCOUNT_NAME] can include additional characters to make the value unique.

  6. Select Furnish a new private key.
  7. For Key Type, select P12.
  8. Click Create.

    The console downloads the private key to your machine. Make a note of the filename and path; you will use it in the next section.

  9. Click Close.

Configuring gsutil to use a service account

You can use the Google Cloud SDK to add the service account as a credentialed account. After you add the credentials, all subsequent gsutil commands will use the service account credentials.

  1. Update the Cloud SDK.

    gcloud components update
    
  2. Add the service account.

    gcloud auth activate-service-account [SERVICE_ACCOUNT_ID] --key-file [PATH_TO_KEY]
    
    • [SERVICE_ACCOUNT_ID] is the ID of the service account you created.

    • [PATH_TO_KEY] is the full path and filename to the P12 key you downloaded.

  3. Confirm that the service account is the active credentialed account.

    gcloud auth list
    

You should see the text (active) next to the service account credentials.

Configuring the notification

You can configure the notification directly by using the gsutil tool:

    gsutil notification create -t projects/[PROJECT_ID]/topics/mediap-topic -f json gs://[PREFIX]-mediap-dropzone
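
To verify that the notification is attached to the bucket, you can list the bucket's notification configurations. The sketch below is not part of the sample; it uses the Cloud Storage JSON API through the discovery client and assumes Application Default Credentials.

import google.auth
from googleapiclient import discovery

credentials, _ = google.auth.default()
storage = discovery.build('storage', 'v1', credentials=credentials)

# List the Cloud Pub/Sub notification configurations attached to the bucket.
configs = storage.notifications().list(bucket='[PREFIX]-mediap-dropzone').execute()
for config in configs.get('items', []):
    print(config['topic'], config.get('payload_format'))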

Creating workers to process the media files

The next step is to create and configure the workers that pull tasks from the Cloud Pub/Sub queue and process the media files.

Creating an instance template

Create an instance template that uses a startup script that installs the required components.

The startup script performs the following actions:

  1. Creates working folders locally.
  2. Downloads required libraries.
  3. Installs a virtual environment to run the worker script.
  4. Downloads and runs a Python script, worker.py, that pulls messages from the queue and processes the associated media file.

To create the instance template:

  1. Open the Instance templates page in the Cloud Platform Console.

    Open the Instance templates page

  2. Select Create instance template.

  3. In Name, replace the default value with mediap-tpl.

  4. Leave Machine type and Boot disk at their default values. In a production-grade media-processing application, you might want to increase the amount of CPU.

  5. Under Identity and API Access, select Allow full access to all Cloud APIs. This is necessary to use the Speech API.

  6. Select Management, disks, networking, SSH keys.

  7. Under Automation, copy the following script into the Startup script text box.

      #! /bin/bash
      mkdir /tmp/mediap
      cd /tmp/mediap
      wget -q \
      https://raw.githubusercontent.com/GoogleCloudPlatform/pubsub-media-processing/master/requirements.txt
      wget -q \
      https://raw.githubusercontent.com/GoogleCloudPlatform/pubsub-media-processing/master/mediator.py
      wget -q \
      https://raw.githubusercontent.com/GoogleCloudPlatform/pubsub-media-processing/master/worker.py
      wget -q \
      https://raw.githubusercontent.com/GoogleCloudPlatform/pubsub-media-processing/master/recurror.py
      curl https://bootstrap.pypa.io/get-pip.py | sudo python
      sudo pip install virtualenv
      virtualenv venv
      source venv/bin/activate
      venv/bin/pip install -r requirements.txt
      python worker.py --subscription mediap-sub --dataset_id=media_processing --table_id=speech
      

Creating an instance group

Now that you’ve created an instance template, you can create the instance group.

  1. Open the Instance groups page in the Cloud Platform Console.

    Open the Instance groups page

  2. Select Create instance group.

  3. Set Name to mediap-igr.
  4. Select Instance template and choose the mediap-tpl template.
  5. Turn off autoscaling.
  6. Set Number of instances to 3.

  7. Select Create.

Adding autoscaling to workers (optional)

Adding an autoscaler ensures that there are enough workers to process the queued tasks.

Create an autoscaler by running the following command.

gcloud alpha compute instance-groups managed set-autoscaling \
  mediap-igr --zone us-central1-f \
  --min-num-replicas 0 --max-num-replicas 10 \
  --queue-scaling-cloud-pub-sub topic=mediap-topic,subscription=mediap-sub \
  --queue-scaling-acceptable-backlog-per-instance 5 \
  --target-cpu-utilization 1

Two parameters, --queue-scaling-acceptable-backlog-per-instance and --target-cpu-utilization, are important to ensure that autoscaling does not terminate instances that are actively processing a media file. They specify, respectively, the acceptable number of unprocessed Cloud Pub/Sub messages per instance and the target average CPU utilization of each VM.

In this tutorial, media processing is quite fast, so a maximum queue backlog of 5 messages per instance should be sufficient. If you extend this solution to use more involved processing, such as editing video, consider a lower value for --queue-scaling-acceptable-backlog-per-instance.

Testing the system

To test the media processing system, you can use provided audio files that meet the Speech API requirements. To launch the demo, copy the files from a Google Cloud public bucket to your dropzone bucket, as follows. Replace [YOUR_BUCKET_ID] with the name of your dropzone bucket ([PREFIX]-mediap-dropzone):

gsutil -m cp -r gs://solutions-public-assets/media-processor/audio_samples/* gs://[YOUR_BUCKET_ID]

This is what happens behind the scenes:

  1. gsutil copies files from the public bucket to your dropzone bucket.
  2. Because the bucket is watched, an object notification calls an App Engine webhook.
  3. The webhook creates a Cloud Pub/Sub message for each copied file.
  4. A worker picks up a message, transcribes the file, and then saves the text to BigQuery.
  5. The process repeats as long as there are messages in the Cloud Pub/Sub queue.

After an audio file is processed, a new row appears in your BigQuery table that looks similar to the following example:

BigQuery table shows transcription and confidence value.
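
You can also check the results from code by running a query against the table. The following is an illustrative sketch rather than part of the sample; it uses the BigQuery v2 API with legacy SQL table syntax and assumes Application Default Credentials.

import google.auth
from googleapiclient import discovery

credentials, project_id = google.auth.default()
bigquery = discovery.build('bigquery', 'v2', credentials=credentials)

# Run a synchronous query against the results table (legacy SQL table syntax).
result = bigquery.jobs().query(
    projectId=project_id,
    body={'query': 'SELECT transcript, confidence '
                   'FROM [media_processing.speech] LIMIT 10'}).execute()

for row in result.get('rows', []):
    print(row['f'][0]['v'], row['f'][1]['v'])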

Exploring the code

The following sections highlight details from the sample code accompanying this tutorial.

Message format

Whether you use object change notifications or Google Cloud Functions, the content of the published message is quite similar and contains fields such as the bucket name, media link, media name, and media MIME type.

The message format used in this solution uses the attributes sent by the notification feature. The fields are shown in the following snippet.

import base64
import json

# The message payload is a base64-encoded JSON document describing the object.
data = message.data
msg_string = base64.b64decode(data)
msg_data = json.loads(msg_string)
content_type = msg_data["contentType"]

# The notification attributes identify the event and the affected object.
attributes = message.attributes
event_type = attributes['eventType']
bucket_id = attributes['bucketId']
object_id = attributes['objectId']
generation = attributes['objectGeneration']
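
The bucketId and objectId attributes are what the worker later uses to address the uploaded file in the Speech API request. For example, a single line (illustrative, not from the sample) builds the Cloud Storage URI:

# Build the gs:// URI of the uploaded file from the notification attributes.
audio_uri = 'gs://{0}/{1}'.format(bucket_id, object_id)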

Watching for new media files

The queue workflow is as follows:

  1. Publish. Cloud Pub/Sub Notifications for Google Cloud Storage publishes a message to the Cloud Pub/Sub topic.

  2. Pull. All workers compete to pull the next message. The fastest wins.

    The following code snippet, from a worker application running on Compute Engine, pulls down the next message in the queue.

    resp = pubsub_client.projects().subscriptions().pull(
        subscription=sub, body={"maxMessages": toprocess}).execute()

  3. Manage the acknowledgement deadline. Because you don’t know exactly how long it will take to process the media file, it can be challenging to decide how long to set the acknowledgement deadline.

    • If you set the time limit too high, such as 3 hours, then any time a worker fails, the message might wait as long as 3 hours before being picked up again.

    • If you set the time limit too low, for example, 10 seconds, the message might be deleted while the media is still being processed. If that happens and the worker fails, the message is lost for good.

    To ensure that messages are processed promptly and are not lost, you can have workers periodically refresh their acknowledgement deadline during media processing.

    The following code snippet, from the worker application, extends the acknowledgement deadline.

    # Extend the acknowledgement deadline to make sure that the file has time to be processed.
    pubsub_client.projects().subscriptions().modifyAckDeadline(
        subscription=sub,
        body={
            'ackIds': ack_ids,
            'ackDeadlineSeconds': refresh
        }
    ).execute()

  4. Acknowledge the message. After the media is processed, the worker acknowledges the message so that Cloud Pub/Sub removes it from the queue (a combined sketch of the full pull, refresh, and acknowledge loop follows this list):

    # Delete the message in the queue by acknowledging it.
    subscription.acknowledge([ack_id])
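
The following sketch combines the pull, deadline refresh, and acknowledge calls into a single loop iteration, in the discovery-client style used above. It is a simplified illustration rather than the sample's worker.py; process_media is a placeholder for your own processing function, and Application Default Credentials are assumed.

import base64
import json

import google.auth
from googleapiclient import discovery

credentials, project_id = google.auth.default()
pubsub = discovery.build('pubsub', 'v1', credentials=credentials)
sub = 'projects/{}/subscriptions/mediap-sub'.format(project_id)

# Pull up to one message; returnImmediately=False waits until a message arrives.
resp = pubsub.projects().subscriptions().pull(
    subscription=sub,
    body={'maxMessages': 1, 'returnImmediately': False}).execute()

for received in resp.get('receivedMessages', []):
    ack_id = received['ackId']
    msg_data = json.loads(base64.b64decode(received['message']['data']).decode('utf-8'))

    # Buy more time before starting a potentially long-running task.
    pubsub.projects().subscriptions().modifyAckDeadline(
        subscription=sub,
        body={'ackIds': [ack_id], 'ackDeadlineSeconds': 60}).execute()

    process_media(msg_data)  # Placeholder for the actual media processing.

    # Acknowledge only after successful processing, so failures trigger redelivery.
    pubsub.projects().subscriptions().acknowledge(
        subscription=sub,
        body={'ackIds': [ack_id]}).execute()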

The following diagram shows what happens when the processing is successful.

Successful processing.

The following diagram shows what happens in case of worker failure.

Processing failed.

Processing the media file

The following code is used by the workers to process the media files.

Calling the Speech API

The Google Speech API analyzes audio input and returns one or more transcripts, each with a confidence value. The following snippet builds the request body for the recognition call.

speech_body={
    'config': {
        'encoding': 'FLAC',
        'sampleRate': 16000,
        'languageCode': self.filename.split('_')[0]
    },
    'audio': {
        'uri': "gs://{0}/{1}".format(self.dropzone_bucket, self.filename)
    }
}
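
The body is then sent with a synchronous recognition request. The following call is a sketch of how that request might be issued with the v1beta1 discovery client available at the time of this tutorial; the sample repository's exact call may differ. Application Default Credentials are assumed.

import google.auth
from googleapiclient import discovery

credentials, _ = google.auth.default()
speech_service = discovery.build('speech', 'v1beta1', credentials=credentials)

# Send the request body shown above and read back the best alternative.
response = speech_service.speech().syncrecognize(body=speech_body).execute()

for result in response.get('results', []):
    best = result['alternatives'][0]
    print(best['transcript'], best['confidence'])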

Appending the response to the BigQuery table

The following snippet shows how to add the response to the table in BigQuery.

def write_to_bq(self, transcript, confidence):
    """Write to BigQuery"""
    Logger.log_writer("Writing - {} - to BigQuery".format(transcript))
    body = {
        "rows":[{
            "json": {
                "transcript": transcript,
                "confidence": confidence
            }
        }]
    }

    response = self.bq_client.tabledata().insertAll(
        projectId=self.project_id,
        datasetId=self.dataset_id,
        tableId=self.table_id,
        body=body
    ).execute()
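
The insertAll call reports row-level problems in its response instead of raising an exception, so it can be useful to check for them. The following lines are an assumed extension of the method above, not part of the original sample.

    # insertAll reports per-row failures in the response rather than raising.
    errors = response.get('insertErrors')
    if errors:
        Logger.log_writer("BigQuery insert errors: {}".format(errors))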

Cleaning up

After you've finished the media processing tutorial, you can clean up the resources you created on Google Cloud Platform so you won't be billed for them in the future. The following sections describe how to delete or turn off these resources.

Deleting the project

The easiest way to eliminate billing is to delete the project you created for the tutorial.

To delete the project:

  1. In the Cloud Platform Console, go to the Projects page.

    Go to the Projects page

  2. In the project list, select the checkbox next to the project you want to delete, and then click Delete project.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Deleting instances

To delete a Compute Engine instance:

  1. In the Cloud Platform Console, go to the VM Instances page.

    Go to the VM Instances page

  2. Click the checkbox next to the instance you want to delete.
  3. Click the Delete button at the top of the page to delete the instance.

Stopping the default App Engine app

The only way you can delete the default version of your App Engine app is by deleting your project. However, you can stop the default version in the Cloud Platform Console. This action shuts down all instances associated with the version. You can restart these instances later if needed.

In the App Engine standard environment, you can stop the default version only if your app has manual or basic scaling.

Deleting Cloud Storage buckets

To delete a Cloud Storage bucket:

  1. In the Cloud Platform Console, go to the Cloud Storage browser.

    Go to the Cloud Storage browser

  2. Click the checkbox next to the bucket you want to delete.
  3. Click the Delete button at the top of the page to delete the bucket.

Deleting datasets in BigQuery

To delete a dataset in BigQuery:

  1. Go to the BigQuery web UI.

    BigQuery web UI

  2. In the navigation, hover over the name of the dataset you created.

  3. Click the down arrow icon next to your dataset name, and then click Delete dataset.

  4. In the Delete dataset dialog, confirm the delete command by typing the name of your dataset and clicking OK.

What's next

You can extend and build on this sample:

  • Add monitoring. In a production environment, you might want to monitor the size of the Cloud Pub/Sub queue, the autoscaling of workers, and other metrics. For more information, see Cloud Monitoring API for PubSub, Cloud Monitoring Custom Metrics, and Cloud Autoscaler.

  • Add auto-healing. Health checks can be applied to both load balancers and instance groups. When a load balancer detects a failed health check, it redirects traffic to a healthy instance. If an instance group manager detects a failed health check, the manager terminates and restarts the instance.

  • Back up original media files. You might want to retain the original media files for archive purposes, or for later additional processing. You can use the lifecycle management feature of Cloud Storage to move media files from standard to Nearline storage to reduce storage costs.

  • Enrich the audio processing. You can add extra processing to the audio file by using NLP services, such as the Google Natural Language API.

  • Process other media types. Although this sample processes audio, you can also use this architecture to process other types of media, such as images and video.

  • Add multi-threading. Calling the Google Cloud Speech API should be fast enough to do in a single-threaded worker application. For heavier processing workloads, you can use a multi-threaded application to improve performance.

  • Use Google Cloud Functions. You can replace the watcher application running on App Engine and use Cloud Functions instead. Cloud Functions simplifies the deployment of logic that is triggered by various events, in this case a change in a specific bucket. With Cloud Functions, you can use Node.js to publish messages to the Cloud Pub/Sub topic. Code demonstrating this approach is in the GitHub repo associated with this solution. Note that Cloud Functions is currently available in closed Alpha.

  • Try out other Google Cloud Platform features for yourself. Have a look at our tutorials.
