演示如何对 Pub/Sub 触发的函数进行集成测试。
代码示例
Java
如需向 Cloud Run functions 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import static com.google.common.truth.Truth.assertThat;
import com.google.gson.Gson;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;
import io.vavr.CheckedRunnable;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.UUID;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class ExampleIT {
// Each function must be assigned a unique port to run on.
// Otherwise, tests can flake when 2+ functions run simultaneously.
// This is also specified in the `function-maven-plugin` config in `pom.xml`.
private static final int PORT = 8083;
// Root URL pointing to the locally hosted function
// The Functions Framework Maven plugin lets us run a function locally
private static final String BASE_URL = "http://localhost:" + PORT;
private static Process emulatorProcess = null;
private static HttpClient client = HttpClientBuilder.create().build();
private static final Gson gson = new Gson();
@BeforeClass
public static void setUp() throws IOException {
// Get the sample's base directory (the one containing a pom.xml file)
String baseDir = System.getProperty("user.dir");
// Emulate the function locally by running the Functions Framework Maven plugin
emulatorProcess = new ProcessBuilder()
.command("bash", "-c", "mvn function:run")
.directory(new File(baseDir))
.start();
}
@AfterClass
public static void tearDown() throws IOException {
// Display the output of the plugin process
InputStream stdoutStream = emulatorProcess.getInputStream();
ByteArrayOutputStream stdoutBytes = new ByteArrayOutputStream();
stdoutBytes.write(stdoutStream.readNBytes(stdoutStream.available()));
System.err.println(stdoutBytes.toString(StandardCharsets.UTF_8));
// Terminate the running Functions Framework Maven plugin process (if it's still running)
if (emulatorProcess.isAlive()) {
emulatorProcess.destroy();
}
}
@Test
public void helloPubSub_shouldRunWithFunctionsFramework() throws Throwable {
String functionUrl = BASE_URL + "/helloPubsub"; // URL to your locally-running function
// Initialize constants
String name = UUID.randomUUID().toString();
String nameBase64 = Base64.getEncoder().encodeToString(name.getBytes(StandardCharsets.UTF_8));
String jsonStr = gson.toJson(Map.of("data", Map.of("data", nameBase64)));
HttpPost postRequest = new HttpPost(URI.create(functionUrl));
postRequest.setEntity(new StringEntity(jsonStr));
// The Functions Framework Maven plugin process takes time to start up
// Use resilience4j to retry the test HTTP request until the plugin responds
RetryRegistry registry = RetryRegistry.of(RetryConfig.custom()
.maxAttempts(12)
.retryExceptions(HttpHostConnectException.class)
.retryOnResult(u -> {
// Retry if the Functions Framework process has no stdout content
// See `retryOnResultPredicate` here: https://resilience4j.readme.io/docs/retry
try {
return emulatorProcess.getErrorStream().available() == 0;
} catch (IOException e) {
return true;
}
})
.intervalFunction(IntervalFunction.ofExponentialBackoff(200, 2))
.build());
Retry retry = registry.retry("my");
// Perform the request-retry process
CheckedRunnable retriableFunc = Retry.decorateCheckedRunnable(
retry, () -> client.execute(postRequest));
retriableFunc.run();
// Get Functions Framework plugin process' stdout
InputStream stdoutStream = emulatorProcess.getErrorStream();
ByteArrayOutputStream stdoutBytes = new ByteArrayOutputStream();
stdoutBytes.write(stdoutStream.readNBytes(stdoutStream.available()));
// Verify desired name value is present
assertThat(stdoutBytes.toString(StandardCharsets.UTF_8)).contains(
String.format("Hello %s!", name));
}
}
Node.js
如需向 Cloud Run functions 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
const assert = require('assert');
const {spawn} = require('child_process');
const {request} = require('gaxios');
const uuid = require('uuid');
const waitPort = require('wait-port');
it('helloPubSub: should print a name', async () => {
const name = uuid.v4();
const PORT = 8088; // Each running framework instance needs a unique port
const encodedName = Buffer.from(name).toString('base64');
const pubsubMessage = {data: {data: encodedName}};
const ffProc = spawn('npx', [
'functions-framework',
'--target',
'helloPubSub',
'--signature-type',
'event',
'--port',
PORT,
]);
try {
const ffProcHandler = new Promise((resolve, reject) => {
let stdout = '';
let stderr = '';
ffProc.stdout.on('data', data => (stdout += data));
ffProc.stderr.on('data', data => (stderr += data));
ffProc.on('exit', code => {
if (code === 0 || code === null) {
// code === null corresponds to a signal-kill
// (which doesn't necessarily indicate a test failure)
resolve(stdout);
} else {
stderr = `Error code: ${code}\n${stderr}`;
reject(new Error(stderr));
}
});
});
await waitPort({host: 'localhost', port: PORT});
// Send HTTP request simulating Pub/Sub message
// (GCF translates Pub/Sub messages to HTTP requests internally)
const response = await request({
url: `http://localhost:${PORT}/`,
method: 'POST',
data: pubsubMessage,
});
ffProc.kill();
assert.strictEqual(response.status, 204);
// Wait for the functions framework to stop
const stdout = await ffProcHandler;
assert.match(stdout, new RegExp(`Hello, ${name}!`));
} catch {
if (ffProc) {
// Make sure the functions framework is stopped
ffProc.kill();
}
}
});
});
PHP
如需向 Cloud Run functions 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
namespace Google\Cloud\Samples\Functions\HelloworldPubsub;
use Google\CloudFunctions\CloudEvent;
use GuzzleHttp\Client;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Process\Process;
use Symfony\Component\Process\PhpExecutableFinder;
/**
* Class SampleIntegrationTest.
*
* Integration tests for the 'Helloworld Pubsub' Cloud Function.
*/
class SampleIntegrationTest extends TestCase
{
/** @var Process */
private static $process;
/** @var Client */
private static $client;
public function dataProvider()
{
return [
[
'cloudevent' => CloudEvent::fromArray([
'id' => uniqid(),
'source' => 'pubsub.googleapis.com',
'specversion' => '1.0',
'type' => 'google.cloud.pubsub.topic.v1.messagePublished',
]),
'data' => [
'data' => base64_encode('John')
],
'expected' => 'Hello, John!'
],
];
}
/**
* Start a local PHP server running the Functions Framework.
*
* @beforeClass
*/
public static function startFunctionFramework(): void
{
$port = getenv('PORT') ?: '8080';
$php = (new PhpExecutableFinder())->find();
$uri = 'localhost:' . $port;
// https://symfony.com/doc/current/components/process.html#usage
self::$process = new Process([$php, '-S', $uri, 'vendor/bin/router.php'], null, [
'FUNCTION_SIGNATURE_TYPE' => 'cloudevent',
'FUNCTION_TARGET' => 'helloworldPubsub',
]);
self::$process->start();
// Initialize an HTTP client to drive requests.
self::$client = new Client(['base_uri' => 'http://' . $uri]);
// Short delay to ensure PHP server is ready.
sleep(1);
}
/**
* Stop the local PHP server.
*
* @afterClass
*/
public static function stopFunctionFramework(): void
{
if (!self::$process->isRunning()) {
echo self::$process->getErrorOutput();
throw new RuntimeException('Function Framework PHP process not running by end of test');
}
self::$process->stop(3, SIGTERM);
}
/**
* @dataProvider dataProvider
*/
public function testHelloPubsub(
CloudEvent $cloudevent,
array $data,
string $expected
): void {
// Send an HTTP request using CloudEvent metadata.
$resp = self::$client->post('/', [
'body' => json_encode($data),
'headers' => [
// Instruct the function framework to parse the body as JSON.
'content-type' => 'application/json',
// Prepare the HTTP headers for a CloudEvent.
'ce-id' => $cloudevent->getId(),
'ce-source' => $cloudevent->getSource(),
'ce-specversion' => $cloudevent->getSpecVersion(),
'ce-type' => $cloudevent->getType()
],
]);
// The Cloud Function logs all data to stderr.
$actual = self::$process->getIncrementalErrorOutput();
// Verify the function's results are correctly logged.
$this->assertStringContainsString($expected, $actual);
}
}
Python
如需向 Cloud Run functions 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import base64
import os
import subprocess
import uuid
import requests
from requests.packages.urllib3.util.retry import Retry
def test_print_name():
name = str(uuid.uuid4())
port = 8088 # Each running framework instance needs a unique port
encoded_name = base64.b64encode(name.encode("utf-8")).decode("utf-8")
pubsub_message = {"data": {"data": encoded_name}}
process = subprocess.Popen(
[
"functions-framework",
"--target",
"hello_pubsub",
"--signature-type",
"event",
"--port",
str(port),
],
cwd=os.path.dirname(__file__),
stdout=subprocess.PIPE,
)
# Send HTTP request simulating Pub/Sub message
# (GCF translates Pub/Sub messages to HTTP requests internally)
url = f"http://localhost:{port}/"
retry_policy = Retry(total=6, backoff_factor=1)
retry_adapter = requests.adapters.HTTPAdapter(max_retries=retry_policy)
session = requests.Session()
session.mount(url, retry_adapter)
response = session.post(url, json=pubsub_message)
assert response.status_code == 200
# Stop the functions framework process
process.kill()
process.wait()
out, err = process.communicate()
print(out, err, response.content)
assert f"Hello {name}!" in str(out)
后续步骤
如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器。