Pub/Sub 通知

このドキュメントでは、メモとオカレンスの更新通知を設定する方法について説明します。

Container Registry は、脆弱性や他のメタデータをスキャンするたびに、Pub/Sub 経由で通知を送信します。メモやオカレンスが作成または更新されると、それぞれの API バージョンの対応トピックにメッセージがパブリッシュされます。現在の API バージョンに対応するトピックを使用します。

始める前に

  1. Container Analysis API を有効にする

  2. Container Analysis の概要を読む。

Pub/Sub トピックの作成

Container Analysis API を有効にすると、次の Pub/Sub トピックがプロジェクトに作成されます。

  • container-analysis-notes-v1 + container-analysis-occurrences-v1

誤って削除したトピックや見つからないトピックは自分で追加できます。

Console

  1. Cloud Console の Pub/Sub トピックページに移動します。

    Pub/Sub トピックページを開く

  2. [トピックを作成] をクリックします。

  3. メモのトピックを URI 付きで入力します。

    projects/[PROJECT-ID]/topics/container-analysis-notes-v1
    

    ここで [PROJECT-ID] は、Google Cloud プロジェクト ID です。

  4. [作成] をクリックします。

  5. URI を使用して、オカレンスの別のトピックを作成します。

     projects/[PROJECT-ID]/topics/container-analysis-occurrences-v1
    

gcloud コマンド

シェルまたはターミナル ウィンドウで次のコマンドを実行します。

gcloud pubsub topics create projects/[PROJECT-ID]/topics/container-analysis-notes-v1
gcloud pubsub topics create projects/[PROJECT-ID]/topics/container-analysis-occurrences-v1

gcloud pubsub topics コマンドについて詳しくは、topics ドキュメントをご覧ください。

メモまたはオカレンスが作成または更新されるたびに、それぞれのトピックにメッセージがパブリッシュされます。

Pub/Sub ペイロードは JSON 形式で、スキーマは次のとおりです。

注記:

{
    "name": "projects/[PROJECT_ID]/notes/[NOTE_ID]",
    "kind": "[NOTE_KIND]",
    "notificationTime": "[NOTIFICATION_TIME]",
}

オカレンス:

{
    "name": "projects/[PROJECT_ID]/occurrences/[OCCURRENCE_ID]",
    "kind": "[NOTE_KIND]",
    "notificationTime": "[NOTIFICATION_TIME]",
}

ここで

  • [NOTE_KIND] は、NoteKind の値の 1 つです。
  • [NOTIFICATION_TIME] は、RFC 3339 UTC Zulu 形式のタイムスタンプ(精度はナノ秒)です。

Pub/Sub サブスクリプションの作成

イベントをリッスンするには、そのトピックに関連付けられた Pub/Sub サブスクリプションを作成します。

Console

  1. Cloud Console で Pub/Sub サブスクリプション ページに移動します。

    Pub/Sub サブスクリプション ページを開く

  2. [サブスクリプションを作成] をクリックします。

  3. サブスクリプションの名前を入力します(例: notes)。

  4. メモに対応するトピックの URI を入力します。

    projects/[PROJECT-ID]/topics/container-analysis-notes-v1
    

    ここで [PROJECT-ID] は、Google Cloud プロジェクト ID です。

  5. [作成] をクリックします。

  6. URI を使用して、オカレンスの別のサブスクリプションを作成します。

     projects/[PROJECT-ID]/topics/container-analysis-occurrences-v1
    

gcloud コマンド

Pub/Sub イベントを受信するには、container-analysis-occurrences-v1 トピックに関連付けられたサブスクリプションを作成する必要があります。

gcloud pubsub subscriptions create \
    --topic container-analysis-occurrences-v1 occurrences

新しいサブスクリプションを使用して、オカレンスに関するメッセージを pull できるようになります。

gcloud pubsub subscriptions pull \
    --auto-ack occurrences

Java

Container Registry 用のクライアント ライブラリをインストールして使用する方法については、Container Registry のクライアント ライブラリをご覧ください。詳細については、Container Registry Java API のリファレンス ドキュメントをご覧ください。

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.lang.InterruptedException;
import java.util.concurrent.TimeUnit;

public class Subscriptions {
  // Handle incoming Occurrences using a Cloud Pub/Sub subscription
  public static int pubSub(String subId, long timeoutSeconds, String projectId)
      throws InterruptedException {
    // String subId = "my-occurrence-subscription";
    // long timeoutSeconds = 20;
    // String projectId = "my-project-id";
    Subscriber subscriber = null;
    MessageReceiverExample receiver = new MessageReceiverExample();

    try {
      // Subscribe to the requested Pub/Sub channel
      ProjectSubscriptionName subName = ProjectSubscriptionName.of(projectId, subId);
      subscriber = Subscriber.newBuilder(subName, receiver).build();
      subscriber.startAsync().awaitRunning();
      // Sleep to listen for messages
      TimeUnit.SECONDS.sleep(timeoutSeconds);
    } finally {
      // Stop listening to the channel
      if (subscriber != null) {
        subscriber.stopAsync();
      }
    }
    // Print and return the number of Pub/Sub messages received
    System.out.println(receiver.messageCount);
    return receiver.messageCount;
  }

  // Custom class to handle incoming Pub/Sub messages
  // In this case, the class will simply log and count each message as it comes in
  static class MessageReceiverExample implements MessageReceiver {
    public int messageCount = 0;

    @Override
    public synchronized void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
      // Every time a Pub/Sub message comes in, print it and count it
      System.out.println("Message " + messageCount + ": " + message.getData().toStringUtf8());
      messageCount += 1;
      // Acknowledge the message
      consumer.ack();
    }
  }

  // Creates and returns a Pub/Sub subscription object listening to the Occurrence topic
  public static Subscription createOccurrenceSubscription(String subId, String projectId)
      throws IOException, StatusRuntimeException, InterruptedException {
    // This topic id will automatically receive messages when Occurrences are added or modified
    String topicId = "container-analysis-occurrences-v1";
    ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
    ProjectSubscriptionName subName = ProjectSubscriptionName.of(projectId, subId);

    SubscriptionAdminClient client = SubscriptionAdminClient.create();
    PushConfig config = PushConfig.getDefaultInstance();
    Subscription sub = client.createSubscription(subName, topicName, config, 0);
    return sub;
  }
}

Go

Container Registry 用のクライアント ライブラリをインストールして使用する方法については、Container Registry のクライアント ライブラリをご覧ください。詳細については、Container Registry Go API のリファレンス ドキュメントをご覧ください。


import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"

	pubsub "cloud.google.com/go/pubsub"
)

// occurrencePubsub handles incoming Occurrences using a Cloud Pub/Sub subscription.
func occurrencePubsub(w io.Writer, subscriptionID string, timeout time.Duration, projectID string) (int, error) {
	// subscriptionID := fmt.Sprintf("my-occurrences-subscription")
	// timeout := time.Duration(20) * time.Second
	ctx := context.Background()

	var mu sync.Mutex
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return -1, fmt.Errorf("pubsub.NewClient: %v", err)
	}
	// Subscribe to the requested Pub/Sub channel.
	sub := client.Subscription(subscriptionID)
	count := 0

	// Listen to messages for 'timeout' seconds.
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		mu.Lock()
		count = count + 1
		fmt.Fprintf(w, "Message %d: %q\n", count, string(msg.Data))
		msg.Ack()
		mu.Unlock()
	})
	if err != nil {
		return -1, fmt.Errorf("sub.Receive: %v", err)
	}
	// Print and return the number of Pub/Sub messages received.
	fmt.Fprintln(w, count)
	return count, nil
}

// createOccurrenceSubscription creates a new Pub/Sub subscription object listening to the Occurrence topic.
func createOccurrenceSubscription(subscriptionID, projectID string) error {
	// subscriptionID := fmt.Sprintf("my-occurrences-subscription")
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	// This topic id will automatically receive messages when Occurrences are added or modified
	topicID := "container-analysis-occurrences-v1"
	topic := client.Topic(topicID)
	config := pubsub.SubscriptionConfig{Topic: topic}
	_, err = client.CreateSubscription(ctx, subscriptionID, config)
	return fmt.Errorf("client.CreateSubscription: %v", err)
}

Node.js

Container Registry 用のクライアント ライブラリをインストールして使用する方法については、Container Registry のクライアント ライブラリをご覧ください。詳細については、Container Registry Node.js API のリファレンス ドキュメントをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample
 */
// const projectId = 'your-project-id', // Your GCP Project ID
// const subscriptionId = 'my-sub-id', // A user-specified subscription to the 'container-analysis-occurrences-v1' topic
// const timeoutSeconds = 30 // The number of seconds to listen for the new Pub/Sub Messages

// Import the pubsub library and create a client, topic and subscription
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub({projectId});
const subscription = pubsub.subscription(subscriptionId);

// Handle incoming Occurrences using a Cloud Pub/Sub subscription
let count = 0;
const messageHandler = message => {
  count++;
  message.ack();
};

// Listen for new messages until timeout is hit
subscription.on('message', messageHandler);

setTimeout(() => {
  subscription.removeListener('message', messageHandler);
  console.log(`Polled ${count} occurrences`);
}, timeoutSeconds * 1000);

Ruby

Container Registry 用のクライアント ライブラリをインストールして使用する方法については、Container Registry のクライアント ライブラリをご覧ください。詳細については、Container Registry Ruby API のリファレンス ドキュメントをご覧ください。

# subscription_id = "A user-specified identifier for the new subscription"
# timeout_seconds = "The number of seconds to listen for new Pub/Sub messages"
# project_id      = "Your Google Cloud project ID"

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id
topic = pubsub.topic "container-analysis-occurrences-v1"
subscription = topic.subscribe subscription_id

count = 0
subscriber = subscription.listen do |received_message|
  count += 1
  # Process incoming occurrence here
  puts "Message #{count}: #{received_message.data}"
  received_message.acknowledge!
end
subscriber.start
# Wait for incomming occurrences
sleep timeout_seconds
subscriber.stop.wait!
subscription.delete
# Print and return the total number of Pub/Sub messages received
puts "Total Messages Received: #{count}"
count

Python

Container Registry 用のクライアント ライブラリをインストールして使用する方法については、Container Registry のクライアント ライブラリをご覧ください。詳細については、Container Registry Python API のリファレンス ドキュメントをご覧ください。

def pubsub(subscription_id, timeout_seconds, project_id):
    """Respond to incoming occurrences using a Cloud Pub/Sub subscription."""
    # subscription_id := 'my-occurrences-subscription'
    # timeout_seconds = 20
    # project_id = 'my-gcp-project'

    import time
    from google.cloud.pubsub import SubscriberClient

    client = SubscriberClient()
    subscription_name = client.subscription_path(project_id, subscription_id)
    receiver = MessageReceiver()
    client.subscribe(subscription_name, receiver.pubsub_callback)

    # listen for 'timeout' seconds
    for _ in range(timeout_seconds):
        time.sleep(1)
    # print and return the number of pubsub messages received
    print(receiver.msg_count)
    return receiver.msg_count

class MessageReceiver:
    """Custom class to handle incoming Pub/Sub messages."""
    def __init__(self):
        # initialize counter to 0 on initialization
        self.msg_count = 0

    def pubsub_callback(self, message):
        # every time a pubsub message comes in, print it and count it
        self.msg_count += 1
        print('Message {}: {}'.format(self.msg_count, message.data))
        message.ack()

def create_occurrence_subscription(subscription_id, project_id):
    """Creates a new Pub/Sub subscription object listening to the
    Container Analysis Occurrences topic."""
    # subscription_id := 'my-occurrences-subscription'
    # project_id = 'my-gcp-project'

    from google.api_core.exceptions import AlreadyExists
    from google.cloud.pubsub import SubscriberClient

    topic_id = 'container-analysis-occurrences-v1'
    client = SubscriberClient()
    topic_name = client.topic_path(project_id, topic_id)
    subscription_name = client.subscription_path(project_id, subscription_id)
    success = True
    try:
        client.create_subscription(subscription_name, topic_name)
    except AlreadyExists:
        # if subscription already exists, do nothing
        pass
    else:
        success = False
    return success

サブスクライバー アプリケーションは、サブスクリプションの作成後にトピックにパブリッシュされたメッセージのみを受信します。

次のステップ

  • Container Analysis を使用してお客様のメタデータを格納および管理する方法については、イメージのメタデータの提供をご覧ください。

  • Binary Authorization と脆弱性スキャンを統合することで、既知のセキュリティ問題を伴うイメージがデプロイメント環境で実行されないようにできます。これを行う手順については、脆弱性スキャンの統合をご覧ください。