Pub/Sub system tests

Demonstrates how to write a system test for a function that is triggered by Pub/Sub.

To authenticate to Cloud Functions, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

package helloworld

import (


func TestHelloPubSubSystem(t *testing.T) {
	ctx := context.Background()

	topicName := os.Getenv("FUNCTIONS_TOPIC")
	projectID := os.Getenv("GCP_PROJECT")

	startTime := time.Now().UTC().Format(time.RFC3339)

	// Create the Pub/Sub client and topic.
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
	topic := client.Topic(topicName)

	// Publish a message with a random string to verify.
	// We use a random string to make sure the function is logging the correct
	// message for this test invocation.
	u := uuid.Must(uuid.NewV4())
	msg := &pubsub.Message{
		Data: []byte(u.String()),
	topic.Publish(ctx, msg).Get(ctx)

	// Wait for logs to be consistent.
	time.Sleep(20 * time.Second)

	// Check logs after a delay.
	cmd := exec.Command("gcloud", "alpha", "functions", "logs", "read", "HelloPubSub", "--start-time", startTime)
	out, err := cmd.CombinedOutput()
	if err != nil {
		t.Fatalf("exec.Command: %v", err)
	if got := string(out); !strings.Contains(got, u.String()) {
		t.Errorf("HelloPubSub got %q, want to contain %q", got, u.String())


import static;

import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.UUID;
import org.junit.BeforeClass;
import org.junit.Test;

public class ExampleSystemIT {

  // TODO<developer>: set these values (as environment variables)
  private static final String PROJECT_ID = System.getenv("GCP_PROJECT");
  private static final String TOPIC_NAME = System.getenv("FUNCTIONS_SYSTEM_TEST_TOPIC");

  // TODO<developer>: set this value (as an environment variable) to HelloPubSub
  private static final String FUNCTION_DEPLOYED_NAME = System.getenv("FUNCTIONS_PUBSUB_FN_NAME");

  private static Logging loggingClient;

  private static Publisher publisher;

  private HelloPubSub sampleUnderTest;

  /**
   * Initializes the shared clients used by the test: the logging client obtained from the
   * default {@code LoggingOptions}, and a publisher bound to the topic {@code TOPIC_NAME}
   * in project {@code PROJECT_ID}.
   *
   * <p>NOTE(review): the method's closing brace is not present in this excerpt, and the
   * {@code org.junit.BeforeClass} import suggests an annotation was dropped from this
   * method. Confirm against the original sample.
   *
   * @throws IOException declared by the signature; presumably raised while building the
   *     publisher (confirm against the Publisher API)
   */
  public static void setUp() throws IOException {
    loggingClient = LoggingOptions.getDefaultInstance().getService();
    publisher = Publisher.newBuilder(
        ProjectTopicName.of(PROJECT_ID, TOPIC_NAME)).build();

  /**
   * Returns the function's recent log entries concatenated into a single string.
   *
   * <p>The filter selects INFO-severity {@code cloud_function} entries whose function name is
   * {@code FUNCTION_DEPLOYED_NAME} and whose timestamp is at or after {@code startTimestamp}.
   *
   * <p>NOTE(review): the statements that list the entries and collect the stream are
   * truncated in this excerpt, so the exact listing options and the separator used when
   * joining entries cannot be confirmed here.
   *
   * @param startTimestamp lower bound for the entry timestamp, inserted verbatim into the
   *     filter (the caller in this file formats it with {@code DateTimeFormatter.ISO_INSTANT})
   * @return the serialized log entries as one string
   */
  private static String getLogEntriesAsString(String startTimestamp) {
    // Construct the Stackdriver (Cloud Logging) filter.
    // See the Cloud Logging query language documentation for more info on the filter syntax.
    String filter = "resource.type=\"cloud_function\""
        + " AND severity=INFO"
        + " AND resource.labels.function_name=" + FUNCTION_DEPLOYED_NAME
        + String.format(" AND timestamp>=\"%s\"", startTimestamp);

    // Get Stackdriver logging entries, sorted by timestamp in descending order.
    // NOTE(review): the call that consumes `filter` is missing from this excerpt.
    Page<LogEntry> logEntries =
                Logging.SortingField.TIMESTAMP, Logging.SortingOrder.DESCENDING)

    // Serialize Stackdriver logging entries + collect them into a single string
    String logsConcat =, false)
        .map((x) -> x.toString())

    return logsConcat;

  /**
   * System test: builds a Pub/Sub message whose payload is a random UUID, then polls the
   * function's logs with exponential backoff until an entry containing that UUID appears.
   *
   * <p>NOTE(review): several statements are truncated in this excerpt. The initializer of
   * {@code startInstant}, the call that publishes {@code pubsubApiMessage}, the end of the
   * retry configuration, the invocation of the decorated function, and the final assertion
   * are not visible. Confirm against the original sample before relying on this method.
   *
   * @throws Exception declared by the signature; the visible code does not show which calls
   *     raise it
   */
  public void helloPubSub_shouldRunOnGcf() throws Exception {
    // A unique payload lets the test recognize the log line of this specific invocation.
    String name = UUID.randomUUID().toString();

    // Subtract time to work-around local-GCF clock difference
    Instant startInstant =;
    String startTimestamp = DateTimeFormatter.ISO_INSTANT.format(startInstant);

    // Build the Pub/Sub message carrying the UUID as UTF-8 bytes.
    ByteString byteStr = ByteString.copyFrom(name, StandardCharsets.UTF_8);
    PubsubMessage pubsubApiMessage = PubsubMessage.newBuilder().setData(byteStr).build();

    // Keep retrying until the logs contain the desired invocation's log entry
    // (If the invocation failed, the retry process will eventually time out)
    // Backoff starts at 1000 ms and doubles on each attempt; a result that does not
    // contain the UUID triggers another attempt.
    RetryRegistry registry = RetryRegistry.of(RetryConfig.custom()
        .intervalFunction(IntervalFunction.ofExponentialBackoff(1000, 2))
        .retryOnResult(s -> !s.toString().contains(name))
    Retry retry = registry.retry(name);
    String logEntry = Retry
        .decorateFunction(retry, ExampleSystemIT::getLogEntriesAsString)

    // Perform final assertion (to make sure we fail on timeout)


const childProcess = require('child_process');
const assert = require('assert');
const uuid = require('uuid');
const {PubSub} = require('@google-cloud/pubsub');
const moment = require('moment');
const promiseRetry = require('promise-retry');

// Shared Pub/Sub client and the configuration the system test depends on.
const pubsub = new PubSub();
const topicName = process.env.FUNCTIONS_TOPIC;

// Fail fast with a clear message when required environment variables are absent.
if (!topicName) throw new Error('"FUNCTIONS_TOPIC" env var must be set.');
if (!process.env.GCF_REGION) {
  throw new Error('"GCF_REGION" env var must be set.');
}

// Prefix shared by every gcloud invocation in this file.
const baseCmd = 'gcloud functions';

describe('system tests', () => {
  // Publishes a unique name to the topic, then polls the function's logs until
  // the greeting for that name shows up.
  it('helloPubSub: should print a name', async () => {
    // A fresh UUID makes this invocation's log line distinguishable.
    const name = uuid.v4();

    // Subtract time to work-around local-GCF clock difference
    const startTime = moment().subtract(4, 'minutes').toISOString();

    // Publish to pub/sub topic
    const topic = pubsub.topic(topicName);
    const data = Buffer.from(name);
    await topic.publishMessage({data});

    console.log(`published topic ${topicName}, ${name}`);

    // Wait for logs to become consistent
    await promiseRetry(retry => {
      const logs = childProcess
        .execSync(`${baseCmd} logs read helloPubSub --start-time ${startTime}`)
        .toString();

      try {
        assert.ok(logs.includes(`Hello, ${name}!`));
      } catch (err) {
        console.log('An error occurred, retrying:', err);
        // Hand the failure to promise-retry so another attempt is scheduled;
        // once attempts are exhausted the error is surfaced and the test fails
        // instead of passing silently.
        retry(err);
      }
    });
  });
});


from datetime import datetime
from os import getenv
import subprocess
import time
import uuid

from import pubsub_v1
import pytest


# Fail at import time if the required settings are missing.
# NOTE(review): the assignments of PROJECT and TOPIC are not visible in this
# excerpt; `getenv` is imported above, so they presumably come from environment
# variables. Confirm against the original sample.
assert PROJECT is not None
assert TOPIC is not None

def publisher_client():
    """Yield a new Pub/Sub ``PublisherClient`` for tests that request ``publisher_client``."""
    client = pubsub_v1.PublisherClient()
    yield client

def test_print_name(publisher_client):
    """Publish a random UUID to the topic and check that the logs greet it.

    The UUID is published as a UTF-8 payload to the topic identified by
    PROJECT and TOPIC; the test then asserts that the text ``Hello <uuid>!``
    appears in the output captured from the log-reading subprocess.

    NOTE(review): this excerpt is truncated. The wait step has no statement
    and the subprocess argument list is empty, so the actual log command is
    not visible here. Confirm against the original sample.
    """
    # Captured before publishing; presumably meant as the lower time bound for
    # the log query (it is not used in the visible code). TODO confirm.
    start_time = datetime.utcnow().isoformat()
    topic_path = publisher_client.topic_path(PROJECT, TOPIC)

    # Publish the message
    name = uuid.uuid4()
    data = str(name).encode('utf-8')
    # .result() blocks until the publish call has completed.
    publisher_client.publish(topic_path, data=data).result()

    # Wait for logs to become consistent

    # Check logs after a delay
    log_process = subprocess.Popen([
    ], stdout=subprocess.PIPE)
    # communicate() returns (stdout, stderr); stdout is bytes, so str() yields
    # its repr, which is sufficient for the substring check below.
    logs = str(log_process.communicate()[0])
    assert f'Hello {name}!' in logs

