配置 Pub/Sub 通知

本文档介绍了如何为备注发生实例更新设置通知。

Artifact Analysis 会通过 Pub/Sub 提供有关自动扫描发现的漏洞和其他元数据的通知。创建或更新备注或发生实例后,该服务会向每个 API 版本的相应主题发布一条消息。请使用您所使用的 API 版本对应的主题。

准备工作

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Enable the Container Analysis API.

    Enable the API

  4. Install the Google Cloud CLI.
  5. To initialize the gcloud CLI, run the following command:

    gcloud init
  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  7. Enable the Container Analysis API.

    Enable the API

  8. Install the Google Cloud CLI.
  9. To initialize the gcloud CLI, run the following command:

    gcloud init
  10. 了解如何为项目中的元数据设置访问权限控制。如果您只使用由 Artifact Analysis 容器扫描创建的漏洞出现情况中的元数据,请跳过此步骤。

创建 Pub/Sub 主题

激活 Artifact Analysis API 后,Artifact Analysis 会自动创建具有以下主题 ID 的 Pub/Sub 主题:

  • container-analysis-notes-v1
  • container-analysis-occurrences-v1

如果主题缺失或被意外删除,您可以自行添加。例如,如果您的 Google Cloud组织存在组织政策限制条件,要求使用客户管理的加密密钥 (CMEK) 进行加密,则可能无法看到这些主题。如果 Pub/Sub API 在此约束条件的拒绝列表中,服务将无法使用的 Google 自有且由 Google 管理 的加密密钥自动创建主题。

如需使用 的 Google 拥有和 Google 管理 的加密密钥创建主题,请执行以下操作:

控制台

  1. 前往 Google Cloud 控制台中的 Pub/Sub 主题页面。

    打开 Pub/Sub 主题页面

  2. 点击创建主题

  3. 输入主题 ID:

    container-analysis-notes-v1
    

    以便名称与 URI 匹配:

    projects/PROJECT_ID/topics/container-analysis-notes-v1
    

    其中 PROJECT_ID 是您的 Google Cloud 项目 ID

  4. 点击创建

  5. 输入主题 ID:

    container-analysis-occurrences-v1
    

    以便名称与 URI 匹配:

    projects/PROJECT_ID/topics/container-analysis-occurrences-v1
    

gcloud

在 shell 或终端窗口中运行以下命令:

gcloud pubsub topics create projects/PROJECT_ID/topics/container-analysis-notes-v1
gcloud pubsub topics create projects/PROJECT_ID/topics/container-analysis-occurrences-v1

如需详细了解 gcloud pubsub topics 命令,请参阅 topics 文档

如需使用 CMEK 加密创建主题,请参阅 Pub/Sub 有关加密主题的说明

无论何时创建或更新备注或事件,该服务都会向相应主题发布一条消息,但您还必须创建 Pub/Sub 订阅,才能监听事件并接收来自 Pub/Sub 服务的消息。

创建 Pub/Sub 订阅

如需监听事件,请创建与主题相关联的 Pub/Sub 订阅:

控制台

  1. 前往Google Cloud 控制台中的 Pub/Sub 订阅页面。

    打开 Pub/Sub 订阅页面

  2. 点击创建订阅

  3. 为该订阅输入一个名称。例如“notes”

  4. 输入下面的 URI 作为备注的主题:

    projects/PROJECT_ID/topics/container-analysis-notes-v1
    

    其中 PROJECT_ID 是您的 Google Cloud 项目 ID

  5. 点击创建

  6. 输入下面的 URI 为发生实例创建另一个订阅:

    projects/PROJECT_ID/topics/container-analysis-occurrences-v1
    

gcloud

如需接收 Pub/Sub 事件,您必须首先创建与 container-analysis-occurrences-v1 主题相关联的订阅:

gcloud pubsub subscriptions create \
    --topic container-analysis-occurrences-v1 occurrences

今后,您可以使用新订阅来拉取与您的发生实例相关的消息:

gcloud pubsub subscriptions pull \
    --auto-ack occurrences

Java

如需了解如何安装和使用工件分析的客户端库,请参阅 Artifact Analysis 客户端库。 如需了解详情,请参阅 Artifact Analysis Java API 参考文档

如需向 Artifact Analysis 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.lang.InterruptedException;
import java.util.concurrent.TimeUnit;

public class Subscriptions {
  // Handle incoming Occurrences using a Cloud Pub/Sub subscription
  public static int pubSub(String subId, long timeoutSeconds, String projectId)
      throws InterruptedException {
    // String subId = "my-occurrence-subscription";
    // long timeoutSeconds = 20;
    // String projectId = "my-project-id";
    Subscriber subscriber = null;
    MessageReceiverExample receiver = new MessageReceiverExample();

    try {
      // Subscribe to the requested Pub/Sub channel
      ProjectSubscriptionName subName = ProjectSubscriptionName.of(projectId, subId);
      subscriber = Subscriber.newBuilder(subName, receiver).build();
      subscriber.startAsync().awaitRunning();
      // Sleep to listen for messages
      TimeUnit.SECONDS.sleep(timeoutSeconds);
    } finally {
      // Stop listening to the channel
      if (subscriber != null) {
        subscriber.stopAsync();
      }
    }
    // Print and return the number of Pub/Sub messages received
    System.out.println(receiver.messageCount);
    return receiver.messageCount;
  }

  // Custom class to handle incoming Pub/Sub messages
  // In this case, the class will simply log and count each message as it comes in
  static class MessageReceiverExample implements MessageReceiver {
    public int messageCount = 0;

    @Override
    public synchronized void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
      // Every time a Pub/Sub message comes in, print it and count it
      System.out.println("Message " + messageCount + ": " + message.getData().toStringUtf8());
      messageCount += 1;
      // Acknowledge the message
      consumer.ack();
    }
  }

  // Creates and returns a Pub/Sub subscription object listening to the Occurrence topic
  public static Subscription createOccurrenceSubscription(String subId, String projectId) 
      throws IOException, StatusRuntimeException, InterruptedException {
    // This topic id will automatically receive messages when Occurrences are added or modified
    String topicId = "container-analysis-occurrences-v1";
    TopicName topicName = TopicName.of(projectId, topicId);
    SubscriptionName subName = SubscriptionName.of(projectId, subId);

    SubscriptionAdminClient client = SubscriptionAdminClient.create();
    PushConfig config = PushConfig.getDefaultInstance();
    Subscription sub = client.createSubscription(subName, topicName, config, 0);
    return sub;
  }
}

Go

如需了解如何安装和使用工件分析的客户端库,请参阅 Artifact Analysis 客户端库。 如需了解详情,请参阅 Artifact Analysis Go API 参考文档

如需向 Artifact Analysis 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证


import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"

	pubsub "cloud.google.com/go/pubsub"
)

// occurrencePubsub handles incoming Occurrences using a Cloud Pub/Sub subscription.
func occurrencePubsub(w io.Writer, subscriptionID string, timeout time.Duration, projectID string) (int, error) {
	// subscriptionID := fmt.Sprintf("my-occurrences-subscription")
	// timeout := time.Duration(20) * time.Second
	ctx := context.Background()

	var mu sync.Mutex
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return -1, fmt.Errorf("pubsub.NewClient: %w", err)
	}
	// Subscribe to the requested Pub/Sub channel.
	sub := client.Subscription(subscriptionID)
	count := 0

	// Listen to messages for 'timeout' seconds.
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		mu.Lock()
		count = count + 1
		fmt.Fprintf(w, "Message %d: %q\n", count, string(msg.Data))
		msg.Ack()
		mu.Unlock()
	})
	if err != nil {
		return -1, fmt.Errorf("sub.Receive: %w", err)
	}
	// Print and return the number of Pub/Sub messages received.
	fmt.Fprintln(w, count)
	return count, nil
}

// createOccurrenceSubscription creates a new Pub/Sub subscription object listening to the Occurrence topic.
func createOccurrenceSubscription(subscriptionID, projectID string) error {
	// subscriptionID := fmt.Sprintf("my-occurrences-subscription")
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// This topic id will automatically receive messages when Occurrences are added or modified
	topicID := "container-analysis-occurrences-v1"
	topic := client.Topic(topicID)
	config := pubsub.SubscriptionConfig{Topic: topic}
	_, err = client.CreateSubscription(ctx, subscriptionID, config)
	return fmt.Errorf("client.CreateSubscription: %w", err)
}

Node.js

如需了解如何安装和使用工件分析的客户端库,请参阅 Artifact Analysis 客户端库。 如需了解详情,请参阅 Artifact Analysis Node.js API 参考文档

如需向 Artifact Analysis 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

/**
 * TODO(developer): Uncomment these variables before running the sample
 */
// const projectId = 'your-project-id', // Your GCP Project ID
// const subscriptionId = 'my-sub-id', // A user-specified subscription to the 'container-analysis-occurrences-v1' topic
// const timeoutSeconds = 30 // The number of seconds to listen for the new Pub/Sub Messages

// Import the pubsub library and create a client, topic and subscription
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub({projectId});
const subscription = pubsub.subscription(subscriptionId);

// Handle incoming Occurrences using a Cloud Pub/Sub subscription
let count = 0;
const messageHandler = message => {
  count++;
  message.ack();
};

// Listen for new messages until timeout is hit
subscription.on('message', messageHandler);

setTimeout(() => {
  subscription.removeListener('message', messageHandler);
  console.log(`Polled ${count} occurrences`);
}, timeoutSeconds * 1000);

Ruby

如需了解如何安装和使用工件分析的客户端库,请参阅 Artifact Analysis 客户端库。 如需了解详情,请参阅 Artifact Analysis Ruby API 参考文档

如需向 Artifact Analysis 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

# subscription_id = "A user-specified identifier for the new subscription"
# timeout_seconds = "The number of seconds to listen for new Pub/Sub messages"
# project_id      = "Your Google Cloud project ID"

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id
topic = pubsub.topic "container-analysis-occurrences-v1"
subscription = topic.subscribe subscription_id

count = 0
subscriber = subscription.listen do |received_message|
  count += 1
  # Process incoming occurrence here
  puts "Message #{count}: #{received_message.data}"
  received_message.acknowledge!
end
subscriber.start
# Wait for incomming occurrences
sleep timeout_seconds
subscriber.stop.wait!
subscription.delete
# Print and return the total number of Pub/Sub messages received
puts "Total Messages Received: #{count}"
count

Python

如需了解如何安装和使用工件分析的客户端库,请参阅 Artifact Analysis 客户端库。 如需了解详情,请参阅 Artifact Analysis Python API 参考文档

如需向 Artifact Analysis 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import time

from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub import SubscriberClient
from google.cloud.pubsub_v1.subscriber.message import Message


def pubsub(subscription_id: str, timeout_seconds: int, project_id: str) -> int:
    """Respond to incoming occurrences using a Cloud Pub/Sub subscription."""
    # subscription_id := 'my-occurrences-subscription'
    # timeout_seconds = 20
    # project_id = 'my-gcp-project'

    client = SubscriberClient()
    subscription_name = client.subscription_path(project_id, subscription_id)
    receiver = MessageReceiver()
    client.subscribe(subscription_name, receiver.pubsub_callback)

    # listen for 'timeout' seconds
    for _ in range(timeout_seconds):
        time.sleep(1)
    # print and return the number of pubsub messages received
    print(receiver.msg_count)
    return receiver.msg_count


class MessageReceiver:
    """Custom class to handle incoming Pub/Sub messages."""

    def __init__(self) -> None:
        # initialize counter to 0 on initialization
        self.msg_count = 0

    def pubsub_callback(self, message: Message) -> None:
        # every time a pubsub message comes in, print it and count it
        self.msg_count += 1
        print(f"Message {self.msg_count}: {message.data}")
        message.ack()


def create_occurrence_subscription(subscription_id: str, project_id: str) -> bool:
    """Creates a new Pub/Sub subscription object listening to the
    Container Analysis Occurrences topic."""
    # subscription_id := 'my-occurrences-subscription'
    # project_id = 'my-gcp-project'

    topic_id = "container-analysis-occurrences-v1"
    client = SubscriberClient()
    topic_name = f"projects/{project_id}/topics/{topic_id}"
    subscription_name = client.subscription_path(project_id, subscription_id)
    success = True
    try:
        client.create_subscription({"name": subscription_name, "topic": topic_name})
    except AlreadyExists:
        # if subscription already exists, do nothing
        pass
    else:
        success = False
    return success

订阅者应用仅接收在创建订阅后发布到主题的消息。

Pub/Sub 载荷采用 JSON 格式,其架构如下所示:

备注:

{
    "name": "projects/PROJECT_ID/notes/NOTE_ID",
    "kind": "NOTE_KIND",
    "notificationTime": "NOTIFICATION_TIME",
}

发生实例:

{
    "name": "projects/PROJECT_ID/occurrences/OCCURRENCE_ID",
    "kind": "NOTE_KIND",
    "notificationTime": "NOTIFICATION_TIME",
}

其中:

  • NOTE_KINDNoteKind 中的一个值
  • NOTIFICATION_TIME 是采用 RFC 3339 世界协调时间 (UTC)(即“祖鲁时”)格式的时间戳,精确到纳秒。

查看详情

如需详细了解某条备注或某个出现情况,您可以访问存储在 Artifact Analysis 中的元数据。例如,您可以请求获取特定出现情况的所有详细信息。请参阅调查漏洞中的说明。

后续步骤