Pruebas de integración de Cloud Storage

Demuestra cómo realizar la prueba de integración de una función activada por Cloud Storage.

Páginas de documentación que incluyen esta muestra de código

Para ver la muestra de código usada en contexto, consulta la siguiente documentación:

Muestra de código

C++

#include <boost/filesystem.hpp>
#include <boost/process.hpp>
#include <curl/curl.h>
#include <gmock/gmock.h>
#include <nlohmann/json.hpp>
#include <chrono>
#include <iostream>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>

namespace {

namespace bp = boost::process;
namespace bfs = boost::filesystem;  // Boost.Process cannot use std::filesystem
using ::testing::Contains;
using ::testing::HasSubstr;
using ::testing::IsEmpty;

// The outcome of an HTTP request: the status code and the response body.
struct HttpResponse {
  // HTTP status code. Initialized to zero so a default-constructed value never
  // holds an indeterminate code. The type is `long` because that is what
  // libcurl's `CURLINFO_RESPONSE_CODE` writes into.
  long code = 0;  // NOLINT(google-runtime-int)
  // The bytes received as the response body.
  std::string payload;
};

// Sends `payload` to `url` as an HTTP POST and returns the status code and
// response body. Declared here because the tests call it before the
// definition appears further down in this file.
HttpResponse HttpEvent(std::string const& url, std::string const& payload);

// The path used to launch this test program. Set in `main()` and read by
// `ExePath()`.
char const* argv0 = nullptr;

// Finds the executable called `filename` in the directory that contains this
// test program (derived from `argv0`). The directory list is computed once,
// on the first call.
auto ExePath(bfs::path const& filename) {
  static std::vector<bfs::path> const kSearchDirs = [] {
    auto self = bfs::canonical(argv0);
    return std::vector<bfs::path>{self.make_preferred().parent_path()};
  }();
  return bp::search_path(filename, kSearchDirs);
}

// Test fixture that runs the function's HTTP server as a child process.
//
// `SetUp()` launches the server on port 8050 with its stdout and stderr
// redirected into `child_log_`, and `TearDown()` stops it. Tests read what the
// server printed through `NextChildLine()`.
class StorageIntegrationTest : public ::testing::Test {
 protected:
  void SetUp() override {
    curl_global_init(CURL_GLOBAL_ALL);

    // Hold the child in a local until the server is known to respond; only
    // then hand it over to the fixture.
    bp::child launched(ExePath("storage_integration_server"), "--port=8050",
                       (bp::std_out & bp::std_err) > child_log_);
    ASSERT_TRUE(WaitForServerReady("http://localhost:8050/"));
    process_ = std::move(launched);
  }

  // Wait until an HTTP server starts responding.
  bool WaitForServerReady(std::string const& url);

  void TearDown() override {
    process_.terminate();
    process_.wait();
    curl_global_cleanup();
  }

  // Reads the next line printed by the child process, echoes it to stderr to
  // help troubleshooting, and returns it.
  std::string NextChildLine() {
    std::string text;
    std::getline(child_log_, text);
    std::cerr << " ... line=" << text << std::endl;
    return text;
  }

 private:
  // Receives the combined stdout and stderr of the child process.
  bp::ipstream child_log_;
  // The running server process; stays empty if `SetUp()` fails.
  bp::child process_;
};

// Posts storage "finalized" CloudEvents to the server started by the fixture
// and verifies that each one is answered with an empty 200 response and that
// the expected "Object: ..." line shows up in the server's output.
TEST_F(StorageIntegrationTest, Basic) {
  auto constexpr kOkay = 200;

  // Template event; only `data.name` changes between test cases.
  auto const event_template = nlohmann::json::parse(R"js({
    "specversion": "1.0",
    "type": "google.cloud.storage.object.v1.finalized",
    "source": "//storage.googleapis.com/projects/_/buckets/some-bucket",
    "subject": "objects/folder/Test.cs",
    "id": "aaaaaa-1111-bbbb-2222-cccccccccccc",
    "time": "2020-09-29T11:32:00.000Z",
    "datacontenttype": "application/json",
    "data": {
      "kind": "storage#object",
      "bucket": "some-bucket",
      "name": "object-name",
      "generation": "1587627537231057",
      "timeCreated": "2020-04-23T07:38:57.230Z",
      "updated": "2020-04-23T07:38:57.230Z"
    }
  })js");

  struct TestCase {
    std::string name;      // object name placed in the event
    std::string expected;  // text that must appear in the server output
  };
  TestCase const cases[] = {
      {"object1.txt", "Object: object1.txt"},
      {"object/with/longer/name.txt", "Object: object/with/longer/name.txt"},
  };

  for (auto const& test : cases) {
    SCOPED_TRACE("Testing for " + test.expected);
    auto event = event_template;
    event["data"]["name"] = test.name;
    auto const body = event.dump();
    SCOPED_TRACE("event=" + body);
    auto actual = HttpEvent("http://localhost:8050/", body);
    ASSERT_EQ(actual.code, kOkay);
    EXPECT_THAT(actual.payload, IsEmpty());

    // Collect the server output up to, but not including, the first line that
    // contains "Updated: "; that line marks the end of this event's output.
    std::vector<std::string> lines;
    while (true) {
      auto current = NextChildLine();
      if (current.find("Updated: ") != std::string::npos) break;
      lines.push_back(std::move(current));
    }
    EXPECT_THAT(lines, Contains(HasSubstr(test.expected)));
  }
}

// libcurl write callback: appends the `size * nmemb` bytes at `ptr` to the
// `std::string` that `userdata` points to, and reports all of them as
// consumed.
extern "C" size_t CurlOnWriteData(char* ptr, size_t size, size_t nmemb,
                                  void* userdata) {
  auto const count = size * nmemb;
  auto* sink = static_cast<std::string*>(userdata);
  sink->append(ptr, count);
  return count;
}

// Sends `payload` to `url` as an HTTP POST with the
// `application/cloudevents+json` content type, and returns the response
// status code together with the response body.
//
// Throws `std::runtime_error` if the curl handle cannot be created, an option
// cannot be set, a header cannot be added, the transfer fails, or the
// response code cannot be retrieved.
HttpResponse HttpEvent(std::string const& url, std::string const& payload) {
  using CurlHandle = std::unique_ptr<CURL, decltype(&curl_easy_cleanup)>;
  using CurlHeaders =
      std::unique_ptr<curl_slist, decltype(&curl_slist_free_all)>;

  auto easy = CurlHandle(curl_easy_init(), curl_easy_cleanup);
  if (!easy) throw std::runtime_error("Cannot create curl easy handle");

  auto setopt = [h = easy.get()](auto opt, auto value) {
    if (auto e = curl_easy_setopt(h, opt, value); e != CURLE_OK) {
      std::ostringstream os;
      os << "error [" << e << "] setting curl_easy option <" << opt
         << ">=" << value;
      throw std::runtime_error(std::move(os).str());
    }
  };
  auto get_response_code = [h = easy.get()]() {
    long code = 0;  // NOLINT(google-runtime-int)
    auto e = curl_easy_getinfo(h, CURLINFO_RESPONSE_CODE, &code);
    if (e != CURLE_OK) throw std::runtime_error("Cannot get response code");
    return code;
  };

  auto headers = CurlHeaders(nullptr, curl_slist_free_all);
  auto add_header = [&headers](std::string const& h) {
    auto* nh = curl_slist_append(headers.get(), h.c_str());
    // On failure the existing list is still valid and still owned by
    // `headers`, so it is released normally when the exception unwinds.
    if (nh == nullptr) throw std::runtime_error("Cannot add header: " + h);
    // `nh` is the (possibly new) head of the same list; swap it in without
    // freeing the nodes it still contains.
    (void)headers.release();
    headers.reset(nh);
  };

  setopt(CURLOPT_URL, url.c_str());
  // libcurl reads this option as a `long` from its variadic argument list.
  setopt(CURLOPT_POSTFIELDSIZE,
         static_cast<long>(payload.size()));  // NOLINT(google-runtime-int)
  // libcurl does not copy the data; `payload` outlives the transfer below.
  setopt(CURLOPT_POSTFIELDS, payload.data());
  std::string buffer;
  setopt(CURLOPT_WRITEDATA, &buffer);
  setopt(CURLOPT_WRITEFUNCTION, &CurlOnWriteData);
  add_header("Content-Type: application/cloudevents+json");
  add_header("Content-Length: " + std::to_string(payload.size()));
  setopt(CURLOPT_HTTPHEADER, headers.get());

  auto e = curl_easy_perform(easy.get());
  if (e != CURLE_OK) {
    std::ostringstream os;
    os << "Error in curl_easy_perform [" << e << "] - "
       << curl_easy_strerror(e);
    throw std::runtime_error(std::move(os).str());
  }
  return HttpResponse{get_response_code(), std::move(buffer)};
}

// Polls the server at `url` with a test event, sleeping 100ms, 200ms, ...
// 1600ms before successive attempts. Returns true once a ping is answered
// with a 200 and the child's output contains an "Updated: " line; returns
// false if every attempt fails.
bool StorageIntegrationTest::WaitForServerReady(std::string const& url) {
  using namespace std::chrono_literals;
  auto constexpr kOkay = 200;
  for (auto delay : {100ms, 200ms, 400ms, 800ms, 1600ms}) {  // NOLINT
    std::cout << "Waiting for server to start [" << delay.count() << "ms]\n";
    std::this_thread::sleep_for(delay);
    auto constexpr kPing = R"js({
      "specversion": "1.0",
      "type": "test-type",
      "source": "test-source",
      "id": "test-id",
      "datacontenttype": "application/json",
      "data": {
        "kind": "storage#object",
        "bucket": "wait-for-server-ready",
        "name": "wait-for-server-ready",
        "generation": "1587627537231057",
        "id": "some-bucket/object-name/1587627537231057",
        "timeCreated": "2020-04-23T07:38:57.230Z",
        "updated": "2020-04-23T07:38:57.230Z"
      }
    })js";

    // True only when the ping was answered with a 200. Any other status, or
    // an exception, is logged and counts as "not ready yet".
    auto const ping_succeeded = [&]() {
      try {
        auto const reply = HttpEvent(url, kPing);
        if (reply.code == kOkay) return true;
        std::cerr << "... [" << reply.code << "]" << std::endl;
      } catch (std::exception const& ex) {
        // The HttpEvent() function may fail with an exception until the
        // server is ready. Log it to ease troubleshooting in the CI builds.
        std::cerr << "WaitForServerReady[" << delay.count()
                  << "ms]: server ping failed with " << ex.what() << std::endl;
      }
      return false;
    };
    if (!ping_succeeded()) continue;

    // Scan the child's output until an empty line is read, looking for the
    // "Updated: " marker.
    while (true) {
      auto const line = NextChildLine();
      if (line.empty()) break;
      if (line.find("Updated: ") != std::string::npos) return true;
    }
  }
  return false;
}

}  // namespace

// Test program entry point: records how the program was launched,
// initializes GoogleMock (and GoogleTest) from the command line, and runs
// every test.
int main(int argc, char* argv[]) {
  // ExePath() uses this to locate the server binary in this program's
  // directory.
  ::argv0 = argv[0];
  ::testing::InitGoogleMock(&argc, argv);
  return RUN_ALL_TESTS();
}

Java


import static com.google.common.truth.Truth.assertThat;

import com.google.gson.Gson;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;
import io.vavr.CheckedRunnable;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Integration test that runs the function locally through the Functions Framework Maven plugin
 * and checks what it logs when it receives a Cloud Storage event over HTTP.
 */
public class ExampleIT {
  // Root URL pointing to the locally hosted function
  // The Functions Framework Maven plugin lets us run a function locally
  private static final String BASE_URL = "http://localhost:8080";

  private static Process emulatorProcess = null;
  private static final HttpClient client = HttpClientBuilder.create().build();
  private static final Gson gson = new Gson();

  /**
   * Starts {@code mvn function:run} in the directory named by the {@code basedir} system
   * property.
   *
   * @throws IOException if the process cannot be started
   */
  @BeforeClass
  public static void setUp() throws IOException {
    // Get the sample's base directory (the one containing a pom.xml file)
    String baseDir = System.getProperty("basedir");

    // Emulate the function locally by running the Functions Framework Maven plugin
    emulatorProcess = new ProcessBuilder()
        .command("mvn", "function:run")
        .directory(new File(baseDir))
        .start();
  }

  /** Stops the Functions Framework Maven plugin process, if it was started. */
  @AfterClass
  public static void tearDown() {
    // setUp() may have failed before the process was created; in that case there is nothing to
    // stop, and throwing a NullPointerException here would only obscure the original failure.
    if (emulatorProcess != null && emulatorProcess.isAlive()) {
      emulatorProcess.destroy();
    }
  }

  /**
   * Posts a storage "finalize" event with a random object name and verifies that the name shows
   * up in the emulator process' output.
   */
  @Test
  public void helloGcs_shouldRunWithFunctionsFramework() throws Throwable {
    String functionUrl = BASE_URL + "/helloGcs"; // URL to your locally-running function

    // Initialize constants
    String name = UUID.randomUUID().toString();
    String jsonStr = gson.toJson(Map.of(
        "data", Map.of(
            "name", name, "resourceState", "exists", "metageneration", 1),
        "context", Map.of(
            "eventType", "google.storage.object.finalize")
    ));

    HttpPost postRequest = new HttpPost(URI.create(functionUrl));
    postRequest.setEntity(new StringEntity(jsonStr));

    // The Functions Framework Maven plugin process takes time to start up
    // Use resilience4j to retry the test HTTP request until the plugin responds
    RetryRegistry registry = RetryRegistry.of(RetryConfig.custom()
        .maxAttempts(8)
        .retryExceptions(HttpHostConnectException.class)
        .intervalFunction(IntervalFunction.ofExponentialBackoff(200, 2))
        .build());
    Retry retry = registry.retry("my");

    // Perform the request-retry process
    CheckedRunnable retriableFunc = Retry.decorateCheckedRunnable(
        retry, () -> client.execute(postRequest));
    retriableFunc.run();

    // Read whatever the plugin process has written to its error stream so far; the assertion
    // below expects the function's log line to be there.
    InputStream logStream = emulatorProcess.getErrorStream();
    ByteArrayOutputStream logBytes = new ByteArrayOutputStream();
    logBytes.write(logStream.readNBytes(logStream.available()));

    // Verify desired name value is present
    assertThat(logBytes.toString(StandardCharsets.UTF_8)).contains(
        String.format("File: %s", name));
  }
}

Node.js

const assert = require('assert');
const execPromise = require('child-process-promise').exec;
const path = require('path');
const uuid = require('uuid');

const requestRetry = require('requestretry');
const cwd = path.join(__dirname, '..');

  // Starts a local functions-framework instance, posts a simulated GCS
  // "finalize" event to it, and checks both the HTTP status and the text the
  // function printed.
  it('helloGCSGeneric: should print GCS event', async () => {
    const PORT = 9000; // Each running framework instance needs a unique port
    const eventType = 'google.storage.object.finalize';
    const filename = uuid.v4(); // Use a unique filename to avoid conflicts

    // Event payload in the {data, context} shape sent to the function
    const data = {
      data: {name: filename, resourceState: 'exists', metageneration: '1'},
      context: {eventType},
    };

    // Run the functions-framework instance to host functions locally
    //   exec's 'timeout' param won't kill children of "shim" /bin/sh process
    //   Workaround: include "& sleep <TIMEOUT>; kill $!" in executed command
    const command = `functions-framework --target=helloGCS --signature-type=event --port=${PORT} & sleep 1; kill $!`;
    const proc = execPromise(command, {shell: true, cwd});

    // Send HTTP request simulating GCS change notification
    // (GCF translates GCS notifications to HTTP requests internally)
    const response = await requestRetry({
      url: `http://localhost:${PORT}/`,
      method: 'POST',
      body: data,
      retryDelay: 200,
      json: true,
    });
    assert.strictEqual(response.statusCode, 204);

    // Wait for functions-framework process to exit, then inspect its output
    const {stdout} = await proc;
    for (const expected of [`File: ${filename}`, `Event Type: ${eventType}`]) {
      assert.ok(stdout.includes(expected));
    }
  });
});

PHP


namespace Google\Cloud\Samples\Functions\HelloworldStorage;

use Google\CloudFunctions\CloudEvent;
use GuzzleHttp\Client;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Process\Process;
use Symfony\Component\Process\PhpExecutableFinder;

/**
 * Class SampleIntegrationTest.
 *
 * Integration tests for the 'Helloworld Storage' Cloud Function.
 */
class SampleIntegrationTest extends TestCase
{
    /** @var Process The local PHP server running the Functions Framework. */
    private static $process;

    /** @var Client HTTP client whose base URI points at the local server. */
    private static $client;

    /**
     * Provides the test cases for testHelloGCS().
     *
     * @return array[] One entry per case, holding the CloudEvent metadata,
     *                 the event data, and the expected HTTP status code.
     */
    public function dataProvider()
    {
        return [
            [
                'cloudevent' => [
                    'id' => uniqid(),
                    'source' => 'storage.googleapis.com',
                    'specversion' => '1.0',
                    'type' => 'google.cloud.storage.object.v1.finalized',
                ],
                'data' => [
                    'bucket' => 'some-bucket',
                    'metageneration' => '1',
                    'name' => 'folder/friendly.txt',
                    'timeCreated' => '2020-04-23T07:38:57.230Z',
                    'updated' => '2020-04-23T07:38:57.230Z',
                ],
                'statusCode' => '200',
            ],
        ];
    }

    /**
     * Sends a CloudEvent to the local server and verifies the status code and
     * that the event's properties appear in the server's error output.
     *
     * @dataProvider dataProvider
     *
     * @param array  $cloudevent CloudEvent metadata, sent as 'ce-*' headers.
     * @param array  $data       Event data, sent as the JSON request body.
     * @param string $statusCode Expected HTTP status code.
     */
    public function testHelloGCS(array $cloudevent, array $data, string $statusCode): void
    {
        // Prepare the HTTP headers for a CloudEvent.
        $cloudEventHeaders = [];
        foreach ($cloudevent as $key => $value) {
            $cloudEventHeaders['ce-' . $key] = $value;
        }

        // Send an HTTP request using CloudEvent metadata.
        $resp = self::$client->post('/', [
            'body' => json_encode($data),
            'headers' => $cloudEventHeaders + [
                // Instruct the function framework to parse the body as JSON.
                'content-type' => 'application/json'
            ],
        ]);

        // The Cloud Function logs all data to stderr.
        $actual = self::$process->getIncrementalErrorOutput();

        // Confirm the status code.
        $this->assertEquals($statusCode, $resp->getStatusCode());

        // Verify the CloudEvent and data properties are logged by the function.
        foreach ($data as $value) {
            $this->assertStringContainsString($value, $actual);
        }
        $this->assertStringContainsString($cloudevent['id'], $actual);
        $this->assertStringContainsString($cloudevent['type'], $actual);
    }

    /**
     * Start a local PHP server running the Functions Framework.
     *
     * The port is taken from the PORT environment variable and defaults to
     * 8080.
     *
     * @beforeClass
     */
    public static function startFunctionFramework(): void
    {
        $port = getenv('PORT') ?: '8080';
        $php = (new PhpExecutableFinder())->find();
        $uri = 'localhost:' . $port;

        // https://symfony.com/doc/current/components/process.html#usage
        self::$process = new Process([$php, '-S', $uri, 'vendor/bin/router.php'], null, [
            'FUNCTION_SIGNATURE_TYPE' => 'cloudevent',
            'FUNCTION_TARGET' => 'helloGCS',
        ]);
        self::$process->start();

        // Initialize an HTTP client to drive requests.
        self::$client = new Client(['base_uri' => 'http://' . $uri]);

        // Short delay to ensure PHP server is ready.
        sleep(1);
    }

    /**
     * Stop the local PHP server.
     *
     * @afterClass
     *
     * @throws \RuntimeException if the server process is no longer running.
     */
    public static function stopFunctionFramework(): void
    {
        if (!self::$process->isRunning()) {
            echo self::$process->getErrorOutput();
            // The leading backslash is required: inside this namespace an
            // unqualified class name is not looked up in the global namespace.
            throw new \RuntimeException('Function Framework PHP process not running by end of test');
        }
        self::$process->stop(3, SIGTERM);
    }
}

Python

import datetime
import os
import subprocess
import uuid

import requests
from requests.packages.urllib3.util.retry import Retry

def test_print_name():
    """Run the function locally and check that it prints the file name.

    Starts `functions-framework` as a child process, posts a simulated
    storage event to it over HTTP (retrying while it starts up), and asserts
    that the response is a 200 and that the process' stdout contains
    `File: <name>`.
    """
    filename = str(uuid.uuid4())
    port = 8089  # Each running framework instance needs a unique port

    example_timestamp = datetime.datetime.now().isoformat()
    storage_message = {
        'data': {
            'name': filename,
            'bucket': 'my_bucket',
            'metageneration': '1',
            'timeCreated': example_timestamp,
            'updated': example_timestamp
        }
    }

    process = subprocess.Popen(
        [
            'functions-framework',
            '--target', 'hello_gcs',
            '--signature-type', 'event',
            '--port', str(port)
        ],
        cwd=os.path.dirname(__file__),
        stdout=subprocess.PIPE
    )

    try:
        # Send HTTP request simulating a GCS change notification
        # (GCF translates GCS notifications to HTTP requests internally)
        url = f'http://localhost:{port}/'

        retry_policy = Retry(total=6, backoff_factor=1)
        retry_adapter = requests.adapters.HTTPAdapter(
            max_retries=retry_policy)

        with requests.Session() as session:
            session.mount(url, retry_adapter)
            response = session.post(url, json=storage_message)

        assert response.status_code == 200
    finally:
        # Always stop the functions framework process, even when the request
        # or the status check fails; otherwise the child would be left
        # running. communicate() also waits for the process to exit.
        process.kill()
        out, err = process.communicate()

    # stderr is not captured, so `err` is always None here.
    print(out, err, response.content)

    assert f'File: {filename}' in str(out)

¿Qué sigue?

Para buscar y filtrar muestras de código de otros productos de Google Cloud, consulta el navegador de muestras de Google Cloud.