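Pub/Sub notifications

This document describes how to set up notifications for updates to notes and occurrences.

Container Registry provides notifications through Pub/Sub when it scans for vulnerabilities and other metadata. When a note or occurrence is created or updated, the service publishes a message to the corresponding topic for each API version. Use the topic that matches the API version you are using.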

Before you begin

  1. Enable the Container Scanning API.

  2. Read the Container Analysis overview.

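Create Pub/Sub topics

After you activate the Container Analysis API, the following Pub/Sub topics are created automatically in your project:

  • container-analysis-notes-v1
  • container-analysis-occurrences-v1

If a topic is missing or was accidentally deleted, you can add it yourself.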
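To decide which of the two topics need to be recreated, it can help to compare the expected topic names against what already exists in the project. The following is a minimal sketch under stated assumptions: the helper names `topic_path` and `missing_topics` are hypothetical, and the list of existing topic paths is assumed to come from elsewhere (for example, from the Cloud Console or a listing command).

```python
# Topic IDs named in this document.
TOPIC_IDS = (
    "container-analysis-notes-v1",
    "container-analysis-occurrences-v1",
)


def topic_path(project_id, topic_id):
    """Build the fully qualified topic name used in the steps of this document."""
    return f"projects/{project_id}/topics/{topic_id}"


def missing_topics(project_id, existing_paths):
    """Return the expected topic paths that are not in existing_paths."""
    existing = set(existing_paths)
    expected = [topic_path(project_id, topic_id) for topic_id in TOPIC_IDS]
    return [path for path in expected if path not in existing]
```

Each path returned by `missing_topics` can then be created with either of the methods described in this section.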

Console

  1. Go to the Pub/Sub topics page in the Cloud Console.

    Open the Pub/Sub topics page

  2. Click Create topic.

  3. Enter the following URI as the topic for notes:

    projects/[PROJECT-ID]/topics/container-analysis-notes-v1

    where [PROJECT-ID] is your Google Cloud project ID.

  4. Click Create.

  5. Enter the following URI to create another topic for occurrences:

    projects/[PROJECT-ID]/topics/container-analysis-occurrences-v1

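gcloud command

Run the following commands in a shell or terminal window:

    gcloud pubsub topics create projects/[PROJECT-ID]/topics/container-analysis-notes-v1
    gcloud pubsub topics create projects/[PROJECT-ID]/topics/container-analysis-occurrences-v1

For more information about the gcloud pubsub topics command, see the topics documentation.

Whenever a note or occurrence is created or updated, the service publishes a message to the corresponding topic.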

Pub/Sub payloads are in JSON format and use the following schema:

Notes:

    {
        "name": "projects/[PROJECT_ID]/notes/[NOTE_ID]",
        "kind": "[NOTE_KIND]",
        "notificationTime": "[NOTIFICATION_TIME]"
    }

Occurrences:

    {
        "name": "projects/[PROJECT_ID]/occurrences/[OCCURRENCE_ID]",
        "kind": "[NOTE_KIND]",
        "notificationTime": "[NOTIFICATION_TIME]"
    }

where:

  • [NOTE_KIND] is one of the values in NoteKind.
  • [NOTIFICATION_TIME] is a timestamp in RFC 3339 UTC "Zulu" format, accurate to nanoseconds.
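The payload schema can be decoded with the standard library alone. The following is a minimal sketch, assuming the message data arrives as UTF-8 encoded JSON bytes that match the schema above; the function names `parse_rfc3339_utc` and `parse_notification` are hypothetical. Because Python's `datetime` keeps only microseconds, the sketch also returns the full nanosecond fraction as a separate integer.

```python
import json
import re
from datetime import datetime, timezone

# Matches RFC 3339 UTC "Zulu" timestamps with up to nine fractional digits.
_RFC3339_UTC = re.compile(
    r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(?:\.(\d{1,9}))?Z"
)


def parse_rfc3339_utc(timestamp):
    """Return (datetime truncated to microseconds, nanosecond fraction)."""
    match = _RFC3339_UTC.fullmatch(timestamp)
    if match is None:
        raise ValueError(f"not an RFC 3339 UTC timestamp: {timestamp!r}")
    seconds_part, fraction = match.groups()
    # Right-pad the fraction to nine digits so it can be read as nanoseconds.
    nanos = int((fraction or "").ljust(9, "0"))
    moment = datetime.strptime(seconds_part, "%Y-%m-%dT%H:%M:%S").replace(
        tzinfo=timezone.utc, microsecond=nanos // 1000
    )
    return moment, nanos


def parse_notification(data):
    """Decode one notification payload (bytes) into a flat dictionary."""
    payload = json.loads(data.decode("utf-8"))
    # "name" looks like projects/[PROJECT_ID]/notes|occurrences/[ID].
    prefix, project_id, resource_type, resource_id = payload["name"].split("/", 3)
    if prefix != "projects" or resource_type not in ("notes", "occurrences"):
        raise ValueError(f"unexpected resource name: {payload['name']!r}")
    moment, nanos = parse_rfc3339_utc(payload["notificationTime"])
    return {
        "project_id": project_id,
        "resource_type": resource_type,
        "resource_id": resource_id,
        "kind": payload["kind"],
        "time": moment,
        "nanos": nanos,
    }
```

A subscriber callback could pass the raw message data to `parse_notification` and then branch on `resource_type` or `kind`.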

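Create Pub/Sub subscriptions

To listen for these events, create a Pub/Sub subscription associated with the topic:

Console

  1. Go to the Pub/Sub subscriptions page in the Cloud Console.

    Open the Pub/Sub subscriptions page

  2. Click Create subscription.

  3. Enter a name for the subscription, for example notes.

  4. Enter the following URI as the topic for notes:

    projects/[PROJECT-ID]/topics/container-analysis-notes-v1

    where [PROJECT-ID] is your Google Cloud project ID.

  5. Click Create.

  6. Enter the following URI to create another subscription for occurrences:

    projects/[PROJECT-ID]/topics/container-analysis-occurrences-v1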

gcloud command

To receive Pub/Sub events, you must first create a subscription associated with the container-analysis-occurrences-v1 topic:

    gcloud pubsub subscriptions create \
        --topic container-analysis-occurrences-v1 occurrences
    

After that, you can use the new subscription to pull messages about your occurrences:

    gcloud pubsub subscriptions pull \
        --auto-ack occurrences
    

Java

To learn how to install and use the Container Registry client library, see Container Registry client libraries. For more information, see the Container Registry Java API reference documentation.

    import com.google.cloud.pubsub.v1.AckReplyConsumer;
    import com.google.cloud.pubsub.v1.MessageReceiver;
    import com.google.cloud.pubsub.v1.Subscriber;
    import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
    import com.google.pubsub.v1.ProjectSubscriptionName;
    import com.google.pubsub.v1.ProjectTopicName;
    import com.google.pubsub.v1.PubsubMessage;
    import com.google.pubsub.v1.PushConfig;
    import com.google.pubsub.v1.Subscription;
    import io.grpc.StatusRuntimeException;
    import java.io.IOException;
    import java.lang.InterruptedException;
    import java.util.concurrent.TimeUnit;

    public class Subscriptions {
      // Handle incoming Occurrences using a Cloud Pub/Sub subscription
      public static int pubSub(String subId, long timeoutSeconds, String projectId)
          throws InterruptedException {
        // String subId = "my-occurrence-subscription";
        // long timeoutSeconds = 20;
        // String projectId = "my-project-id";
        Subscriber subscriber = null;
        MessageReceiverExample receiver = new MessageReceiverExample();

        try {
          // Subscribe to the requested Pub/Sub channel
          ProjectSubscriptionName subName = ProjectSubscriptionName.of(projectId, subId);
          subscriber = Subscriber.newBuilder(subName, receiver).build();
          subscriber.startAsync().awaitRunning();
          // Sleep to listen for messages
          TimeUnit.SECONDS.sleep(timeoutSeconds);
        } finally {
          // Stop listening to the channel
          if (subscriber != null) {
            subscriber.stopAsync();
          }
        }
        // Print and return the number of Pub/Sub messages received
        System.out.println(receiver.messageCount);
        return receiver.messageCount;
      }

      // Custom class to handle incoming Pub/Sub messages
      // In this case, the class will simply log and count each message as it comes in
      static class MessageReceiverExample implements MessageReceiver {
        public int messageCount = 0;

        @Override
        public synchronized void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
          // Every time a Pub/Sub message comes in, print it and count it
          System.out.println("Message " + messageCount + ": " + message.getData().toStringUtf8());
          messageCount += 1;
          // Acknowledge the message
          consumer.ack();
        }
      }

      // Creates and returns a Pub/Sub subscription object listening to the Occurrence topic
      public static Subscription createOccurrenceSubscription(String subId, String projectId)
          throws IOException, StatusRuntimeException, InterruptedException {
        // This topic id will automatically receive messages when Occurrences are added or modified
        String topicId = "container-analysis-occurrences-v1";
        ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
        ProjectSubscriptionName subName = ProjectSubscriptionName.of(projectId, subId);

        // Use try-with-resources so the admin client is closed after the call
        try (SubscriptionAdminClient client = SubscriptionAdminClient.create()) {
          PushConfig config = PushConfig.getDefaultInstance();
          return client.createSubscription(subName, topicName, config, 0);
        }
      }
    }

Go

To learn how to install and use the Container Registry client library, see Container Registry client libraries. For more information, see the Container Registry Go API reference documentation.


    import (
    	"context"
    	"fmt"
    	"io"
    	"sync"
    	"time"

    	pubsub "cloud.google.com/go/pubsub"
    )

    // occurrencePubsub handles incoming Occurrences using a Cloud Pub/Sub subscription.
    func occurrencePubsub(w io.Writer, subscriptionID string, timeout time.Duration, projectID string) (int, error) {
    	// subscriptionID := fmt.Sprintf("my-occurrences-subscription")
    	// timeout := time.Duration(20) * time.Second
    	ctx := context.Background()

    	var mu sync.Mutex
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return -1, fmt.Errorf("pubsub.NewClient: %v", err)
    	}
    	// Release the client's resources when the function returns.
    	defer client.Close()
    	// Subscribe to the requested Pub/Sub channel.
    	sub := client.Subscription(subscriptionID)
    	count := 0

    	// Listen to messages for 'timeout' seconds.
    	ctx, cancel := context.WithTimeout(ctx, timeout)
    	defer cancel()
    	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    		mu.Lock()
    		count = count + 1
    		fmt.Fprintf(w, "Message %d: %q\n", count, string(msg.Data))
    		msg.Ack()
    		mu.Unlock()
    	})
    	if err != nil {
    		return -1, fmt.Errorf("sub.Receive: %v", err)
    	}
    	// Print and return the number of Pub/Sub messages received.
    	fmt.Fprintln(w, count)
    	return count, nil
    }

    // createOccurrenceSubscription creates a new Pub/Sub subscription object listening to the Occurrence topic.
    func createOccurrenceSubscription(subscriptionID, projectID string) error {
    	// subscriptionID := fmt.Sprintf("my-occurrences-subscription")
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %v", err)
    	}
    	defer client.Close()

    	// This topic id will automatically receive messages when Occurrences are added or modified
    	topicID := "container-analysis-occurrences-v1"
    	topic := client.Topic(topicID)
    	config := pubsub.SubscriptionConfig{Topic: topic}
    	_, err = client.CreateSubscription(ctx, subscriptionID, config)
    	if err != nil {
    		return fmt.Errorf("client.CreateSubscription: %v", err)
    	}
    	return nil
    }
    

Node.js

To learn how to install and use the Container Registry client library, see Container Registry client libraries. For more information, see the Container Registry Node.js API reference documentation.

    /**
     * TODO(developer): Uncomment these variables before running the sample
     */
    // const projectId = 'your-project-id'; // Your GCP Project ID
    // const subscriptionId = 'my-sub-id'; // A user-specified subscription to the 'container-analysis-occurrences-v1' topic
    // const timeoutSeconds = 30; // The number of seconds to listen for the new Pub/Sub Messages

    // Import the pubsub library and create a client, topic and subscription
    const {PubSub} = require('@google-cloud/pubsub');
    const pubsub = new PubSub({projectId});
    const subscription = pubsub.subscription(subscriptionId);

    // Handle incoming Occurrences using a Cloud Pub/Sub subscription
    let count = 0;
    const messageHandler = message => {
      count++;
      message.ack();
    };

    // Listen for new messages until timeout is hit
    subscription.on(`message`, messageHandler);

    setTimeout(() => {
      subscription.removeListener(`message`, messageHandler);
      console.log(`Polled ${count} occurrences`);
    }, timeoutSeconds * 1000);

Ruby

To learn how to install and use the Container Registry client library, see Container Registry client libraries. For more information, see the Container Registry Ruby API reference documentation.

    # subscription_id = "A user-specified identifier for the new subscription"
    # timeout_seconds = "The number of seconds to listen for new Pub/Sub
    #                    messages"
    # project_id      = "Your Google Cloud project ID"

    require "google/cloud/pubsub"

    pubsub = Google::Cloud::Pubsub.new project: project_id
    topic = pubsub.topic "container-analysis-occurrences-v1"
    subscription = topic.subscribe subscription_id

    count = 0
    subscriber = subscription.listen do |received_message|
      count += 1
      # Process incoming occurrence here
      puts "Message #{count}: #{received_message.data}"
      received_message.acknowledge!
    end
    subscriber.start
    # Wait for incoming occurrences
    sleep timeout_seconds
    subscriber.stop.wait!
    subscription.delete
    # Print and return the total number of Pub/Sub messages received
    puts "Total Messages Received: #{count}"
    count

Python

To learn how to install and use the Container Registry client library, see Container Registry client libraries. For more information, see the Container Registry Python API reference documentation.

    def pubsub(subscription_id, timeout_seconds, project_id):
        """Respond to incoming occurrences using a Cloud Pub/Sub subscription."""
        # subscription_id = 'my-occurrences-subscription'
        # timeout_seconds = 20
        # project_id = 'my-gcp-project'

        import time
        from google.cloud.pubsub import SubscriberClient

        client = SubscriberClient()
        subscription_name = client.subscription_path(project_id, subscription_id)
        receiver = MessageReceiver()
        streaming_pull_future = client.subscribe(
            subscription_name, receiver.pubsub_callback)

        # listen for 'timeout_seconds' seconds, then stop the background stream
        time.sleep(timeout_seconds)
        streaming_pull_future.cancel()
        # print and return the number of pubsub messages received
        print(receiver.msg_count)
        return receiver.msg_count

    class MessageReceiver:
        """Custom class to handle incoming Pub/Sub messages."""
        def __init__(self):
            # initialize counter to 0 on initialization
            self.msg_count = 0

        def pubsub_callback(self, message):
            # every time a pubsub message comes in, print it and count it
            self.msg_count += 1
            print('Message {}: {}'.format(self.msg_count, message.data))
            message.ack()

    def create_occurrence_subscription(subscription_id, project_id):
        """Creates a new Pub/Sub subscription object listening to the
        Container Analysis Occurrences topic."""
        # subscription_id = 'my-occurrences-subscription'
        # project_id = 'my-gcp-project'

        from google.api_core.exceptions import AlreadyExists
        from google.cloud.pubsub import SubscriberClient

        topic_id = 'container-analysis-occurrences-v1'
        client = SubscriberClient()
        topic_name = client.topic_path(project_id, topic_id)
        subscription_name = client.subscription_path(project_id, subscription_id)
        success = True
        try:
            client.create_subscription(subscription_name, topic_name)
        except AlreadyExists:
            # the subscription already exists, so there is nothing to create;
            # any other error propagates to the caller
            pass
        return success
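The callback above prints every message. A handler might instead react only to certain kinds of occurrence. The following is a minimal sketch, assuming the payload schema described earlier and a message object that exposes `data` (bytes) and `ack()`, as the callback above does; the name `make_kind_filter_callback` is hypothetical, and `VULNERABILITY` is used as an example NoteKind value.

```python
import json


def make_kind_filter_callback(wanted_kinds, on_match):
    """Build a subscriber callback that forwards only selected kinds.

    wanted_kinds: iterable of NoteKind strings, for example {"VULNERABILITY"}.
    on_match: function called with the decoded payload dictionary.
    """
    wanted = frozenset(wanted_kinds)

    def callback(message):
        try:
            payload = json.loads(message.data.decode("utf-8"))
        except ValueError:
            # Covers both invalid UTF-8 and invalid JSON.
            payload = None
        if isinstance(payload, dict) and payload.get("kind") in wanted:
            on_match(payload)
        # Acknowledge every message, including skipped or malformed ones,
        # so that Pub/Sub does not redeliver them.
        message.ack()

    return callback
```

The returned function can be passed to the subscriber in place of `receiver.pubsub_callback`. Acknowledging messages that are skipped is a design choice in this sketch; an application that wants redelivery of malformed messages would handle that case differently.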

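Subscriber applications receive only messages that are published to the topic after the subscription is created.

Next steps

  • For instructions on how to use Container Analysis to store and manage your customers' metadata, see Providing metadata for images.

  • You can integrate Binary Authorization with vulnerability scanning to prevent images with known security issues from running in your deployment environment. For instructions, see Vulnerability scanning integration.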