演示如何对 Pub/Sub 触发的函数进行系统测试。 这是一种“端到端”测试,该测试假定您已部署要测试的 Cloud Functions 函数。此示例中的代码通过发布 Pub/Sub 消息并等待观察 Cloud Logging 中的预期输出来触发 Cloud Functions 函数。
代码示例
Go
如需向 Cloud Functions 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
package helloworld
import (
"context"
"log"
"os"
"os/exec"
"strings"
"testing"
"time"
"cloud.google.com/go/pubsub"
"github.com/gobuffalo/uuid"
)
func TestHelloPubSubSystem(t *testing.T) {
ctx := context.Background()
topicName := os.Getenv("FUNCTIONS_TOPIC")
projectID := os.Getenv("GCP_PROJECT")
startTime := time.Now().UTC().Format(time.RFC3339)
// Create the Pub/Sub client and topic.
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
log.Fatal(err)
}
topic := client.Topic(topicName)
// Publish a message with a random string to verify.
// We use a random string to make sure the function is logging the correct
// message for this test invocation.
u := uuid.Must(uuid.NewV4())
msg := &pubsub.Message{
Data: []byte(u.String()),
}
topic.Publish(ctx, msg).Get(ctx)
// Wait for logs to be consistent.
time.Sleep(20 * time.Second)
// Check logs after a delay.
cmd := exec.Command("gcloud", "alpha", "functions", "logs", "read", "HelloPubSub", "--start-time", startTime)
out, err := cmd.CombinedOutput()
if err != nil {
t.Fatalf("exec.Command: %v", err)
}
if got := string(out); !strings.Contains(got, u.String()) {
t.Errorf("HelloPubSub got %q, want to contain %q", got, u.String())
}
}
Java
如需向 Cloud Functions 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
import static com.google.common.truth.Truth.assertThat;
import com.google.api.gax.paging.Page;
import com.google.cloud.logging.LogEntry;
import com.google.cloud.logging.Logging;
import com.google.cloud.logging.LoggingOptions;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.junit.BeforeClass;
import org.junit.Test;
public class ExampleSystemIT {
// TODO<developer>: set these values (as environment variables)
private static final String PROJECT_ID = System.getenv("GCP_PROJECT");
private static final String TOPIC_NAME = System.getenv("FUNCTIONS_SYSTEM_TEST_TOPIC");
// TODO<developer>: set this value (as an environment variable) to HelloPubSub
private static final String FUNCTION_DEPLOYED_NAME = System.getenv("FUNCTIONS_PUBSUB_FN_NAME");
private static Logging loggingClient;
private static Publisher publisher;
private HelloPubSub sampleUnderTest;
@BeforeClass
public static void setUp() throws IOException {
loggingClient = LoggingOptions.getDefaultInstance().getService();
publisher = Publisher.newBuilder(
ProjectTopicName.of(PROJECT_ID, TOPIC_NAME)).build();
}
private static String getLogEntriesAsString(String startTimestamp) {
// Construct Stackdriver logging filter
// See this page for more info: https://cloud.google.com/logging/docs/view/advanced-queries
String filter = "resource.type=\"cloud_function\""
+ " AND severity=INFO"
+ " AND resource.labels.function_name=" + FUNCTION_DEPLOYED_NAME
+ String.format(" AND timestamp>=\"%s\"", startTimestamp);
// Get Stackdriver logging entries
Page<LogEntry> logEntries =
loggingClient.listLogEntries(
Logging.EntryListOption.filter(filter),
Logging.EntryListOption.sortOrder(
Logging.SortingField.TIMESTAMP, Logging.SortingOrder.DESCENDING)
);
// Serialize Stackdriver logging entries + collect them into a single string
String logsConcat = StreamSupport.stream(logEntries.getValues().spliterator(), false)
.map((x) -> x.toString())
.collect(Collectors.joining("%n"));
return logsConcat;
}
@Test
public void helloPubSub_shouldRunOnGcf() throws Exception {
String name = UUID.randomUUID().toString();
// Subtract time to work-around local-GCF clock difference
Instant startInstant = Instant.now().minus(Duration.ofMinutes(4));
String startTimestamp = DateTimeFormatter.ISO_INSTANT.format(startInstant);
// Publish to pub/sub topic
ByteString byteStr = ByteString.copyFrom(name, StandardCharsets.UTF_8);
PubsubMessage pubsubApiMessage = PubsubMessage.newBuilder().setData(byteStr).build();
publisher.publish(pubsubApiMessage).get();
// Keep retrying until the logs contain the desired invocation's log entry
// (If the invocation failed, the retry process will eventually time out)
RetryRegistry registry = RetryRegistry.of(RetryConfig.custom()
.maxAttempts(8)
.intervalFunction(IntervalFunction.ofExponentialBackoff(1000, 2))
.retryOnResult(s -> !s.toString().contains(name))
.build());
Retry retry = registry.retry(name);
String logEntry = Retry
.decorateFunction(retry, ExampleSystemIT::getLogEntriesAsString)
.apply(startTimestamp);
// Perform final assertion (to make sure we fail on timeout)
assertThat(logEntry).contains(name);
}
}
Node.js
如需向 Cloud Functions 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
const childProcess = require('child_process');
const assert = require('assert');
const uuid = require('uuid');
const {PubSub} = require('@google-cloud/pubsub');
const moment = require('moment');
const promiseRetry = require('promise-retry');
const pubsub = new PubSub();
const topicName = process.env.FUNCTIONS_TOPIC;
if (!topicName) throw new Error('"FUNCTIONS_TOPIC" env var must be set.');
if (!process.env.GCF_REGION) {
throw new Error('"GCF_REGION" env var must be set.');
}
const baseCmd = 'gcloud functions';
describe('system tests', () => {
it('helloPubSub: should print a name', async () => {
const name = uuid.v4();
// Subtract time to work-around local-GCF clock difference
const startTime = moment().subtract(4, 'minutes').toISOString();
// Publish to pub/sub topic
const topic = pubsub.topic(topicName);
const data = Buffer.from(name);
await topic.publishMessage({data});
console.log(`published topic ${topicName}, ${name}`);
// Wait for logs to become consistent
await promiseRetry(retry => {
const logs = childProcess
.execSync(`${baseCmd} logs read helloPubSub --start-time ${startTime}`)
.toString();
try {
assert.ok(logs.includes(`Hello, ${name}!`));
} catch (err) {
console.log('An error occurred, retrying:', err);
retry(err);
}
});
});
Python
如需向 Cloud Functions 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
from datetime import datetime
from os import getenv
import subprocess
import time
import uuid
from google.cloud import pubsub_v1
import pytest
PROJECT = getenv("GCP_PROJECT")
TOPIC = getenv("FUNCTIONS_TOPIC")
assert PROJECT is not None
assert TOPIC is not None
@pytest.fixture(scope="module")
def publisher_client():
yield pubsub_v1.PublisherClient()
def test_print_name(publisher_client):
start_time = datetime.utcnow().isoformat()
topic_path = publisher_client.topic_path(PROJECT, TOPIC)
# Publish the message
name = uuid.uuid4()
data = str(name).encode("utf-8")
publisher_client.publish(topic_path, data=data).result()
# Wait for logs to become consistent
time.sleep(15)
# Check logs after a delay
log_process = subprocess.Popen(
[
"gcloud",
"alpha",
"functions",
"logs",
"read",
"hello_pubsub",
"--start-time",
start_time,
],
stdout=subprocess.PIPE,
)
logs = str(log_process.communicate()[0])
assert f"Hello {name}!" in logs
后续步骤
如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器。