Using Cloud Pub/Sub with PHP

Many apps do background processing outside the context of a web request. This Bookshelf sample app sends tasks for a separate worker service to run. The worker gathers information from the Google Books API and updates the book information in the database.

This section of the sample app tutorial demonstrates how to set up separate services in App Engine, how to run a worker process in the App Engine flexible environment, and how to deal with lifecycle events.

This page is part of a multipage tutorial. To start from the beginning and read the setup instructions, go to PHP Bookshelf app.

Configuring settings

  1. Copy your settings.yml file from the Logging app events part of this tutorial to the getting-started-php/6-pubsub directory.

  2. Add the following to the bottom of settings.yml:

    pubsub_topic_name: book-created-or-updated
    pubsub_subscription_name: fill-book-details
    

If you use Cloud SQL to store your data, follow these steps to update the worker.yaml file before deploying:

  1. Open worker.yaml for editing.

  2. Uncomment the beta_settings and cloud_sql_instances lines.

  3. Set the value of cloud_sql_instances to the value you used for cloudsql_connection_name in config/settings.yml. It should be in the format [YOUR_PROJECT_NAME]:[YOUR_REGION]:[YOUR_INSTANCE].

  4. Save and close worker.yaml.

Installing dependencies

In the 6-pubsub directory, enter this command:

composer install

Running the app on your local machine

  1. Start a local web server:

    php -S localhost:8000 -t web
    
  2. In another terminal window, start the Cloud Pub/Sub worker:

    php bin/pubsub/entrypoint.php
    
  3. In your browser, enter this address:

    http://localhost:8000

Deploying the app to the App Engine flexible environment

  1. In your terminal window, deploy the sample app:

    gcloud app deploy
    
  2. In your browser, enter this address. Replace [YOUR_PROJECT_ID] with your Google Cloud Platform (GCP) project ID:

    https://[YOUR_PROJECT_ID].appspot.com
    
  3. In your terminal window, deploy the worker service:

    gcloud app deploy worker.yaml
    

    To check that the worker service is running, enter this address in your browser:

    https://worker-dot-[YOUR_PROJECT_ID].appspot.com
    

If you update your app, you can deploy the updated version by entering the same command you used when you first deployed the app. The new deployment creates a new version of your app and promotes it to the default version.

The older versions of your app remain, as do their associated virtual machine (VM) instances. Be aware that all of these app versions and VM instances are billable resources.

You can reduce costs by deleting the non-default versions of your app.

When you finish this tutorial, you can avoid continued billing by deleting the resources you created. See Cleaning up for more detail.

App structure

This diagram shows the app's components and how they fit together.

[Diagram: Cloud Pub/Sub sample structure]

Understanding the code

This section walks you through the app's code and explains how it works.

Queueing tasks

The app instantiates a PubSubClient object by using the Cloud Client Libraries for PHP, an idiomatic PHP client for interacting with Google Cloud Platform (GCP). The library is included with Composer as google/cloud. The code uses your project ID to create the PubSubClient object:

use Google\Cloud\PubSub\PubSubClient;
$app['pubsub.client'] = function ($app) {
    // create the pubsub client
    $projectId = $app['config']['google_project_id'];
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    return $pubsub;
};

The code uses the PubSubClient to retrieve the Cloud Pub/Sub topic named book-created-or-updated. The topic name is specified by pubsub_topic_name in the config/settings.yml file. If the topic doesn't exist, it's created:

$app['pubsub.topic'] = function ($app) {
    // create the topic if it does not exist.
    /** @var Google\Cloud\PubSub\PubSubClient $pubsub */
    $pubsub = $app['pubsub.client'];
    $topicName = $app['config']['pubsub_topic_name'];
    $topic = $pubsub->topic($topicName);
    if (!$topic->exists()) {
        $topic->create();
    }
    return $topic;
};
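
The worker reads from $app['pubsub.subscription'], whose definition is not shown on this page. The same "create it if it's missing" pattern could be factored out as in the following sketch. The ensureExists function and the FakeResource class are hypothetical names used only for illustration, and the sketch assumes the object passed in exposes exists() and create() methods, as the topic object above does:

```php
<?php
declare(strict_types=1);

/**
 * Creates the resource if it is missing, then returns it.
 * Works with any object that has exists() and create() methods.
 * Hypothetical helper; not part of the sample app.
 */
function ensureExists(object $resource): object
{
    if (!$resource->exists()) {
        $resource->create();
    }
    return $resource;
}

/** In-memory stand-in for a topic or subscription, for local experiments. */
final class FakeResource
{
    public $created = 0;

    public function exists(): bool
    {
        return $this->created > 0;
    }

    public function create(): void
    {
        $this->created++;
    }
}

$fake = ensureExists(new FakeResource());
ensureExists($fake); // the second call finds the resource and skips create()
```

The check and the create call are two separate steps, so two processes that start at the same moment could both try to create the resource. Treat the sketch as a convenience for development rather than a guarantee.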

The controller enqueues a task whenever a book is created or updated by publishing the book's ID to the Cloud Pub/Sub topic:

if ($id = $model->create($book)) {
    /** @var Google\Cloud\PubSub\Topic $topic */
    $topic = $app['pubsub.topic'];
    $topic->publish([
        'data' => 'Updated Book',
        'attributes' => [
            'id' => $id
        ]
    ]);
}
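
The Cloud Pub/Sub API defines message attributes as string key-value pairs, while a book ID may be an integer depending on the storage backend. The following sketch builds the same message array as the controller code above and casts the ID to a string. The buildBookMessage function is a hypothetical helper, not part of the sample app:

```php
<?php
declare(strict_types=1);

/**
 * Builds the message array that the controller passes to publish().
 * Hypothetical helper; the name is not part of the sample app.
 *
 * @param int|string $bookId ID returned by the model when a book is saved
 */
function buildBookMessage($bookId): array
{
    return [
        'data' => 'Updated Book',
        // Attributes are string key-value pairs, so cast the ID.
        'attributes' => ['id' => (string) $bookId],
    ];
}

$message = buildBookMessage(42);
```

With a helper like this, the controller call would become $topic->publish(buildBookMessage($id)).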

After a message is published to the Cloud Pub/Sub topic, a worker service that subscribes to the topic can receive it. Workers run on separate instances and perform tasks separately to avoid adding latency to the main app. In this tutorial, the worker looks up additional information about books. If a book is missing information, the worker uses the provided ID to pull up existing book data, call the Google Books API, and update the book.

Cloud Pub/Sub job backend

On the backend, a long-running PHP process performs an asynchronous hanging GET with the Cloud Pub/Sub API, so new messages are received immediately. Using the socket library Ratchet, the process listens for the App Engine health checker while simultaneously waiting for new messages:

use Google\Cloud\Samples\Bookshelf\PubSub\HealthCheckListener;
use Ratchet\Http\HttpServer;
use Ratchet\Server\IoServer;
// Listen to port 8080 for our health checker
$server = IoServer::factory(
    new HttpServer(new HealthCheckListener($logger)),
    8080
);

The HealthCheckListener class implements Ratchet\Http\HttpServerInterface and handles connections on port 8080. Browsing to this service displays "Pubsub worker is running!" in the browser. If the process unexpectedly fails, the health checker stops receiving a healthy response, and a new instance of the worker service is created:

public function onOpen(ConnectionInterface $conn, RequestInterface $request = null)
{
    // send the 200 OK health response and return
    $response = new Response(200, [], 'Pubsub worker is running!');
    $conn->send((string) $response);
    $conn->close();
}
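
To make the health response more concrete, the following sketch builds a comparable HTTP 200 response by hand as a raw string. The healthResponse function and the exact set of headers are assumptions chosen for illustration; the sample itself relies on a Response object to produce the text:

```php
<?php
declare(strict_types=1);

/**
 * Serializes a minimal HTTP/1.1 200 response by hand.
 * Hypothetical helper; the sample uses a Response object instead.
 */
function healthResponse(string $body): string
{
    $headers = [
        'Content-Type: text/plain; charset=utf-8',
        'Content-Length: ' . strlen($body),
        'Connection: close',
    ];

    // Status line, header lines, a blank line, then the body.
    return "HTTP/1.1 200 OK\r\n"
        . implode("\r\n", $headers)
        . "\r\n\r\n"
        . $body;
}

echo healthResponse('Pubsub worker is running!');
```

Any reply with a 200 status is enough for a simple health check; the body text is only there for people who open the URL in a browser.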

The Worker class communicates with the Cloud Pub/Sub API and is added to the Ratchet server with a periodic timer to ensure it's called continuously:

use Google\Cloud\Samples\Bookshelf\PubSub\LookupBookDetailsJob;
use Google\Cloud\Samples\Bookshelf\PubSub\Worker;
// create the job and worker
$job = new LookupBookDetailsJob($app['bookshelf.model'], $app['google_client']);
$worker = new Worker($app['pubsub.subscription'], $job, $logger);
// add our worker to the event loop
$server->loop->addPeriodicTimer(0, $worker);

The Worker class defines a callback for when the Cloud Pub/Sub API returns a response. The callback is necessary because the request being made to the Cloud Pub/Sub API is asynchronous:

$callback = function ($response) use ($job, $subscription, $logger) {
    $ackMessages = [];
    $messages = json_decode($response->getBody(), true);
    if (isset($messages['receivedMessages'])) {
        foreach ($messages['receivedMessages'] as $message) {
            $pubSubMessage = new Message($message['message'], array('ackId' => $message['ackId']));
            $attributes = $pubSubMessage->attributes();
            $logger->info(sprintf('Message received for book ID "%s" ', $attributes['id']));
            // Do the actual work in the LookupBookDetailsJob class
            $job->work($attributes['id']);
            $ackMessages[] = $pubSubMessage;
        }
    }
    // Acknowledge that the messages have been handled
    if (!empty($ackMessages)) {
        $subscription->acknowledgeBatch($ackMessages);
    }
};
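
The parsing step of the callback can be tried without a Cloud Pub/Sub connection. The following sketch decodes a response body in the shape the callback reads (a receivedMessages list whose entries carry an ackId and a message with attributes) and returns the book ID for each ack ID. The extractBookIds function and the sample body are hypothetical and exist only for illustration:

```php
<?php
declare(strict_types=1);

/**
 * Extracts ack ID => book ID pairs from a pull response body.
 * Hypothetical helper for illustration; not part of the sample app.
 */
function extractBookIds(string $responseBody): array
{
    $decoded = json_decode($responseBody, true);
    $bookIds = [];

    // The key can be absent, which is why the callback checks isset().
    foreach ($decoded['receivedMessages'] ?? [] as $received) {
        $ackId = $received['ackId'] ?? null;
        $bookId = $received['message']['attributes']['id'] ?? null;
        if ($ackId !== null && $bookId !== null) {
            $bookIds[$ackId] = $bookId;
        }
    }

    return $bookIds;
}

// Sample body with one message, mirroring what the controller publishes.
$body = json_encode([
    'receivedMessages' => [
        [
            'ackId' => 'ack-1',
            'message' => [
                'data' => base64_encode('Updated Book'),
                'attributes' => ['id' => '42'],
            ],
        ],
    ],
]);

print_r(extractBookIds($body));
```

Keeping the ack ID next to each book ID makes it possible to acknowledge only the messages whose job actually completed, which is one possible refinement of the callback.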

The worker then sends the pull request, which returns a promise, and registers the callback to run when the response arrives:

$promise = $this->connection->pull([
    'maxMessages' => 1000,
    'subscription' => $this->subscription->info()['name'],
]);
$promise->then($callback);

The callback receives all new messages and passes each book ID to the LookupBookDetailsJob class.

Google Books API

To process books added to a queue, a Cloud Pub/Sub subscription listens for messages published to the book-created-or-updated topic. The sample app uses the API Client Library for PHP to look up book details from the Google Books API.

When a job runs, the LookupBookDetailsJob class retrieves a list of books, based on a book title, from the Google Books API:

$service = new Google_Service_Books($this->client);
$options = ['orderBy' => 'relevance'];
$results = $service->volumes->listVolumes($book['title'], $options);

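The job loops over the results and selects the first one that includes a book cover thumbnail as the best match. The book's image URL is then updated and saved in the database. Note that the loop returns false as soon as it reaches a result without volume or image information: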

foreach ($results as $result) {
    $volumeInfo = $result->getVolumeInfo();
    if ($volumeInfo === null) {
        return false;
    }
    $imageInfo = $volumeInfo->getImageLinks();
    if ($imageInfo === null) {
        return false;
    }
    if ($thumbnail = $imageInfo->getThumbnail()) {
        $book['image_url'] = $thumbnail;
        $this->client->getLogger()->info(sprintf(
            'Updating book "%s" with thumbnail "%s"',
            $id, basename($thumbnail)));
        return $this->model->update($book);
    }
}
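
The loop above stops at the first result that lacks volume or image information. A possible variation is to skip such results and keep looking. The following sketch shows that selection logic on plain arrays shaped like Books API volumes (volumeInfo, imageLinks, thumbnail). The firstThumbnail function and the sample data are hypothetical, and the sketch does not call the API or the database:

```php
<?php
declare(strict_types=1);

/**
 * Returns the first thumbnail URL found in a list of volumes, or null.
 * Each volume is a plain array shaped like a Books API volume.
 * Hypothetical helper; not part of the sample app.
 */
function firstThumbnail(array $volumes): ?string
{
    foreach ($volumes as $volume) {
        $thumbnail = $volume['volumeInfo']['imageLinks']['thumbnail'] ?? null;
        if (is_string($thumbnail) && $thumbnail !== '') {
            return $thumbnail;
        }
        // No usable image on this volume: continue with the next one.
    }

    return null;
}

$volumes = [
    ['volumeInfo' => ['title' => 'No cover']],
    ['volumeInfo' => ['imageLinks' => ['thumbnail' => 'https://example.com/cover.jpg']]],
];

var_dump(firstThumbnail($volumes));
```

Whether skipping is preferable depends on how much you trust lower-ranked results; the sample's stricter behavior favors the most relevant matches.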

The Supervisor configuration file

The App Engine flexible environment uses Supervisor to manage processes. By default, Supervisor runs NGINX and PHP-FPM to run PHP web apps. However, some apps don't need NGINX or PHP-FPM. In such cases, you can replace the main supervisord configuration file with the supervisord_conf_override configuration option as shown in this code:

runtime_config:
  document_root: .
  supervisord_conf_override: config/worker-supervisord.conf

worker-supervisord.conf has a section for running the Cloud Pub/Sub worker as shown here:

[supervisord]
nodaemon = true
logfile = /dev/null
logfile_maxbytes = 0
pidfile = /var/run/supervisord.pid

[program:pubsub-worker]
command = php /app/bin/pubsub/entrypoint.php
stdout_logfile = /dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile = /dev/stderr
stderr_logfile_maxbytes=0
user = www-data
autostart = true
autorestart = true
priority = 5
stopwaitsecs = 20

With these files present, the runtime runs only the worker instead of NGINX and PHP-FPM.

This program listens directly on port 8080, so it can respond to the health checker while it waits for new tasks.

Locally, you can run the worker:

php bin/pubsub/entrypoint.php

When the worker runs, it listens for messages on the fill-book-details Cloud Pub/Sub subscription to the book-created-or-updated topic. When a message is received, the associated book is retrieved from the database and LookupBookDetailsJob runs immediately to update the book. The entrypoint.php script that starts the worker loads the app and runs the Ratchet server:

<?php
/*
 * Copyright 2015 Google Inc. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

require_once __DIR__ . '/../../vendor/autoload.php';

/** @var Silex\Application $app */
$app = require __DIR__ . '/../../src/app.php';

/** @var Ratchet\Server\IoServer $server */
$server = $app['pubsub.server'];
$server->run();

Running on GCP

The worker is deployed as a separate service in the same app. App Engine apps can have multiple, independent services. This design lets you independently deploy, configure, scale, and update pieces of your app. The frontend is deployed to the default service, and the worker is deployed to the worker service.

The following diagram contrasts the single-service deployment on the left with the multi-service deployment on the right:

[Diagram: Cloud Pub/Sub deployment]

Cleaning up

To avoid incurring charges to your Google Cloud Platform account for the resources used in this tutorial, delete the project or the individual resources as described in the following sections.

Delete the project

The easiest way to eliminate billing is to delete the project that you created for the tutorial.

To delete the project:

  1. In the GCP Console, go to the Projects page.

  2. In the project list, select the project you want to delete and click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete non-default versions of your app

If you don't want to delete your project, you can reduce costs by deleting the non-default versions of your app.

To delete an app version:

  1. In the GCP Console, go to the Versions page for App Engine.

  2. Select the checkbox for the non-default app version you want to delete.
  3. Click Delete to delete the app version.

Delete your Cloud SQL instance

To delete a Cloud SQL instance:

  1. In the GCP Console, go to the SQL Instances page.

  2. Click the name of the SQL instance you want to delete.
  3. Click Delete to delete the instance.

Delete your Cloud Storage bucket

To delete a Cloud Storage bucket:

  1. In the GCP Console, go to the Cloud Storage Browser page.

  2. Click the checkbox for the bucket you want to delete.
  3. Click Delete to delete the bucket.
