Pub/Sub integration tests

Demonstrates how to write an integration test for a function triggered by Pub/Sub.

Documentation pages that include this code sample

To view the code sample used in context, see the following documentation:

Code sample

C++

#include <boost/filesystem.hpp>
#include <boost/process.hpp>
#include <curl/curl.h>
#include <gmock/gmock.h>
#include <nlohmann/json.hpp>
#include <chrono>
#include <iostream>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

namespace {

namespace bp = boost::process;
namespace bfs = boost::filesystem;  // Boost.Process cannot use std::filesystem
using ::testing::HasSubstr;
using ::testing::IsEmpty;

// The outcome of an HTTP request: the status code and the response body.
struct HttpResponse {
  // HTTP status code. `long` matches the type libcurl uses when reporting
  // CURLINFO_RESPONSE_CODE. Defaults to 0 so a default-constructed value never
  // carries an indeterminate code.
  long code = 0;  // NOLINT(google-runtime-int)
  // The response body; empty if the server returned no content.
  std::string payload;
};

// Sends `payload` in an HTTP POST request to `url` and returns the status code
// and response body. Declared here because the tests use it before its
// definition further down in this file.
HttpResponse HttpEvent(std::string const& url, std::string const& payload);

// The path used to invoke this test binary. Assigned from `argv[0]` in
// `main()` and read by `ExePath()` to find programs next to the test.
char const* argv0 = nullptr;

// Searches for `filename` in the directory that contains the running test
// binary (as derived from `argv0`) and returns what the search finds.
auto ExePath(bfs::path const& filename) {
  // The search directory is computed once, on first use.
  static std::vector<bfs::path> const kSearchDirs = [] {
    auto self = bfs::canonical(argv0);
    return std::vector<bfs::path>{self.make_preferred().parent_path()};
  }();
  return bp::search_path(filename, kSearchDirs);
}

// Test fixture that launches `pubsub_integration_server` as a child process
// in `SetUp()`, captures its output, and stops it in `TearDown()`.
class PubsubIntegrationTest : public ::testing::Test {
 protected:
  // Initializes libcurl, starts the server on port 8040 with both its stdout
  // and stderr redirected into `child_log_`, and fails the test if the server
  // does not become ready.
  void SetUp() override {
    curl_global_init(CURL_GLOBAL_ALL);

    auto server = bp::child(ExePath("pubsub_integration_server"), "--port=8040",
                            (bp::std_out & bp::std_err) > child_log_);
    ASSERT_TRUE(WaitForServerReady("http://localhost:8040/"));
    // Only keep the child in the fixture once it is known to be responding.
    process_ = std::move(server);
  }

  // Wait until an HTTP server starts responding at `url`. Returns true once a
  // request succeeds with status 200, false if every attempt fails.
  bool WaitForServerReady(std::string const& url);

  // Terminates the server, waits for it to exit, and releases the libcurl
  // global state acquired in `SetUp()`.
  void TearDown() override {
    process_.terminate();
    process_.wait();
    curl_global_cleanup();
  }

  // Reads and returns the next line of the child's captured output. The
  // result of `std::getline()` is not checked, so the returned string is
  // empty when no line could be read.
  std::string NextChildLine() {
    std::string line;
    std::getline(child_log_, line);
    return line;
  }

 private:
  // Receives the child's combined stdout and stderr.
  bp::ipstream child_log_;
  // The server process; default-constructed until `SetUp()` succeeds.
  bp::child process_;
};

// Posts Pub/Sub CloudEvents to the server started by the fixture and verifies,
// for each one, the HTTP response and the line the server writes to its log.
TEST_F(PubsubIntegrationTest, Basic) {
  auto constexpr kOkay = 200;
  auto constexpr kServerUrl = "http://localhost:8040/";

  // Template event; each test case fills in `data.message`.
  auto const event_template = nlohmann::json::parse(R"js({
    "specversion": "1.0",
    "type": "google.cloud.pubsub.topic.v1.messagePublished",
    "source": "//pubsub.googleapis.com/projects/sample-project/topics/gcf-test",
    "id": "aaaaaa-1111-bbbb-2222-cccccccccccc",
    "time": "2020-09-29T11:32:00.000Z",
    "datacontenttype": "application/json",
    "data": {
      "subscription": "projects/sample-project/subscriptions/sample-subscription",
      "message": {
      }
    }
  })js");

  struct TestCase {
    std::string data;      // value placed in `data.message.data`
    std::string expected;  // text that must appear in the server's log line
  };
  // The magic string was obtained using:
  //  /bin/echo -n 'C++' | openssl base64 -e
  TestCase const test_cases[] = {
      {"Qysr", "Hello C++"},
      {"", "Hello World"},
  };

  for (auto const& tc : test_cases) {
    SCOPED_TRACE("Testing for " + tc.expected);
    auto event = event_template;
    event["data"]["message"] = nlohmann::json{{"data", tc.data}};
    auto const body = event.dump();
    SCOPED_TRACE("event=" + body);

    auto const response = HttpEvent(kServerUrl, body);
    EXPECT_EQ(response.code, kOkay);
    EXPECT_THAT(response.payload, IsEmpty());

    // The server is expected to log one line per event it handles.
    auto const logged = NextChildLine();
    EXPECT_THAT(logged, HasSubstr(tc.expected));
  }
}

// libcurl write callback: appends the `size * nmemb` bytes starting at `ptr`
// to the `std::string` that `userdata` points to, and returns the number of
// bytes consumed.
extern "C" size_t CurlOnWriteData(char* ptr, size_t size, size_t nmemb,
                                  void* userdata) {
  auto const byte_count = size * nmemb;
  auto* sink = static_cast<std::string*>(userdata);
  sink->append(ptr, byte_count);
  return byte_count;
}

// Sends `payload` as the body of an HTTP POST request to `url`, using the
// `application/cloudevents+json` content type, and returns the HTTP status
// code together with the response body.
//
// Throws `std::runtime_error` if the libcurl handle cannot be created, an
// option or header cannot be set, the transfer fails, or the response code
// cannot be retrieved.
HttpResponse HttpEvent(std::string const& url, std::string const& payload) {
  using CurlHandle = std::unique_ptr<CURL, decltype(&curl_easy_cleanup)>;
  using CurlHeaders =
      std::unique_ptr<curl_slist, decltype(&curl_slist_free_all)>;

  auto easy = CurlHandle(curl_easy_init(), curl_easy_cleanup);
  // curl_easy_init() returns a null handle on failure; every call below would
  // then operate on an invalid handle.
  if (!easy) throw std::runtime_error("Cannot initialize CURL easy handle");

  auto setopt = [h = easy.get()](auto opt, auto value) {
    if (auto e = curl_easy_setopt(h, opt, value); e != CURLE_OK) {
      std::ostringstream os;
      os << "error [" << e << "] setting curl_easy option <" << opt
         << ">=" << value;
      throw std::runtime_error(std::move(os).str());
    }
  };
  auto get_response_code = [h = easy.get()]() {
    long code = 0;  // NOLINT(google-runtime-int)
    auto e = curl_easy_getinfo(h, CURLINFO_RESPONSE_CODE, &code);
    if (e != CURLE_OK) throw std::runtime_error("Cannot get response code");
    return code;
  };

  auto headers = CurlHeaders(nullptr, curl_slist_free_all);
  auto add_header = [&headers](std::string const& h) {
    auto* nh = curl_slist_append(headers.get(), h.c_str());
    // On failure curl_slist_append() returns null and leaves the existing
    // list alone, so keep owning it (it is freed during unwinding).
    if (nh == nullptr) {
      throw std::runtime_error("Cannot add HTTP header <" + h + ">");
    }
    // `nh` is the (possibly new) head of the same list: hand ownership over
    // without freeing the nodes that are still part of it.
    (void)headers.release();
    headers.reset(nh);
  };

  setopt(CURLOPT_URL, url.c_str());
  // CURLOPT_POSTFIELDSIZE takes a `long` through a variadic interface; passing
  // a `size_t` is wrong on platforms where the two types differ in width.
  setopt(CURLOPT_POSTFIELDSIZE,
         static_cast<long>(payload.size()));  // NOLINT(google-runtime-int)
  setopt(CURLOPT_POSTFIELDS, payload.data());
  std::string buffer;
  setopt(CURLOPT_WRITEDATA, &buffer);
  setopt(CURLOPT_WRITEFUNCTION, &CurlOnWriteData);
  add_header("Content-Type: application/cloudevents+json");
  add_header("Content-Length: " + std::to_string(payload.size()));
  setopt(CURLOPT_HTTPHEADER, headers.get());

  auto e = curl_easy_perform(easy.get());
  if (e != CURLE_OK) throw std::runtime_error("Error in curl_easy_perform");
  return HttpResponse{get_response_code(), std::move(buffer)};
}

// Probes `url` with a minimal CloudEvent, sleeping for an increasing interval
// before each attempt. Returns true as soon as a probe is answered with HTTP
// 200, and false once all the attempts have been used.
bool PubsubIntegrationTest::WaitForServerReady(std::string const& url) {
  using namespace std::chrono_literals;
  auto constexpr kOkay = 200;
  auto constexpr kPing = R"js({
      "specversion": "1.0",
      "type": "test-type",
      "source": "test-source",
      "id": "test-id",
      "datacontenttype": "application/json",
      "data": {
        "message": { "data": "" }
      }
    })js";

  for (auto const backoff : {100ms, 200ms, 400ms, 800ms, 1600ms}) {  // NOLINT
    std::cout << "Waiting for server to start [" << backoff.count() << "ms]\n";
    std::this_thread::sleep_for(backoff);
    try {
      auto const response = HttpEvent(url, kPing);
      // Consume the log line for the probe, whatever the status code was.
      auto const logged = NextChildLine();
      if (response.code == kOkay) return true;
      std::cerr << "... [" << response.code << "] " << logged << std::endl;
    } catch (std::exception const& ex) {
      // HttpEvent() reports failures with exceptions, which is expected while
      // the server is still starting. Log them to help debug CI builds.
      std::cerr << "WaitForServerReady[" << backoff.count()
                << "ms]: server ping failed with " << ex.what() << std::endl;
    }
  }
  return false;
}

}  // namespace

// Test entry point: records how the binary was invoked, initializes
// googlemock (and googletest) from the command line, and runs all the tests.
int main(int argc, char* argv[]) {
  // ExePath() uses this to locate programs that live next to the test binary.
  ::argv0 = argv[0];
  ::testing::InitGoogleMock(&argc, argv);
  return RUN_ALL_TESTS();
}

Java


import static com.google.common.truth.Truth.assertThat;

import com.google.gson.Gson;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;
import io.vavr.CheckedRunnable;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.UUID;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Integration test that runs the function locally through the Functions Framework Maven plugin
 * and sends it an HTTP request simulating a Pub/Sub message.
 */
public class ExampleIT {
  // Each function must be assigned a unique port to run on.
  // Otherwise, tests can flake when 2+ functions run simultaneously.
  // This is also specified in the `function-maven-plugin` config in `pom.xml`.
  private static final int PORT = 8083;

  // Root URL pointing to the locally hosted function
  // The Functions Framework Maven plugin lets us run a function locally
  private static final String BASE_URL = "http://localhost:" + PORT;

  // The `mvn function:run` process; stays null if setUp() fails before starting it.
  private static Process emulatorProcess = null;
  private static HttpClient client = HttpClientBuilder.create().build();
  private static final Gson gson = new Gson();

  /**
   * Starts the Functions Framework Maven plugin in the current working directory.
   *
   * @throws IOException if the process cannot be started
   */
  @BeforeClass
  public static void setUp() throws IOException {
    // Get the sample's base directory (the one containing a pom.xml file)
    String baseDir = System.getProperty("user.dir");

    // Emulate the function locally by running the Functions Framework Maven plugin
    emulatorProcess = new ProcessBuilder()
        .command("bash", "-c", "mvn function:run")
        .directory(new File(baseDir))
        .start();
  }

  /**
   * Prints the plugin's buffered standard output and stops the plugin process.
   *
   * @throws IOException if the process output cannot be read
   */
  @AfterClass
  public static void tearDown() throws IOException {
    // setUp() may have thrown before the process was created; there is nothing to clean up then.
    if (emulatorProcess == null) {
      return;
    }

    // Display the output of the plugin process
    InputStream stdoutStream = emulatorProcess.getInputStream();
    ByteArrayOutputStream stdoutBytes = new ByteArrayOutputStream();
    stdoutBytes.write(stdoutStream.readNBytes(stdoutStream.available()));
    System.err.println(stdoutBytes.toString(StandardCharsets.UTF_8));

    // Terminate the running Functions Framework Maven plugin process (if it's still running)
    if (emulatorProcess.isAlive()) {
      emulatorProcess.destroy();
    }
  }

  /**
   * Posts a base64-encoded random name to the function and verifies that the greeting for that
   * name shows up in the plugin process' standard error stream.
   */
  @Test
  public void helloPubSub_shouldRunWithFunctionsFramework() throws Throwable {
    String functionUrl = BASE_URL + "/helloPubsub"; // URL to your locally-running function

    // Initialize constants
    String name = UUID.randomUUID().toString();
    String nameBase64 = Base64.getEncoder().encodeToString(name.getBytes(StandardCharsets.UTF_8));

    String jsonStr = gson.toJson(Map.of("data", Map.of("data", nameBase64)));

    HttpPost postRequest = new HttpPost(URI.create(functionUrl));
    postRequest.setEntity(new StringEntity(jsonStr));

    // The Functions Framework Maven plugin process takes time to start up
    // Use resilience4j to retry the test HTTP request until the plugin responds
    RetryRegistry registry = RetryRegistry.of(RetryConfig.custom()
        .maxAttempts(12)
        .retryExceptions(HttpHostConnectException.class)
        .retryOnResult(u -> {
          // Retry if the Functions Framework process has no stderr content yet
          // See `retryOnResultPredicate` here: https://resilience4j.readme.io/docs/retry
          try {
            return emulatorProcess.getErrorStream().available() == 0;
          } catch (IOException e) {
            return true;
          }
        })
        .intervalFunction(IntervalFunction.ofExponentialBackoff(200, 2))
        .build());
    Retry retry = registry.retry("my");

    // Perform the request-retry process
    CheckedRunnable retriableFunc = Retry.decorateCheckedRunnable(
        retry, () -> client.execute(postRequest));
    retriableFunc.run();

    // Get the Functions Framework plugin process' stderr, which is where the greeting is
    // expected to be logged
    InputStream stderrStream = emulatorProcess.getErrorStream();
    ByteArrayOutputStream stderrBytes = new ByteArrayOutputStream();
    stderrBytes.write(stderrStream.readNBytes(stderrStream.available()));

    // Verify desired name value is present
    assertThat(stderrBytes.toString(StandardCharsets.UTF_8)).contains(
        String.format("Hello %s!", name));
  }
}

Node.js

const assert = require('assert');
const {spawn} = require('child_process');
const {request} = require('gaxios');
const uuid = require('uuid');
const waitPort = require('wait-port');

  // Runs the Functions Framework for `helloPubSub`, posts a simulated Pub/Sub
  // event carrying a random name, and checks the status code and the greeting
  // that the framework process writes to stdout.
  it('helloPubSub: should print a name', async () => {
    const PORT = 8088; // Each running framework instance needs a unique port
    const name = uuid.v4();

    // The name travels base64-encoded in the event's `data.data` field
    const pubsubMessage = {
      data: {data: Buffer.from(name).toString('base64')},
    };

    const frameworkArgs = [
      'functions-framework',
      '--target',
      'helloPubSub',
      '--signature-type',
      'event',
      '--port',
      PORT,
    ];
    ffProc = spawn('npx', frameworkArgs);

    // Settles when the framework exits: resolves with everything it wrote to
    // stdout on exit code 0, rejects with its stderr otherwise
    const frameworkExited = new Promise((resolve, reject) => {
      const captured = {stdout: '', stderr: ''};
      ffProc.stdout.on('data', chunk => {
        captured.stdout += chunk;
      });
      ffProc.stderr.on('data', chunk => {
        captured.stderr += chunk;
      });
      ffProc.on('error', reject);
      ffProc.on('exit', code => {
        if (code === 0) {
          resolve(captured.stdout);
        } else {
          reject(captured.stderr);
        }
      });
    });
    await waitPort({host: 'localhost', port: PORT});

    // Send HTTP request simulating Pub/Sub message
    // (GCF translates Pub/Sub messages to HTTP requests internally)
    const response = await request({
      url: `http://localhost:${PORT}/`,
      method: 'POST',
      data: pubsubMessage,
    });
    ffProc.kill();

    assert.strictEqual(response.status, 204);

    // Wait for the functions framework to stop
    const output = await frameworkExited;
    const greeting = new RegExp(`Hello, ${name}!`);
    assert.match(output, greeting);
  });
});

PHP


namespace Google\Cloud\Samples\Functions\HelloworldPubsub;

use Google\CloudFunctions\CloudEvent;
use GuzzleHttp\Client;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Process\Process;
use Symfony\Component\Process\PhpExecutableFinder;

/**
 * Class SampleIntegrationTest.
 *
 * Integration tests for the 'Helloworld Pubsub' Cloud Function.
 */
class SampleIntegrationTest extends TestCase
{
    /** @var Process The local PHP server running the Functions Framework. */
    private static $process;

    /** @var Client HTTP client whose base URI points at the local server. */
    private static $client;

    /**
     * Test cases for testHelloPubsub().
     *
     * @return array[] One entry per case, with the CloudEvent metadata
     *     ('cloudevent'), the request body ('data'), and the text expected in
     *     the function's log output ('expected').
     */
    public function dataProvider()
    {
        return [
            [
                'cloudevent' => CloudEvent::fromArray([
                    'id' => uniqid(),
                    'source' => 'pubsub.googleapis.com',
                    'specversion' => '1.0',
                    'type' => 'google.cloud.pubsub.topic.v1.messagePublished',
                ]),
                'data' => [
                    'data' => base64_encode('John')
                ],
                'expected' => 'Hello, John!'
            ],
        ];
    }

    /**
     * Start a local PHP server running the Functions Framework.
     *
     * The port is read from the PORT environment variable and defaults to 8080.
     *
     * @beforeClass
     */
    public static function startFunctionFramework(): void
    {
        $port = getenv('PORT') ?: '8080';
        $php = (new PhpExecutableFinder())->find();
        $uri = 'localhost:' . $port;

        // https://symfony.com/doc/current/components/process.html#usage
        self::$process = new Process([$php, '-S', $uri, 'vendor/bin/router.php'], null, [
            'FUNCTION_SIGNATURE_TYPE' => 'cloudevent',
            'FUNCTION_TARGET' => 'helloworldPubsub',
        ]);
        self::$process->start();

        // Initialize an HTTP client to drive requests.
        self::$client = new Client(['base_uri' => 'http://' . $uri]);

        // Short delay to ensure PHP server is ready.
        sleep(1);
    }

    /**
     * Stop the local PHP server.
     *
     * @afterClass
     *
     * @throws \RuntimeException If the server process already exited on its
     *     own; its error output is echoed first.
     */
    public static function stopFunctionFramework(): void
    {
        if (!self::$process->isRunning()) {
            echo self::$process->getErrorOutput();
            // Fully qualified: an unqualified class name would be resolved
            // relative to this file's namespace, where no such class exists.
            throw new \RuntimeException('Function Framework PHP process not running by end of test');
        }
        self::$process->stop(3, SIGTERM);
    }

    /**
     * Send one CloudEvent to the function and check what it logs.
     *
     * @dataProvider dataProvider
     *
     * @param CloudEvent $cloudevent Metadata sent as `ce-*` HTTP headers.
     * @param array $data Payload sent as the JSON request body.
     * @param string $expected Text that must appear in the server's error output.
     */
    public function testHelloPubsub(
        CloudEvent $cloudevent,
        array $data,
        string $expected
    ): void {
        // Send an HTTP request using CloudEvent metadata.
        $resp = self::$client->post('/', [
            'body' => json_encode($data),
            'headers' => [
                // Instruct the function framework to parse the body as JSON.
                'content-type' => 'application/json',

                // Prepare the HTTP headers for a CloudEvent.
                'ce-id' => $cloudevent->getId(),
                'ce-source' => $cloudevent->getSource(),
                'ce-specversion' => $cloudevent->getSpecVersion(),
                'ce-type' => $cloudevent->getType()
            ],
        ]);

        // The Cloud Function logs all data to stderr.
        $actual = self::$process->getIncrementalErrorOutput();

        // Verify the function's results are correctly logged.
        $this->assertStringContainsString($expected, $actual);
    }
}

Python

import base64
import os
import subprocess
import uuid

import requests
from requests.packages.urllib3.util.retry import Retry


def test_print_name():
    """Run the function locally and check that it greets a random name.

    Starts ``functions-framework`` for the ``hello_pubsub`` target as a
    subprocess, posts an HTTP request simulating a Pub/Sub message whose
    payload is the base64-encoded name, and asserts that the response status
    is 200 and that the greeting appears in the subprocess' stdout.
    """
    name = str(uuid.uuid4())
    port = 8088  # Each running framework instance needs a unique port

    encoded_name = base64.b64encode(name.encode('utf-8')).decode('utf-8')
    pubsub_message = {
        'data': {'data': encoded_name}
    }

    process = subprocess.Popen(
      [
        'functions-framework',
        '--target', 'hello_pubsub',
        '--signature-type', 'event',
        '--port', str(port)
      ],
      cwd=os.path.dirname(__file__),
      stdout=subprocess.PIPE
    )

    try:
        # Send HTTP request simulating Pub/Sub message
        # (GCF translates Pub/Sub messages to HTTP requests internally)
        url = f'http://localhost:{port}/'

        # Retry with backoff: the framework needs some time to start listening
        retry_policy = Retry(total=6, backoff_factor=1)
        retry_adapter = requests.adapters.HTTPAdapter(
          max_retries=retry_policy)

        session = requests.Session()
        session.mount(url, retry_adapter)

        response = session.post(url, json=pubsub_message)
    finally:
        # Stop the functions framework process even when the request fails,
        # so a failing test does not leave it running and holding the port.
        process.kill()
        # communicate() drains stdout and waits for the process to exit;
        # err is None because stderr is not redirected to a pipe.
        out, err = process.communicate()

    print(out, err, response.content)

    assert response.status_code == 200
    assert f'Hello {name}!' in str(out)

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.