Demonstrates how to write a system test for a function triggered by Pub/Sub.
Documentation pages that include this code sample
To view the code sample used in context, see the following documentation:
Code sample
C++
#include <google/cloud/pubsub/publisher.h>
#include <google/cloud/pubsub/topic_admin_client.h>
#include <boost/process.hpp>
#include <fmt/format.h>
#include <gmock/gmock.h>
#include <algorithm>
#include <chrono>
#include <cstdlib>
#include <string>
#include <thread>
#include <utility>
#include <vector>
namespace {
namespace bp = ::boost::process;
namespace pubsub = ::google::cloud::pubsub;
using ::testing::AnyOf;
using ::testing::HasSubstr;
/// Fixture for the Pub/Sub system test.
///
/// Reads the test configuration from the environment and makes sure the topic
/// used by the test exists before the test body runs.
class PubsubSystemTest : public ::testing::Test {
 protected:
  void SetUp() override {
    // GOOGLE_CLOUD_PROJECT, TOPIC_ID and SERVICE_ID are all required; setup
    // stops at the first one that is missing.
    char const* const project = std::getenv("GOOGLE_CLOUD_PROJECT");
    ASSERT_NE(project, nullptr);
    char const* const topic_name = std::getenv("TOPIC_ID");
    ASSERT_NE(topic_name, nullptr);
    char const* const service = std::getenv("SERVICE_ID");
    ASSERT_NE(service, nullptr);

    service_id_ = service;
    topic_ = pubsub::Topic(project, topic_name);

    // Create the topic on demand so the environment is self-configuring. The
    // only acceptable outcomes are "created" and "already there".
    pubsub::TopicAdminClient admin(pubsub::MakeTopicAdminConnection());
    auto const created = admin.CreateTopic(pubsub::TopicBuilder(topic_));
    ASSERT_THAT(created.status().code(),
                AnyOf(google::cloud::StatusCode::kOk,
                      google::cloud::StatusCode::kAlreadyExists));
  }

  void TearDown() override {}

  /// The topic the test publishes to.
  [[nodiscard]] pubsub::Topic const& topic() const { return topic_; }

  /// The value of SERVICE_ID, used to filter the service's logs.
  [[nodiscard]] std::string const& service_id() const { return service_id_; }

 private:
  // Placeholder until SetUp() replaces it with the configured topic.
  pubsub::Topic topic_ = pubsub::Topic("--invalid--", "--invalid--");
  std::string service_id_;
};
// Publishes one message per test case and verifies that the expected greeting
// shows up in the output of `gcloud logging read` for the configured service.
TEST_F(PubsubSystemTest, Basic) {
  struct TestCases {
    std::string data;
    std::string expected;
  } cases[]{
      {"C++", "Hello C++"},
      {"World", "Hello World"},
  };

  auto publisher =
      pubsub::Publisher(pubsub::MakePublisherConnection(topic(), {}));
  for (auto const& test : cases) {
    SCOPED_TRACE("Testing for " + test.expected);
    auto id =
        publisher.Publish(pubsub::MessageBuilder().SetData(test.data).Build());
    publisher.Flush();
    EXPECT_TRUE(id.get().ok());

    // Accumulates the output of every polling attempt for this test case.
    std::vector<std::string> lines;
    // It may take a few seconds for the logs to propagate, so try a few times.
    using namespace std::chrono_literals;
    for (auto delay : {1s, 2s, 4s, 8s, 16s}) {
      bp::ipstream out;
      auto constexpr kProject = "--project={}";
      auto constexpr kLogFilter =
          "resource.type=cloud_run_revision AND "
          "resource.labels.service_name={} AND "
          "logName:stdout";
      auto reader = bp::child(bp::search_path("gcloud"), "logging", "read",
                              fmt::format(kProject, topic().project_id()),
                              fmt::format(kLogFilter, service_id()),
                              "--format=value(textPayload)", "--limit=1",
                              (bp::std_out & bp::std_err) > out);
      // NOTE(review): the pipe is drained only after the child exits. That is
      // presumably fine for the single entry requested by --limit=1, but a
      // child producing a lot of output could block on a full pipe - confirm.
      reader.wait();
      ASSERT_EQ(reader.exit_code(), 0);
      for (std::string line; std::getline(out, line);) {
        lines.push_back(std::move(line));
      }
      // Stop polling as soon as any captured line contains the greeting.
      // Capture and compare by reference: no strings are copied per attempt.
      auto const found = std::any_of(
          lines.begin(), lines.end(),
          [&expected = test.expected](std::string const& line) {
            return line.find(expected) != std::string::npos;
          });
      if (found) break;
      std::this_thread::sleep_for(delay);
    }
    // Explicitly qualified: the file has no using-declaration for Contains.
    EXPECT_THAT(lines, ::testing::Contains(HasSubstr(test.expected)));
  }
}
} // namespace
Go
package helloworld
import (
"context"
"log"
"os"
"os/exec"
"strings"
"testing"
"time"
"cloud.google.com/go/pubsub"
"github.com/gobuffalo/uuid"
)
// TestHelloPubSubSystem publishes a message carrying a random UUID to the
// topic named by FUNCTIONS_TOPIC and then checks, using
// `gcloud alpha functions logs read HelloPubSub`, that the UUID shows up in
// the function's logs.
func TestHelloPubSubSystem(t *testing.T) {
	ctx := context.Background()

	topicName := os.Getenv("FUNCTIONS_TOPIC")
	projectID := os.Getenv("GCP_PROJECT")
	// Fail fast instead of discovering a missing setting after the log delay.
	if topicName == "" || projectID == "" {
		t.Fatal("FUNCTIONS_TOPIC and GCP_PROJECT must be set")
	}

	// Recorded before publishing so the log query covers this invocation.
	startTime := time.Now().UTC().Format(time.RFC3339)

	// Create the Pub/Sub client and topic.
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		t.Fatalf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	topic := client.Topic(topicName)
	defer topic.Stop()

	// Publish a message with a random string to verify.
	// We use a random string to make sure the function is logging the correct
	// message for this test invocation.
	u := uuid.Must(uuid.NewV4())
	msg := &pubsub.Message{
		Data: []byte(u.String()),
	}
	// Get blocks until the publish completes; a failure here means the
	// function was never triggered, so there is no point in reading logs.
	id, err := topic.Publish(ctx, msg).Get(ctx)
	if err != nil {
		t.Fatalf("topic.Publish.Get: %v", err)
	}
	log.Printf("published message %s to topic %s", id, topicName)

	// Wait for logs to be consistent.
	time.Sleep(20 * time.Second)

	// Check logs after a delay.
	cmd := exec.Command("gcloud", "alpha", "functions", "logs", "read", "HelloPubSub", "--start-time", startTime)
	out, err := cmd.CombinedOutput()
	if err != nil {
		t.Fatalf("exec.Command: %v", err)
	}
	if got := string(out); !strings.Contains(got, u.String()) {
		t.Errorf("HelloPubSub got %q, want to contain %q", got, u.String())
	}
}
Java
import static com.google.common.truth.Truth.assertThat;
import com.google.api.gax.paging.Page;
import com.google.cloud.logging.LogEntry;
import com.google.cloud.logging.Logging;
import com.google.cloud.logging.LoggingOptions;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.junit.BeforeClass;
import org.junit.Test;
public class ExampleSystemIT {
// TODO<developer>: set these values (as environment variables)
private static final String PROJECT_ID = System.getenv("GCP_PROJECT");
private static final String TOPIC_NAME = System.getenv("FUNCTIONS_SYSTEM_TEST_TOPIC");
private static final String FUNCTION_DEPLOYED_NAME = "HelloPubSub";
private static Logging loggingClient;
private static Publisher publisher;
private HelloPubSub sampleUnderTest;
@BeforeClass
public static void setUp() throws IOException {
loggingClient = LoggingOptions.getDefaultInstance().getService();
publisher = Publisher.newBuilder(
ProjectTopicName.of(PROJECT_ID, TOPIC_NAME)).build();
}
private static String getLogEntriesAsString(String startTimestamp) {
// Construct Stackdriver logging filter
// See this page for more info: https://cloud.google.com/logging/docs/view/advanced-queries
String filter = "resource.type=\"cloud_function\""
+ " AND severity=INFO"
+ " AND resource.labels.function_name=" + FUNCTION_DEPLOYED_NAME
+ String.format(" AND timestamp>=\"%s\"", startTimestamp);
// Get Stackdriver logging entries
Page<LogEntry> logEntries =
loggingClient.listLogEntries(
Logging.EntryListOption.filter(filter),
Logging.EntryListOption.sortOrder(
Logging.SortingField.TIMESTAMP, Logging.SortingOrder.DESCENDING)
);
// Serialize Stackdriver logging entries + collect them into a single string
String logsConcat = StreamSupport.stream(logEntries.getValues().spliterator(), false)
.map((x) -> x.toString())
.collect(Collectors.joining("%n"));
return logsConcat;
}
@Test
public void helloPubSub_shouldRunOnGcf() throws Exception {
String name = UUID.randomUUID().toString();
// Subtract time to work-around local-GCF clock difference
Instant startInstant = Instant.now().minus(Duration.ofMinutes(4));
String startTimestamp = DateTimeFormatter.ISO_INSTANT.format(startInstant);
// Publish to pub/sub topic
ByteString byteStr = ByteString.copyFrom(name, StandardCharsets.UTF_8);
PubsubMessage pubsubApiMessage = PubsubMessage.newBuilder().setData(byteStr).build();
publisher.publish(pubsubApiMessage).get();
// Keep retrying until the logs contain the desired invocation's log entry
// (If the invocation failed, the retry process will eventually time out)
RetryRegistry registry = RetryRegistry.of(RetryConfig.custom()
.maxAttempts(8)
.intervalFunction(IntervalFunction.ofExponentialBackoff(1000, 2))
.retryOnResult(s -> !s.toString().contains(name))
.build());
Retry retry = registry.retry(name);
String logEntry = Retry
.decorateFunction(retry, ExampleSystemIT::getLogEntriesAsString)
.apply(startTimestamp);
// Perform final assertion (to make sure we fail on timeout)
assertThat(logEntry).contains(name);
}
}
Node.js
const childProcess = require('child_process');
const assert = require('assert');
const uuid = require('uuid');
const {PubSub} = require('@google-cloud/pubsub');
const moment = require('moment');
const promiseRetry = require('promise-retry');
// Pub/Sub client created with default options.
const pubsub = new PubSub();
// Topic name, taken from the FUNCTIONS_TOPIC environment variable.
const topicName = process.env.FUNCTIONS_TOPIC;
// Common prefix of the gcloud command lines built by the tests.
const baseCmd = 'gcloud functions';
describe('system tests', () => {
  // Publishes a random name to the topic, then re-reads the function's logs
  // until the greeting for that name appears.
  it('helloPubSub: should print a name', async () => {
    const name = uuid.v4();

    // Subtract time to work-around local-GCF clock difference
    const startTime = moment().subtract(4, 'minutes').toISOString();

    // Publish to pub/sub topic
    const topic = pubsub.topic(topicName);
    await topic.publish(Buffer.from(name));
    console.log(`published topic ${topicName}, ${name}`);

    // Wait for logs to become consistent
    await promiseRetry(retry => {
      const logs = childProcess
        .execSync(`${baseCmd} logs read helloPubSub --start-time ${startTime}`)
        .toString();

      try {
        assert.ok(logs.includes(`Hello, ${name}!`));
      } catch (err) {
        // Not in the logs yet: ask promise-retry for another attempt.
        retry(err);
      }
    });
  });
});
Python
from datetime import datetime
from os import getenv
import subprocess
import time
import uuid
from google.cloud import pubsub_v1
import pytest
# Test configuration, read from the environment when this module is imported.
PROJECT = getenv('GCP_PROJECT')
TOPIC = getenv('FUNCTIONS_TOPIC')

# Fail at import time, with a message naming the missing variable, rather
# than later inside a test.
assert PROJECT is not None, 'The GCP_PROJECT environment variable must be set'
assert TOPIC is not None, 'The FUNCTIONS_TOPIC environment variable must be set'
@pytest.fixture(scope='module')
def publisher_client():
    """Provide a Pub/Sub publisher client, created once per test module."""
    client = pubsub_v1.PublisherClient()
    yield client
def test_print_name(publisher_client):
    """Publish a random name and check that the function's logs greet it.

    Publishes a UUID to the configured topic, waits for the logs to settle,
    then reads the ``hello_pubsub`` logs with gcloud and asserts that they
    contain ``Hello <uuid>!``.
    """
    # Imported locally so this block does not depend on the module's imports.
    from datetime import timezone

    # Use a timezone-aware timestamp: a naive value carries no UTC marker, so
    # gcloud would read it as local time and could query the wrong window.
    start_time = datetime.now(timezone.utc).isoformat()
    topic_path = publisher_client.topic_path(PROJECT, TOPIC)

    # Publish the message
    name = uuid.uuid4()
    data = str(name).encode('utf-8')
    publisher_client.publish(topic_path, data=data).result()

    # Wait for logs to become consistent
    time.sleep(15)

    # Check logs after a delay. check=True surfaces a failing gcloud command
    # directly instead of as a confusing "text not found" assertion.
    completed = subprocess.run([
        'gcloud',
        'alpha',
        'functions',
        'logs',
        'read',
        'hello_pubsub',
        '--start-time',
        start_time
    ], stdout=subprocess.PIPE, check=True)
    # Decode the captured bytes; str() on bytes would yield the b'...' repr.
    logs = completed.stdout.decode('utf-8', errors='replace')
    assert 'Hello {}!'.format(name) in logs
What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.