Pub/Sub notifications

This document describes how to set up notifications for updates to notes and occurrences.

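Container Registry provides notifications through Pub/Sub whenever it scans for vulnerabilities and other metadata. When a note or an occurrence is created or updated, a message is published to the corresponding topic for each API version. Use the topic that matches the API version you are using.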

Before you begin

  1. Enable the Container Analysis API.

  2. Read the Container Analysis overview.

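Create Pub/Sub topics

After you activate the Container Analysis API, the following Pub/Sub topics are created for you in your project:

  • container-analysis-notes-v1
  • container-analysis-occurrences-v1

If the topics are missing or were accidentally deleted, you can add them yourself.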

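Console

  1. Go to the Pub/Sub topics page in the Cloud Console.

    Open the Pub/Sub topics page

  2. Click Create topic.

  3. Enter the following URI as the topic for your notes:

    projects/[PROJECT-ID]/topics/container-analysis-notes-v1

    where [PROJECT-ID] is your Google Cloud project ID.

  4. Click Create.

  5. Create another topic for your occurrences, using the following URI:

    projects/[PROJECT-ID]/topics/container-analysis-occurrences-v1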

gcloud command

Run the following commands in a shell or terminal window:

gcloud pubsub topics create projects/[PROJECT-ID]/topics/container-analysis-notes-v1
gcloud pubsub topics create projects/[PROJECT-ID]/topics/container-analysis-occurrences-v1

For more information about the gcloud pubsub topics command, see the topics documentation.
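The fully qualified topic names used in the commands above follow a fixed pattern, so a script can derive them from the project ID. The following is a minimal sketch: the helper name topic_paths and its api_version parameter are hypothetical, and it assumes that topics for other API versions follow the same naming pattern, which this document does not confirm.

```python
from typing import List

# The two resource types that have notification topics.
TOPIC_RESOURCES = ("notes", "occurrences")


def topic_paths(project_id: str, api_version: str = "v1") -> List[str]:
    """Return the fully qualified names of the notes and occurrences topics.

    Assumption: non-v1 topics use the same container-analysis-<resource>-<version>
    pattern as the v1 topics named in this document.
    """
    return [
        f"projects/{project_id}/topics/container-analysis-{resource}-{api_version}"
        for resource in TOPIC_RESOURCES
    ]


if __name__ == "__main__":
    # Print the names to pass to `gcloud pubsub topics create`.
    for path in topic_paths("my-project-id"):
        print(path)
```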

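Whenever a note or an occurrence is created or updated, a message is published to the respective topic.

Pub/Sub payloads are in JSON format, with the following schema: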

Notes:

{
    "name": "projects/[PROJECT_ID]/notes/[NOTE_ID]",
    "kind": "[NOTE_KIND]",
    "notificationTime": "[NOTIFICATION_TIME]"
}

Occurrences:

{
    "name": "projects/[PROJECT_ID]/occurrences/[OCCURRENCE_ID]",
    "kind": "[NOTE_KIND]",
    "notificationTime": "[NOTIFICATION_TIME]"
}

where:

  • [NOTE_KIND] is one of the values in NoteKind.
  • [NOTIFICATION_TIME] is a timestamp in RFC 3339 UTC ("Zulu") format, accurate to nanoseconds.
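Because the timestamp carries up to nine fractional digits and Python's datetime only stores microseconds, a subscriber written in Python needs to truncate the fraction when parsing. The following is a minimal sketch of decoding a payload with the schema above. The function names parse_notification_time and parse_payload are hypothetical, and the sample values in the usage section are made up for illustration.

```python
import json
import re
from datetime import datetime, timezone

# Matches timestamps such as 2020-01-02T03:04:05.123456789Z (fraction optional).
_RFC3339_UTC = re.compile(r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(?:\.(\d{1,9}))?Z$")


def parse_notification_time(value: str) -> datetime:
    """Parse an RFC 3339 UTC timestamp, truncating nanoseconds to microseconds."""
    match = _RFC3339_UTC.match(value)
    if match is None:
        raise ValueError(f"not an RFC 3339 UTC timestamp: {value!r}")
    seconds, fraction = match.groups()
    # Pad the fraction to nanoseconds, then keep the first six digits.
    micros = int((fraction or "").ljust(9, "0")[:6])
    base = datetime.strptime(seconds, "%Y-%m-%dT%H:%M:%S")
    return base.replace(microsecond=micros, tzinfo=timezone.utc)


def parse_payload(data: bytes) -> dict:
    """Decode a notification body into its project, resource ID, kind, and time."""
    payload = json.loads(data.decode("utf-8"))
    # name is projects/[PROJECT_ID]/notes/[NOTE_ID] or
    # projects/[PROJECT_ID]/occurrences/[OCCURRENCE_ID].
    _, project_id, collection, resource_id = payload["name"].split("/", 3)
    return {
        "project_id": project_id,
        "collection": collection,
        "resource_id": resource_id,
        "kind": payload["kind"],
        "notification_time": parse_notification_time(payload["notificationTime"]),
    }


if __name__ == "__main__":
    sample = (
        b'{"name": "projects/my-project/occurrences/abc-123", '
        b'"kind": "VULNERABILITY", '
        b'"notificationTime": "2020-01-02T03:04:05.123456789Z"}'
    )
    print(parse_payload(sample))
```

If sub-microsecond precision matters for ordering, keep the original string alongside the parsed value rather than relying on the truncated datetime.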

Create Pub/Sub subscriptions

To listen to events, create a Pub/Sub subscription associated with the topic:

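Console

  1. Go to the Pub/Sub subscriptions page in the Cloud Console.

    Open the Pub/Sub subscriptions page

  2. Click Create subscription.

  3. Enter a name for the subscription, for example "notes".

  4. Enter the following URI as the topic for your notes:

    projects/[PROJECT-ID]/topics/container-analysis-notes-v1

    where [PROJECT-ID] is your Google Cloud project ID.

  5. Click Create.

  6. Create another subscription for your occurrences, using the following topic URI:

    projects/[PROJECT-ID]/topics/container-analysis-occurrences-v1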

gcloud command

To receive Pub/Sub events, you must first create a subscription associated with the container-analysis-occurrences-v1 topic:

gcloud pubsub subscriptions create \
    --topic container-analysis-occurrences-v1 occurrences

From then on, you can use the new subscription to pull messages about your occurrences:

gcloud pubsub subscriptions pull \
    --auto-ack occurrences
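Once messages are pulled, a subscriber usually decodes each message body and decides what to do based on its kind field. The following is a minimal sketch of that routing step, independent of any Pub/Sub client library. The name make_dispatcher is hypothetical, and VULNERABILITY is used only as an illustrative NoteKind value.

```python
import json
from typing import Callable, Dict

Handler = Callable[[dict], None]


def make_dispatcher(handlers: Dict[str, Handler]) -> Callable[[bytes], bool]:
    """Build a function that routes a message body to the handler for its kind."""

    def dispatch(data: bytes) -> bool:
        payload = json.loads(data.decode("utf-8"))
        handler = handlers.get(payload.get("kind"))
        if handler is None:
            # No handler is registered for this kind; the caller can still
            # acknowledge the message so that it is not redelivered.
            return False
        handler(payload)
        return True

    return dispatch


if __name__ == "__main__":
    vulnerabilities = []
    dispatch = make_dispatcher({"VULNERABILITY": vulnerabilities.append})
    body = (
        b'{"name": "projects/my-project/occurrences/abc-123", '
        b'"kind": "VULNERABILITY", '
        b'"notificationTime": "2020-01-02T03:04:05Z"}'
    )
    handled = dispatch(body)
    print(handled, len(vulnerabilities))
```

In a real subscriber, dispatch would be called with the message data inside the receive callback, before the message is acknowledged.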

Java

To learn how to install and use the client library for Container Registry, see the Container Registry client libraries. For more information, see the Container Registry Java API reference documentation.

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.lang.InterruptedException;
import java.util.concurrent.TimeUnit;

public class Subscriptions {
  // Handle incoming Occurrences using a Cloud Pub/Sub subscription
  public static int pubSub(String subId, long timeoutSeconds, String projectId)
      throws InterruptedException {
    // String subId = "my-occurrence-subscription";
    // long timeoutSeconds = 20;
    // String projectId = "my-project-id";
    Subscriber subscriber = null;
    MessageReceiverExample receiver = new MessageReceiverExample();

    try {
      // Subscribe to the requested Pub/Sub channel
      ProjectSubscriptionName subName = ProjectSubscriptionName.of(projectId, subId);
      subscriber = Subscriber.newBuilder(subName, receiver).build();
      subscriber.startAsync().awaitRunning();
      // Sleep to listen for messages
      TimeUnit.SECONDS.sleep(timeoutSeconds);
    } finally {
      // Stop listening to the channel
      if (subscriber != null) {
        subscriber.stopAsync();
      }
    }
    // Print and return the number of Pub/Sub messages received
    System.out.println(receiver.messageCount);
    return receiver.messageCount;
  }

  // Custom class to handle incoming Pub/Sub messages
  // In this case, the class will simply log and count each message as it comes in
  static class MessageReceiverExample implements MessageReceiver {
    public int messageCount = 0;

    @Override
    public synchronized void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
      // Every time a Pub/Sub message comes in, print it and count it
      System.out.println("Message " + messageCount + ": " + message.getData().toStringUtf8());
      messageCount += 1;
      // Acknowledge the message
      consumer.ack();
    }
  }

  // Creates and returns a Pub/Sub subscription object listening to the Occurrence topic
  public static Subscription createOccurrenceSubscription(String subId, String projectId)
      throws IOException, StatusRuntimeException, InterruptedException {
    // This topic id will automatically receive messages when Occurrences are added or modified
    String topicId = "container-analysis-occurrences-v1";
    ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
    ProjectSubscriptionName subName = ProjectSubscriptionName.of(projectId, subId);

    // Close the admin client when done so its background resources are released
    try (SubscriptionAdminClient client = SubscriptionAdminClient.create()) {
      PushConfig config = PushConfig.getDefaultInstance();
      return client.createSubscription(subName, topicName, config, 0);
    }
  }
}

Go

To learn how to install and use the client library for Container Registry, see the Container Registry client libraries. For more information, see the Container Registry Go API reference documentation.


import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"

	pubsub "cloud.google.com/go/pubsub"
)

// occurrencePubsub handles incoming Occurrences using a Cloud Pub/Sub subscription.
func occurrencePubsub(w io.Writer, subscriptionID string, timeout time.Duration, projectID string) (int, error) {
	// subscriptionID := fmt.Sprintf("my-occurrences-subscription")
	// timeout := time.Duration(20) * time.Second
	ctx := context.Background()

	var mu sync.Mutex
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return -1, fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()
	// Subscribe to the requested Pub/Sub channel.
	sub := client.Subscription(subscriptionID)
	count := 0

	// Listen to messages until 'timeout' elapses.
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		mu.Lock()
		count = count + 1
		fmt.Fprintf(w, "Message %d: %q\n", count, string(msg.Data))
		msg.Ack()
		mu.Unlock()
	})
	if err != nil {
		return -1, fmt.Errorf("sub.Receive: %v", err)
	}
	// Print and return the number of Pub/Sub messages received.
	fmt.Fprintln(w, count)
	return count, nil
}

// createOccurrenceSubscription creates a new Pub/Sub subscription object listening to the Occurrence topic.
func createOccurrenceSubscription(subscriptionID, projectID string) error {
	// subscriptionID := fmt.Sprintf("my-occurrences-subscription")
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	// This topic id will automatically receive messages when Occurrences are added or modified
	topicID := "container-analysis-occurrences-v1"
	topic := client.Topic(topicID)
	config := pubsub.SubscriptionConfig{Topic: topic}
	_, err = client.CreateSubscription(ctx, subscriptionID, config)
	if err != nil {
		return fmt.Errorf("client.CreateSubscription: %v", err)
	}
	return nil
}

Node.js

To learn how to install and use the client library for Container Registry, see the Container Registry client libraries. For more information, see the Container Registry Node.js API reference documentation.

/**
 * TODO(developer): Uncomment these variables before running the sample
 */
// const projectId = 'your-project-id'; // Your GCP Project ID
// const subscriptionId = 'my-sub-id'; // A user-specified subscription to the 'container-analysis-occurrences-v1' topic
// const timeoutSeconds = 30; // The number of seconds to listen for the new Pub/Sub Messages

// Import the pubsub library and create a client, topic and subscription
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub({projectId});
const subscription = pubsub.subscription(subscriptionId);

// Handle incoming Occurrences using a Cloud Pub/Sub subscription
let count = 0;
const messageHandler = message => {
  count++;
  message.ack();
};

// Listen for new messages until timeout is hit
subscription.on('message', messageHandler);

setTimeout(() => {
  subscription.removeListener('message', messageHandler);
  console.log(`Polled ${count} occurrences`);
}, timeoutSeconds * 1000);

Ruby

To learn how to install and use the client library for Container Registry, see the Container Registry client libraries. For more information, see the Container Registry Ruby API reference documentation.

# subscription_id = "A user-specified identifier for the new subscription"
# timeout_seconds = "The number of seconds to listen for new Pub/Sub messages"
# project_id      = "Your Google Cloud project ID"

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id
topic = pubsub.topic "container-analysis-occurrences-v1"
subscription = topic.subscribe subscription_id

count = 0
subscriber = subscription.listen do |received_message|
  count += 1
  # Process incoming occurrence here
  puts "Message #{count}: #{received_message.data}"
  received_message.acknowledge!
end
subscriber.start
# Wait for incoming occurrences
sleep timeout_seconds
subscriber.stop.wait!
subscription.delete
# Print and return the total number of Pub/Sub messages received
puts "Total Messages Received: #{count}"
count

Python

To learn how to install and use the client library for Container Registry, see the Container Registry client libraries. For more information, see the Container Registry Python API reference documentation.

def pubsub(subscription_id, timeout_seconds, project_id):
    """Respond to incoming occurrences using a Cloud Pub/Sub subscription."""
    # subscription_id = 'my-occurrences-subscription'
    # timeout_seconds = 20
    # project_id = 'my-gcp-project'

    import time
    from google.cloud.pubsub import SubscriberClient

    client = SubscriberClient()
    subscription_name = client.subscription_path(project_id, subscription_id)
    receiver = MessageReceiver()
    client.subscribe(subscription_name, receiver.pubsub_callback)

    # listen for 'timeout' seconds
    for _ in range(timeout_seconds):
        time.sleep(1)
    # print and return the number of pubsub messages received
    print(receiver.msg_count)
    return receiver.msg_count

class MessageReceiver:
    """Custom class to handle incoming Pub/Sub messages."""
    def __init__(self):
        # initialize counter to 0 on initialization
        self.msg_count = 0

    def pubsub_callback(self, message):
        # every time a pubsub message comes in, print it and count it
        self.msg_count += 1
        print('Message {}: {}'.format(self.msg_count, message.data))
        message.ack()

def create_occurrence_subscription(subscription_id, project_id):
    """Creates a new Pub/Sub subscription object listening to the
    Container Analysis Occurrences topic."""
    # subscription_id = 'my-occurrences-subscription'
    # project_id = 'my-gcp-project'

    from google.api_core.exceptions import AlreadyExists
    from google.cloud.pubsub import SubscriberClient

    topic_id = 'container-analysis-occurrences-v1'
    client = SubscriberClient()
    topic_name = client.topic_path(project_id, topic_id)
    subscription_name = client.subscription_path(project_id, subscription_id)
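    try:
        # pass the subscription name and topic as keyword arguments
        client.create_subscription(name=subscription_name, topic=topic_name)
    except AlreadyExists:
        # if subscription already exists, do nothing
        pass
    # the subscription now exists, whether it was just created or already present
    return True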

Subscriber applications only receive messages that are published to the topic after the subscription is created.

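What's next

  • To learn how to use Container Analysis to store and manage your customers' metadata, see Providing metadata for images.

  • You can integrate Binary Authorization with vulnerability scanning to prevent images with known security issues from running in your deployment environment. To learn how, see Vulnerability scanning integration.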