Pengujian sistem Pub/Sub

Menunjukkan cara melakukan pengujian sistem pada fungsi yang dipicu oleh Pub/Sub. Ini adalah pengujian "menyeluruh" (end-to-end) yang mengasumsikan bahwa Anda telah men-deploy Cloud Function yang akan diuji. Kode dalam contoh ini memicu Cloud Function dengan memublikasikan pesan Pub/Sub, lalu menunggu hingga output yang diharapkan muncul di Cloud Logging.

Contoh kode

Go

Untuk melakukan autentikasi ke Cloud Functions, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.


package helloworld

import (
	"context"
	"log"
	"os"
	"os/exec"
	"strings"
	"testing"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/gobuffalo/uuid"
)

// TestHelloPubSubSystem is an end-to-end test for an already-deployed
// HelloPubSub function. It publishes a message carrying a random UUID to the
// topic named by FUNCTIONS_TOPIC (in the project named by GCP_PROJECT), waits,
// and then checks that the UUID shows up in the function's logs as reported by
// the gcloud CLI.
func TestHelloPubSubSystem(t *testing.T) {
	ctx := context.Background()

	topicName := os.Getenv("FUNCTIONS_TOPIC")
	projectID := os.Getenv("GCP_PROJECT")

	// Captured before publishing; passed to gcloud as the lower bound of the
	// log query.
	startTime := time.Now().UTC().Format(time.RFC3339)

	// Create the Pub/Sub client and topic.
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		// Fail this test rather than exiting the whole test binary.
		t.Fatalf("pubsub.NewClient: %v", err)
	}
	defer client.Close()
	topic := client.Topic(topicName)

	// Publish a message with a random string to verify.
	// We use a random string to make sure the function is logging the correct
	// message for this test invocation.
	u := uuid.Must(uuid.NewV4())
	msg := &pubsub.Message{
		Data: []byte(u.String()),
	}
	// Get blocks until the publish completes; a failure here means the
	// function was never triggered, so stop before waiting on logs.
	id, err := topic.Publish(ctx, msg).Get(ctx)
	if err != nil {
		t.Fatalf("topic.Publish(%q): %v", topicName, err)
	}
	log.Printf("published message %s with payload %s", id, u)

	// Wait for logs to be consistent.
	time.Sleep(20 * time.Second)

	// Check logs after a delay.
	cmd := exec.Command("gcloud", "alpha", "functions", "logs", "read", "HelloPubSub", "--start-time", startTime)
	out, err := cmd.CombinedOutput()
	if err != nil {
		// Include the combined stdout/stderr so the gcloud failure is diagnosable.
		t.Fatalf("exec.Command: %v\n%s", err, out)
	}
	if got := string(out); !strings.Contains(got, u.String()) {
		t.Errorf("HelloPubSub got %q, want to contain %q", got, u.String())
	}
}

Java

Untuk melakukan autentikasi ke Cloud Functions, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import static com.google.common.truth.Truth.assertThat;

import com.google.api.gax.paging.Page;
import com.google.cloud.logging.LogEntry;
import com.google.cloud.logging.Logging;
import com.google.cloud.logging.LoggingOptions;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * System test for a deployed Pub/Sub-triggered function.
 *
 * <p>The test publishes a message containing a random UUID to the configured topic and then
 * polls Cloud Logging until an entry containing that UUID appears (or the retries run out).
 */
public class ExampleSystemIT {

  // TODO<developer>: set these values (as environment variables)
  private static final String PROJECT_ID = System.getenv("GCP_PROJECT");
  private static final String TOPIC_NAME = System.getenv("FUNCTIONS_SYSTEM_TEST_TOPIC");

  // TODO<developer>: set this value (as an environment variable) to HelloPubSub
  private static final String FUNCTION_DEPLOYED_NAME = System.getenv("FUNCTIONS_PUBSUB_FN_NAME");

  private static Logging loggingClient;

  private static Publisher publisher;

  /**
   * Creates the shared logging client and the publisher for the test topic.
   *
   * @throws IOException if the publisher cannot be built
   */
  @BeforeClass
  public static void setUp() throws IOException {
    loggingClient = LoggingOptions.getDefaultInstance().getService();
    publisher = Publisher.newBuilder(
        ProjectTopicName.of(PROJECT_ID, TOPIC_NAME)).build();
  }

  /**
   * Fetches the deployed function's INFO-level log entries and concatenates them.
   *
   * @param startTimestamp lower bound for entry timestamps, placed verbatim in the filter
   * @return the string form of every returned entry, one entry per line
   */
  private static String getLogEntriesAsString(String startTimestamp) {
    // Construct the logging filter
    // See this page for more info: https://cloud.google.com/logging/docs/view/advanced-queries
    // The function name is quoted so it is always treated as a single literal value.
    String filter = "resource.type=\"cloud_function\""
        + " AND severity=INFO"
        + String.format(" AND resource.labels.function_name=\"%s\"", FUNCTION_DEPLOYED_NAME)
        + String.format(" AND timestamp>=\"%s\"", startTimestamp);

    // Get logging entries, newest first
    Page<LogEntry> logEntries =
        loggingClient.listLogEntries(
            Logging.EntryListOption.filter(filter),
            Logging.EntryListOption.sortOrder(
                Logging.SortingField.TIMESTAMP, Logging.SortingOrder.DESCENDING)
        );

    // Serialize the entries + collect them into a single string.
    // Collectors.joining takes a literal delimiter (not a format string), so use the
    // real line separator instead of "%n".
    return StreamSupport.stream(logEntries.getValues().spliterator(), false)
        .map(LogEntry::toString)
        .collect(Collectors.joining(System.lineSeparator()));
  }

  /** Publishes a unique message and verifies that it appears in the function's logs. */
  @Test
  public void helloPubSub_shouldRunOnGcf() throws Exception {
    String name = UUID.randomUUID().toString();

    // Subtract time to work-around local-GCF clock difference
    Instant startInstant = Instant.now().minus(Duration.ofMinutes(4));
    String startTimestamp = DateTimeFormatter.ISO_INSTANT.format(startInstant);

    // Publish to pub/sub topic; get() blocks until the publish has completed
    ByteString byteStr = ByteString.copyFrom(name, StandardCharsets.UTF_8);
    PubsubMessage pubsubApiMessage = PubsubMessage.newBuilder().setData(byteStr).build();
    publisher.publish(pubsubApiMessage).get();

    // Keep retrying until the logs contain the desired invocation's log entry
    // (If the invocation failed, the retry process will eventually time out)
    // Up to 8 attempts, with exponential backoff starting at 1000 ms and doubling.
    RetryRegistry registry = RetryRegistry.of(RetryConfig.custom()
        .maxAttempts(8)
        .intervalFunction(IntervalFunction.ofExponentialBackoff(1000, 2))
        .retryOnResult(s -> !s.toString().contains(name))
        .build());
    Retry retry = registry.retry(name);
    String logEntry = Retry
        .decorateFunction(retry, ExampleSystemIT::getLogEntriesAsString)
        .apply(startTimestamp);

    // Perform final assertion (to make sure we fail on timeout)
    assertThat(logEntry).contains(name);
  }
}

Node.js

Untuk melakukan autentikasi ke Cloud Functions, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

const childProcess = require('child_process');
const assert = require('assert');
const uuid = require('uuid');
const {PubSub} = require('@google-cloud/pubsub');
const moment = require('moment');
const promiseRetry = require('promise-retry');

// Client used to publish the trigger message.
const pubsub = new PubSub();

// Required configuration: abort at load time if either variable is missing.
const {FUNCTIONS_TOPIC: topicName} = process.env;
if (!topicName) {
  throw new Error('"FUNCTIONS_TOPIC" env var must be set.');
}
if (!process.env.GCF_REGION) throw new Error('"GCF_REGION" env var must be set.');

// Command prefix for gcloud invocations.
const baseCmd = 'gcloud functions';

// End-to-end test: publishes a unique name to the configured topic and polls
// the deployed function's logs (via gcloud) until the greeting for that name
// shows up.
describe('system tests', () => {
  it('helloPubSub: should print a name', async () => {
    const name = uuid.v4();

    // Subtract time to work-around local-GCF clock difference
    const startTime = moment().subtract(4, 'minutes').toISOString();

    // Publish to pub/sub topic
    const topic = pubsub.topic(topicName);
    const data = Buffer.from(name);
    await topic.publishMessage({data});

    console.log(`published topic ${topicName}, ${name}`);

    // Wait for logs to become consistent
    await promiseRetry(retry => {
      const logs = childProcess
        .execSync(`${baseCmd} logs read helloPubSub --start-time ${startTime}`)
        .toString();

      try {
        assert.ok(logs.includes(`Hello, ${name}!`));
      } catch (err) {
        console.log('An error occurred, retrying:', err);
        // Hand the failure to promise-retry so it schedules another attempt.
        return retry(err);
      }
    });
  });
});

Python

Untuk melakukan autentikasi ke Cloud Functions, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

from datetime import datetime
from os import getenv
import subprocess
import time
import uuid

from google.cloud import pubsub_v1
import pytest

# Project that owns the topic; must be provided through the environment.
PROJECT = getenv("GCP_PROJECT")
assert PROJECT is not None

# Pub/Sub topic that triggers the function; must be provided as well.
TOPIC = getenv("FUNCTIONS_TOPIC")
assert TOPIC is not None

@pytest.fixture(scope="module")
def publisher_client():
    """Provide one Pub/Sub publisher client shared by the tests in this module."""
    client = pubsub_v1.PublisherClient()
    yield client

def test_print_name(publisher_client):
    """Publish a unique name and check that the deployed function logged it.

    Args:
        publisher_client: Pub/Sub publisher client supplied by the fixture.

    Raises:
        subprocess.CalledProcessError: if the gcloud command exits non-zero.
    """
    # Function-scope import: the module header only imports `datetime`.
    from datetime import timezone

    # Timezone-aware UTC timestamp, so the value passed to --start-time carries
    # an explicit offset (datetime.utcnow() is deprecated and returns a naive value).
    start_time = datetime.now(timezone.utc).isoformat()
    topic_path = publisher_client.topic_path(PROJECT, TOPIC)

    # Publish the message; result() blocks until the publish has completed
    name = uuid.uuid4()
    data = str(name).encode("utf-8")
    publisher_client.publish(topic_path, data=data).result()

    # Wait for logs to become consistent
    time.sleep(15)

    # Check logs after a delay
    completed = subprocess.run(
        [
            "gcloud",
            "alpha",
            "functions",
            "logs",
            "read",
            "hello_pubsub",
            "--start-time",
            start_time,
        ],
        stdout=subprocess.PIPE,
        # Surface a failing gcloud call directly instead of as a missing log line.
        check=True,
    )
    # Decode the bytes rather than taking their repr (str(b"...") gives "b'...'").
    logs = completed.stdout.decode("utf-8", errors="replace")
    assert f"Hello {name}!" in logs

Langkah selanjutnya

Untuk menelusuri dan memfilter contoh kode untuk produk Google Cloud lainnya, lihat browser contoh Google Cloud.