Cloud Storage の統合テスト

Cloud Storage によってトリガーされた関数に統合テストを行います。

このコードサンプルが含まれるドキュメント ページ

コンテキストで使用されているコードサンプルを見るには、次のドキュメントをご覧ください。

コードサンプル

C++

#include <boost/filesystem.hpp>
#include <boost/process.hpp>
#include <curl/curl.h>
#include <gmock/gmock.h>
#include <nlohmann/json.hpp>
#include <chrono>
#include <string>
#include <thread>

namespace {

namespace bp = boost::process;
namespace bfs = boost::filesystem;  // Boost.Process cannot use std::filesystem
using ::testing::Contains;
using ::testing::HasSubstr;
using ::testing::IsEmpty;

struct HttpResponse {
  long code;  // NOLINT(google-runtime-int)
  std::string payload;
};

HttpResponse HttpEvent(std::string const& url, std::string const& payload);

char const* argv0 = nullptr;

auto ExePath(bfs::path const& filename) {
  static auto const kPath = std::vector<bfs::path>{
      bfs::canonical(argv0).make_preferred().parent_path()};
  return bp::search_path(filename, kPath);
}

class StorageIntegrationTest : public ::testing::Test {
 protected:
  void SetUp() override {
    curl_global_init(CURL_GLOBAL_ALL);

    auto server =
        bp::child(ExePath("storage_integration_server"), "--port=8050",
                  (bp::std_out & bp::std_err) > child_log_);
    ASSERT_TRUE(WaitForServerReady("http://localhost:8050/"));
    process_ = std::move(server);
  }

  // Wait until an HTTP server starts responding.
  bool WaitForServerReady(std::string const& url);

  void TearDown() override {
    process_.terminate();
    process_.wait();
    curl_global_cleanup();
  }

  std::string NextChildLine() {
    std::string line;
    std::getline(child_log_, line);
    std::cerr << " ... line=" << line << std::endl;
    return line;
  }

 private:
  bp::ipstream child_log_;
  bp::child process_;
};

TEST_F(StorageIntegrationTest, Basic) {
  auto constexpr kOkay = 200;

  auto const base = nlohmann::json::parse(R"js({
    "specversion": "1.0",
    "type": "google.cloud.storage.object.v1.finalized",
    "source": "//storage.googleapis.com/projects/_/buckets/some-bucket",
    "subject": "objects/folder/Test.cs",
    "id": "aaaaaa-1111-bbbb-2222-cccccccccccc",
    "time": "2020-09-29T11:32:00.000Z",
    "datacontenttype": "application/json",
    "data": {
      "kind": "storage#object",
      "bucket": "some-bucket",
      "name": "object-name",
      "generation": "1587627537231057",
      "timeCreated": "2020-04-23T07:38:57.230Z",
      "updated": "2020-04-23T07:38:57.230Z"
    }
  })js");

  struct TestCases {
    std::string name;
    std::string expected;
  } cases[]{
      {"object1.txt", "Object: object1.txt"},
      {"object/with/longer/name.txt", "Object: object/with/longer/name.txt"},
  };

  for (auto const& test : cases) {
    SCOPED_TRACE("Testing for " + test.expected);
    auto object = base;
    object["data"]["name"] = test.name;
    SCOPED_TRACE("event=" + object.dump());
    auto actual = HttpEvent("http://localhost:8050/", object.dump());
    ASSERT_EQ(actual.code, kOkay);
    EXPECT_THAT(actual.payload, IsEmpty());

    std::vector<std::string> lines;
    for (auto line = NextChildLine();
         line.find("Updated: ") == std::string::npos; line = NextChildLine()) {
      lines.push_back(std::move(line));
    }
    EXPECT_THAT(lines, Contains(HasSubstr(test.expected)));
  }
}

extern "C" size_t CurlOnWriteData(char* ptr, size_t size, size_t nmemb,
                                  void* userdata) {
  reinterpret_cast<std::string*>(userdata)->append(ptr, size * nmemb);
  return size * nmemb;
}

HttpResponse HttpEvent(std::string const& url, std::string const& payload) {
  using CurlHandle = std::unique_ptr<CURL, decltype(&curl_easy_cleanup)>;
  using CurlHeaders =
      std::unique_ptr<curl_slist, decltype(&curl_slist_free_all)>;

  auto easy = CurlHandle(curl_easy_init(), curl_easy_cleanup);

  auto setopt = [h = easy.get()](auto opt, auto value) {
    if (auto e = curl_easy_setopt(h, opt, value); e != CURLE_OK) {
      std::ostringstream os;
      os << "error [" << e << "] setting curl_easy option <" << opt
         << ">=" << value;
      throw std::runtime_error(std::move(os).str());
    }
  };
  auto get_response_code = [h = easy.get()]() {
    long code;  // NOLINT(google-runtime-int)
    auto e = curl_easy_getinfo(h, CURLINFO_RESPONSE_CODE, &code);
    if (e != CURLE_OK) std::runtime_error("Cannot get response code");
    return code;
  };

  auto headers = CurlHeaders(nullptr, curl_slist_free_all);
  auto add_header = [&headers](std::string const& h) {
    auto* nh = curl_slist_append(headers.get(), h.c_str());
    (void)headers.release();
    headers.reset(nh);
  };

  setopt(CURLOPT_URL, url.c_str());
  setopt(CURLOPT_POSTFIELDSIZE, payload.size());
  setopt(CURLOPT_POSTFIELDS, payload.data());
  std::string buffer;
  setopt(CURLOPT_WRITEDATA, &buffer);
  setopt(CURLOPT_WRITEFUNCTION, &CurlOnWriteData);
  add_header("Content-Type: application/cloudevents+json");
  add_header("Content-Length: " + std::to_string(payload.size()));
  setopt(CURLOPT_HTTPHEADER, headers.get());

  auto e = curl_easy_perform(easy.get());
  if (e != CURLE_OK) {
    std::ostringstream os;
    os << "Error in curl_easy_perform [" << e << "] - "
       << curl_easy_strerror(e);
    throw std::runtime_error(std::move(os).str());
  }
  return HttpResponse{get_response_code(), std::move(buffer)};
}

bool StorageIntegrationTest::WaitForServerReady(std::string const& url) {
  using namespace std::chrono_literals;
  auto constexpr kOkay = 200;
  for (auto delay : {100ms, 200ms, 400ms, 800ms, 1600ms}) {  // NOLINT
    std::cout << "Waiting for server to start [" << delay.count() << "ms]\n";
    std::this_thread::sleep_for(delay);
    auto constexpr kPing = R"js({
      "specversion": "1.0",
      "type": "test-type",
      "source": "test-source",
      "id": "test-id",
      "datacontenttype": "application/json",
      "data": {
        "kind": "storage#object",
        "bucket": "wait-for-server-ready",
        "name": "wait-for-server-ready",
        "generation": "1587627537231057",
        "id": "some-bucket/object-name/1587627537231057",
        "timeCreated": "2020-04-23T07:38:57.230Z",
        "updated": "2020-04-23T07:38:57.230Z"
      }
    })js";
    try {
      auto const r = HttpEvent(url, kPing);
      if (r.code != kOkay) {
        std::cerr << "... [" << r.code << "]" << std::endl;
        continue;
      }
    } catch (std::exception const& ex) {
      // The HttpEvent() function may fail with an exception until the server is
      // ready. Log it to ease troubleshooting in the CI builds.
      std::cerr << "WaitForServerReady[" << delay.count()
                << "ms]: server ping failed with " << ex.what() << std::endl;
      continue;
    }
    for (auto line = NextChildLine(); !line.empty(); line = NextChildLine()) {
      if (line.find("Updated: ") != std::string::npos) return true;
    }
  }
  return false;
}

}  // namespace

int main(int argc, char* argv[]) {
  ::testing::InitGoogleMock(&argc, argv);
  ::argv0 = argv[0];
  return RUN_ALL_TESTS();
}

Java


import static com.google.common.truth.Truth.assertThat;

import com.google.gson.Gson;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;
import io.vavr.CheckedRunnable;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class ExampleIT {
  // Each function must be assigned a unique port to run on.
  // Otherwise, tests can flake when 2+ functions run simultaneously.
  // This is also specified in the `function-maven-plugin` config in `pom.xml`.
  private static final int PORT = 8082;

  // Root URL pointing to the locally hosted function
  // The Functions Framework Maven plugin lets us run a function locally
  private static final String BASE_URL = "http://localhost:" + PORT;

  private static Process emulatorProcess = null;
  private static final HttpClient client = HttpClientBuilder.create().build();
  private static final Gson gson = new Gson();

  @BeforeClass
  public static void setUp() throws IOException {
    // Get the sample's base directory (the one containing a pom.xml file)
    String baseDir = System.getProperty("user.dir");

    // Emulate the function locally by running the Functions Framework Maven plugin
    emulatorProcess = new ProcessBuilder()
        .command("mvn", "function:run")
        .directory(new File(baseDir))
        .start();
  }

  @AfterClass
  public static void tearDown() throws IOException {
    // Display the output of the plugin process
    InputStream stdoutStream = emulatorProcess.getInputStream();
    ByteArrayOutputStream stdoutBytes = new ByteArrayOutputStream();
    stdoutBytes.write(stdoutStream.readNBytes(stdoutStream.available()));
    System.out.println(stdoutBytes.toString(StandardCharsets.UTF_8));

    // Terminate the running Functions Framework Maven plugin process (if it's still running)
    if (emulatorProcess.isAlive()) {
      emulatorProcess.destroy();
    }
  }

  @Test
  public void helloGcs_shouldRunWithFunctionsFramework() throws Throwable {
    String functionUrl = BASE_URL + "/helloGcs"; // URL to your locally-running function

    // Initialize constants
    String name = UUID.randomUUID().toString();
    String jsonStr = gson.toJson(Map.of(
        "data", Map.of(
            "name", name, "resourceState", "exists", "metageneration", 1),
        "context", Map.of(
            "eventType", "google.storage.object.finalize")
    ));

    HttpPost postRequest =  new HttpPost(URI.create(functionUrl));
    postRequest.setEntity(new StringEntity(jsonStr));

    // The Functions Framework Maven plugin process takes time to start up
    // Use resilience4j to retry the test HTTP request until the plugin responds
    RetryRegistry registry = RetryRegistry.of(RetryConfig.custom()
        .maxAttempts(12)
        .retryExceptions(HttpHostConnectException.class)
        .retryOnResult(u -> {
          // Retry if the Functions Framework process has no stdout content
          // See `retryOnResultPredicate` here: https://resilience4j.readme.io/docs/retry
          try {
            return emulatorProcess.getErrorStream().available() == 0;
          } catch (IOException e) {
            return true;
          }
        })
        .intervalFunction(IntervalFunction.ofExponentialBackoff(200, 2))
        .build());
    Retry retry = registry.retry("my");

    // Perform the request-retry process
    CheckedRunnable retriableFunc = Retry.decorateCheckedRunnable(
        retry, () -> client.execute(postRequest));
    retriableFunc.run();

    // Get Functions Framework plugin process' stdout
    InputStream stdoutStream = emulatorProcess.getErrorStream();
    ByteArrayOutputStream stdoutBytes = new ByteArrayOutputStream();
    stdoutBytes.write(stdoutStream.readNBytes(stdoutStream.available()));

    // Verify desired name value is present
    assertThat(stdoutBytes.toString(StandardCharsets.UTF_8)).contains(
        String.format("File: %s", name));
  }
}

Node.js

const assert = require('assert');
const {spawn} = require('child_process');
const uuid = require('uuid');
const {request} = require('gaxios');
const waitPort = require('wait-port');

  it('helloGCSGeneric: should print GCS event', async () => {
    const filename = uuid.v4(); // Use a unique filename to avoid conflicts
    const PORT = 9000; // Each running framework instance needs a unique port

    const eventType = 'google.storage.object.finalize';

    const data = {
      data: {
        name: filename,
        resourceState: 'exists',
        metageneration: '1',
      },
      context: {
        eventType: eventType,
      },
    };
    ffProc = spawn('npx', [
      'functions-framework',
      '--target',
      'helloGCS',
      '--signature-type',
      'event',
      '--port',
      PORT,
    ]);
    const ffProcHandler = new Promise((resolve, reject) => {
      let stdout = '';
      let stderr = '';
      ffProc.stdout.on('data', data => (stdout += data));
      ffProc.stderr.on('data', data => (stderr += data));
      ffProc.on('error', reject).on('exit', code => {
        code === 0 ? resolve(stdout) : reject(stderr);
      });
    });
    await waitPort({host: 'localhost', port: PORT});

    // Send HTTP request simulating GCS change notification
    // (GCF translates GCS notifications to HTTP requests internally)
    const response = await request({
      url: `http://localhost:${PORT}/`,
      method: 'POST',
      data,
    });
    assert.strictEqual(response.status, 204);

    ffProc.kill();
    const stdout = await ffProcHandler;
    assert.ok(stdout.includes(`File: ${filename}`));
    assert.ok(stdout.includes(`Event Type: ${eventType}`));
  });
});

PHP


namespace Google\Cloud\Samples\Functions\HelloworldStorage;

use Google\CloudFunctions\CloudEvent;
use GuzzleHttp\Client;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Process\Process;
use Symfony\Component\Process\PhpExecutableFinder;

/**
 * Class SampleIntegrationTest.
 *
 * Integration tests for the 'Helloworld Storage' Cloud Function.
 */
class SampleIntegrationTest extends TestCase
{
    /** @var Process */
    private static $process;

    /** @var Client */
    private static $client;

    public function dataProvider()
    {
        return [
            [
                'cloudevent' => [
                    'id' => uniqid(),
                    'source' => 'storage.googleapis.com',
                    'specversion' => '1.0',
                    'type' => 'google.cloud.storage.object.v1.finalized',
                ],
                'data' => [
                    'bucket' => 'some-bucket',
                    'metageneration' => '1',
                    'name' => 'folder/friendly.txt',
                    'timeCreated' => '2020-04-23T07:38:57.230Z',
                    'updated' => '2020-04-23T07:38:57.230Z',
                ],
                'statusCode' => '200',
            ],
        ];
    }

    /**
     * @dataProvider dataProvider
     */
    public function testHelloGCS(array $cloudevent, array $data, string $statusCode): void
    {
        // Prepare the HTTP headers for a CloudEvent.
        $cloudEventHeaders = [];
        foreach ($cloudevent as $key => $value) {
            $cloudEventHeaders['ce-' . $key] = $value;
        }

        // Send an HTTP request using CloudEvent metadata.
        $resp = self::$client->post('/', [
            'body' => json_encode($data),
            'headers' => $cloudEventHeaders + [
                // Instruct the function framework to parse the body as JSON.
                'content-type' => 'application/json'
            ],
        ]);

        // The Cloud Function logs all data to stderr.
        $actual = self::$process->getIncrementalErrorOutput();

        // Confirm the status code.
        $this->assertEquals($statusCode, $resp->getStatusCode());

        // Verify the CloudEvent and data properties are logged by the function.
        foreach ($data as $property => $value) {
            $this->assertStringContainsString($value, $actual);
        }
        $this->assertStringContainsString($cloudevent['id'], $actual);
        $this->assertStringContainsString($cloudevent['type'], $actual);
    }

    /**
     * Start a local PHP server running the Functions Framework.
     *
     * @beforeClass
     */
    public static function startFunctionFramework(): void
    {
        $port = getenv('PORT') ?: '8080';
        $php = (new PhpExecutableFinder())->find();
        $uri = 'localhost:' . $port;

        // https://symfony.com/doc/current/components/process.html#usage
        self::$process = new Process([$php, '-S', $uri, 'vendor/bin/router.php'], null, [
            'FUNCTION_SIGNATURE_TYPE' => 'cloudevent',
            'FUNCTION_TARGET' => 'helloGCS',
        ]);
        self::$process->start();

        // Initialize an HTTP client to drive requests.
        self::$client = new Client(['base_uri' => 'http://' . $uri]);

        // Short delay to ensure PHP server is ready.
        sleep(1);
    }

    /**
     * Stop the local PHP server.
     *
     * @afterClass
     */
    public static function stopFunctionFramework(): void
    {
        if (!self::$process->isRunning()) {
            echo self::$process->getErrorOutput();
            throw new RuntimeException('Function Framework PHP process not running by end of test');
        }
        self::$process->stop(3, SIGTERM);
    }
}

Python

import datetime
import os
import subprocess
import uuid

import requests
from requests.packages.urllib3.util.retry import Retry

def test_print_name():
    filename = str(uuid.uuid4())
    port = 8089  # Each running framework instance needs a unique port

    example_timestamp = datetime.datetime.now(tz=datetime.timezone.utc).isoformat()
    storage_message = {
        'data': {
            'name': filename,
            'bucket': 'my_bucket',
            'metageneration': '1',
            'timeCreated': example_timestamp,
            'updated': example_timestamp
        }
    }

    process = subprocess.Popen(
      [
        'functions-framework',
        '--target', 'hello_gcs',
        '--signature-type', 'event',
        '--port', str(port)
      ],
      cwd=os.path.dirname(__file__),
      stdout=subprocess.PIPE
    )

    # Send HTTP request simulating Pub/Sub message
    # (GCF translates Pub/Sub messages to HTTP requests internally)
    url = f'http://localhost:{port}/'

    retry_policy = Retry(total=6, backoff_factor=1)
    retry_adapter = requests.adapters.HTTPAdapter(
      max_retries=retry_policy)

    session = requests.Session()
    session.mount(url, retry_adapter)

    response = session.post(url, json=storage_message)

    assert response.status_code == 200

    # Stop the functions framework process
    process.kill()
    process.wait()
    out, err = process.communicate()

    print(out, err, response.content)

    assert f'File: {filename}' in str(out)

次のステップ

他の Google Cloud プロダクトに関連するコードサンプルの検索およびフィルタ検索を行うには、Google Cloud のサンプルをご覧ください。