Pub/Sub system tests

Demonstrates how to write a system test for a function triggered by Pub/Sub.

Documentation pages that include this code sample

To view the code sample used in context, see the following documentation:

Code sample

C++

#include <google/cloud/pubsub/publisher.h>
#include <google/cloud/pubsub/topic_admin_client.h>
#include <boost/process.hpp>
#include <fmt/format.h>
#include <gmock/gmock.h>
#include <algorithm>
#include <chrono>
#include <cstdlib>
#include <string>
#include <thread>
#include <vector>

namespace {

namespace bp = ::boost::process;
namespace pubsub = ::google::cloud::pubsub;
using ::testing::AnyOf;
using ::testing::HasSubstr;

// Fixture for the Pub/Sub system test. It reads its configuration from the
// environment and ensures the target topic exists before each test runs.
class PubsubSystemTest : public ::testing::Test {
 protected:
  void SetUp() override {
    // Without these environment variables there is nothing to test against,
    // so each missing value aborts the test immediately.
    char const* const project = std::getenv("GOOGLE_CLOUD_PROJECT");
    ASSERT_NE(project, nullptr);
    char const* const topic_name = std::getenv("TOPIC_ID");
    ASSERT_NE(topic_name, nullptr);
    char const* const service = std::getenv("SERVICE_ID");
    ASSERT_NE(service, nullptr);
    service_id_ = service;

    // Provision the topic on demand so the test does not depend on manual
    // setup steps.
    pubsub::TopicAdminClient admin(pubsub::MakeTopicAdminConnection());
    topic_ = pubsub::Topic(project, topic_name);
    auto const created = admin.CreateTopic(pubsub::TopicBuilder(topic_));

    // A pre-existing topic is fine; any other failure aborts the test.
    using ::google::cloud::StatusCode;
    ASSERT_THAT(created.status().code(),
                AnyOf(StatusCode::kOk, StatusCode::kAlreadyExists));
  }

  void TearDown() override {}

  // The topic the test publishes to; only meaningful once SetUp() succeeded.
  [[nodiscard]] pubsub::Topic const& topic() const { return topic_; }
  // The value of the SERVICE_ID environment variable captured in SetUp().
  [[nodiscard]] std::string const& service_id() const { return service_id_; }

 private:
  // Placeholder value, replaced in SetUp().
  pubsub::Topic topic_ = pubsub::Topic("--invalid--", "--invalid--");
  std::string service_id_;
};

// Publishes one message per test case and then polls the service logs (via
// `gcloud logging read`) until the expected greeting shows up or the retry
// schedule is exhausted.
TEST_F(PubsubSystemTest, Basic) {
  struct TestCases {
    std::string data;
    std::string expected;
  } cases[]{
      {"C++", "Hello C++"},
      {"World", "Hello World"},
  };

  auto publisher =
      pubsub::Publisher(pubsub::MakePublisherConnection(topic(), {}));

  for (auto const& test : cases) {
    SCOPED_TRACE("Testing for " + test.expected);
    auto id =
        publisher.Publish(pubsub::MessageBuilder().SetData(test.data).Build());
    publisher.Flush();
    EXPECT_TRUE(id.get().ok());

    // Accumulates the output of every polling attempt for this test case.
    std::vector<std::string> lines;
    // It may take a few seconds for the logs to propagate, so try a few times.
    using namespace std::chrono_literals;
    for (auto delay : {1s, 2s, 4s, 8s, 16s}) {
      bp::ipstream out;
      auto constexpr kProject = "--project={}";
      auto constexpr kLogFilter =
          "resource.type=cloud_run_revision AND "
          "resource.labels.service_name={} AND "
          "logName:stdout";
      auto reader = bp::child(bp::search_path("gcloud"), "logging", "read",
                              fmt::format(kProject, topic().project_id()),
                              fmt::format(kLogFilter, service_id()),
                              "--format=value(textPayload)", "--limit=1",
                              (bp::std_out & bp::std_err) > out);
      // Drain the pipe *before* waiting for the child: if the child produces
      // more output than the pipe can buffer it blocks on write, and waiting
      // first would then never return.
      for (std::string l; std::getline(out, l);) lines.push_back(std::move(l));
      reader.wait();
      ASSERT_EQ(reader.exit_code(), 0);
      // Take the strings by reference; there is no need to copy each line
      // just to search it.
      auto const found =
          std::any_of(lines.begin(), lines.end(),
                      [&expected = test.expected](std::string const& l) {
                        return l.find(expected) != std::string::npos;
                      });
      if (found) break;
      std::this_thread::sleep_for(delay);
    }
    EXPECT_THAT(lines, Contains(HasSubstr(test.expected)));
  }
}

}  // namespace

Go


package helloworld

import (
	"context"
	"log"
	"os"
	"os/exec"
	"strings"
	"testing"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/gobuffalo/uuid"
)

// TestHelloPubSubSystem publishes a unique message to the topic named by the
// FUNCTIONS_TOPIC environment variable (in the project named by GCP_PROJECT)
// and then checks that the logs of the HelloPubSub function contain it.
func TestHelloPubSubSystem(t *testing.T) {
	ctx := context.Background()

	topicName := os.Getenv("FUNCTIONS_TOPIC")
	projectID := os.Getenv("GCP_PROJECT")

	// Only log entries written after this instant are read back below.
	startTime := time.Now().UTC().Format(time.RFC3339)

	// Create the Pub/Sub client and topic.
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		// Nothing has been set up yet, so exiting here skips no cleanup.
		log.Fatal(err)
	}
	// Release the client's connections when the test finishes.
	defer client.Close()

	topic := client.Topic(topicName)
	// Stop the topic's background publishing before the client is closed
	// (deferred calls run in reverse order).
	defer topic.Stop()

	// Publish a message with a random string to verify.
	// We use a random string to make sure the function is logging the correct
	// message for this test invocation.
	u := uuid.Must(uuid.NewV4())
	msg := &pubsub.Message{
		Data: []byte(u.String()),
	}
	// Fail fast if the publish did not succeed: waiting for a log entry that
	// can never appear would only hide the real cause.
	if _, err := topic.Publish(ctx, msg).Get(ctx); err != nil {
		t.Fatalf("topic.Publish: %v", err)
	}

	// Wait for logs to be consistent.
	time.Sleep(20 * time.Second)

	// Check logs after a delay.
	cmd := exec.Command("gcloud", "alpha", "functions", "logs", "read", "HelloPubSub", "--start-time", startTime)
	out, err := cmd.CombinedOutput()
	if err != nil {
		t.Fatalf("exec.Command: %v", err)
	}
	if got := string(out); !strings.Contains(got, u.String()) {
		t.Errorf("HelloPubSub got %q, want to contain %q", got, u.String())
	}
}

Java

import static com.google.common.truth.Truth.assertThat;

import com.google.api.gax.paging.Page;
import com.google.cloud.logging.LogEntry;
import com.google.cloud.logging.Logging;
import com.google.cloud.logging.LoggingOptions;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.junit.BeforeClass;
import org.junit.Test;

public class ExampleSystemIT {

  // TODO<developer>: set these values (as environment variables)
  private static final String PROJECT_ID = System.getenv("GCP_PROJECT");
  private static final String TOPIC_NAME = System.getenv("FUNCTIONS_SYSTEM_TEST_TOPIC");
  private static final String FUNCTION_DEPLOYED_NAME = "HelloPubSub";

  private static Logging loggingClient;

  private static Publisher publisher;

  private HelloPubSub sampleUnderTest;

  @BeforeClass
  public static void setUp() throws IOException {
    loggingClient = LoggingOptions.getDefaultInstance().getService();
    publisher = Publisher.newBuilder(
        ProjectTopicName.of(PROJECT_ID, TOPIC_NAME)).build();
  }

  private static String getLogEntriesAsString(String startTimestamp) {
    // Construct Stackdriver logging filter
    // See this page for more info: https://cloud.google.com/logging/docs/view/advanced-queries
    String filter = "resource.type=\"cloud_function\""
        + " AND severity=INFO"
        + " AND resource.labels.function_name=" + FUNCTION_DEPLOYED_NAME
        + String.format(" AND timestamp>=\"%s\"", startTimestamp);

    // Get Stackdriver logging entries
    Page<LogEntry> logEntries =
        loggingClient.listLogEntries(
            Logging.EntryListOption.filter(filter),
            Logging.EntryListOption.sortOrder(
                Logging.SortingField.TIMESTAMP, Logging.SortingOrder.DESCENDING)
        );

    // Serialize Stackdriver logging entries + collect them into a single string
    String logsConcat = StreamSupport.stream(logEntries.getValues().spliterator(), false)
        .map((x) -> x.toString())
        .collect(Collectors.joining("%n"));

    return logsConcat;
  }

  @Test
  public void helloPubSub_shouldRunOnGcf() throws Exception {
    String name = UUID.randomUUID().toString();

    // Subtract time to work-around local-GCF clock difference
    Instant startInstant = Instant.now().minus(Duration.ofMinutes(4));
    String startTimestamp = DateTimeFormatter.ISO_INSTANT.format(startInstant);

    // Publish to pub/sub topic
    ByteString byteStr = ByteString.copyFrom(name, StandardCharsets.UTF_8);
    PubsubMessage pubsubApiMessage = PubsubMessage.newBuilder().setData(byteStr).build();
    publisher.publish(pubsubApiMessage).get();

    // Keep retrying until the logs contain the desired invocation's log entry
    // (If the invocation failed, the retry process will eventually time out)
    RetryRegistry registry = RetryRegistry.of(RetryConfig.custom()
        .maxAttempts(8)
        .intervalFunction(IntervalFunction.ofExponentialBackoff(1000, 2))
        .retryOnResult(s -> !s.toString().contains(name))
        .build());
    Retry retry = registry.retry(name);
    String logEntry = Retry
        .decorateFunction(retry, ExampleSystemIT::getLogEntriesAsString)
        .apply(startTimestamp);

    // Perform final assertion (to make sure we fail on timeout)
    assertThat(logEntry).contains(name);
  }
}

Node.js

const childProcess = require('child_process');
const assert = require('assert');
const uuid = require('uuid');
const {PubSub} = require('@google-cloud/pubsub');
const moment = require('moment');
const promiseRetry = require('promise-retry');

// Pub/Sub client, constructed with the library's default options.
const pubsub = new PubSub();
// Name of the topic to publish to, read from the FUNCTIONS_TOPIC environment
// variable.
const topicName = process.env.FUNCTIONS_TOPIC;
// Common prefix of the gcloud command used to read the function's logs.
const baseCmd = 'gcloud functions';

describe('system tests', () => {
  it('helloPubSub: should print a name', async () => {
    // A fresh UUID makes this run's log line distinguishable from any other.
    const name = uuid.v4();

    // Look a few minutes into the past to tolerate clock skew between the
    // local machine and GCF.
    const startTime = moment().subtract(4, 'minutes').toISOString();
    const readLogsCmd = `${baseCmd} logs read helloPubSub --start-time ${startTime}`;

    // Send the UUID through the topic.
    await pubsub.topic(topicName).publish(Buffer.from(name));

    console.log(`published topic ${topicName}, ${name}`);

    // Logs show up with a delay: re-read them until the greeting is present.
    const checkLogs = retry => {
      const logs = childProcess.execSync(readLogsCmd).toString();

      try {
        assert.ok(logs.includes(`Hello, ${name}!`));
      } catch (err) {
        // Hand the assertion failure to promise-retry to schedule another try.
        retry(err);
      }
    };
    await promiseRetry(checkLogs);
  });

Python

from datetime import datetime
from os import getenv
import subprocess
import time
import uuid

from google.cloud import pubsub_v1
import pytest

# Test configuration, read from the environment when the module is imported.
PROJECT = getenv('GCP_PROJECT')
TOPIC = getenv('FUNCTIONS_TOPIC')

# Fail at import time if either variable is unset (getenv returns None then).
assert PROJECT is not None
assert TOPIC is not None


@pytest.fixture(scope='module')
def publisher_client():
    """Provide a Pub/Sub publisher client shared by the tests in this module."""
    client = pubsub_v1.PublisherClient()
    yield client


def test_print_name(publisher_client):
    """Publish a unique name and check that the function logged a greeting.

    A random UUID is published to the configured topic; after a fixed delay
    the logs of the ``hello_pubsub`` function are read with ``gcloud`` and
    searched for ``Hello <uuid>!``.

    Args:
        publisher_client: the Pub/Sub publisher client fixture.
    """
    # Only log entries from this point on are requested from gcloud.
    start_time = datetime.utcnow().isoformat()
    topic_path = publisher_client.topic_path(PROJECT, TOPIC)

    # Publish the message
    name = uuid.uuid4()
    data = str(name).encode('utf-8')
    publisher_client.publish(topic_path, data=data).result()

    # Wait for logs to become consistent
    time.sleep(15)

    # Check logs after a delay
    completed = subprocess.run([
        'gcloud',
        'alpha',
        'functions',
        'logs',
        'read',
        'hello_pubsub',
        '--start-time',
        start_time
    ], stdout=subprocess.PIPE)
    # Decode the captured bytes; str() on a bytes object would yield its
    # repr (b'...' with escaped newlines) rather than the actual log text.
    logs = completed.stdout.decode('utf-8', errors='replace')
    assert 'Hello {}!'.format(name) in logs

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.