Pub/Sub 通知を構成する

このドキュメントでは、メモオカレンスの更新通知を設定する方法について説明します。

Artifact Analysis は、自動スキャンで見つかった脆弱性とその他のメタデータについて、Pub/Sub 経由で通知を送信します。メモやオカレンスが作成または更新されると、それぞれの API バージョンの対応トピックにメッセージがパブリッシュされます。使用している API バージョンのトピックを使用します。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Enable the Container Analysis API.

    Enable the API

  4. Install the Google Cloud CLI.
  5. To initialize the gcloud CLI, run the following command:

    gcloud init
  6. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  7. Enable the Container Analysis API.

    Enable the API

  8. Install the Google Cloud CLI.
  9. To initialize the gcloud CLI, run the following command:

    gcloud init
  10. プロジェクトのメタデータのアクセス制御を設定する方法を確認する。Artifact Analysis コンテナ スキャンによって作成された脆弱性オカレンスのメタデータのみを使用する場合は、この手順をスキップします。

Pub/Sub トピックを作成する

Artifact Analysis API を有効にすると、Artifact Analysis は次のトピック ID を持つ Pub/Sub トピックを自動的に作成します。

  • container-analysis-notes-v1
  • container-analysis-occurrences-v1

誤って削除したトピックや見つからないトピックは自分で追加できます。たとえば、Google Cloud 組織に、顧客管理の暗号鍵(CMEK)による暗号化が必要な組織のポリシーの制約がある場合、トピックが存在しない可能性があります。Pub/Sub API がこの制約の拒否リストに含まれている場合、サービスは、Google が管理する暗号鍵を使用してトピックを自動的に作成することはできません。

Google が所有し管理する鍵を使用してトピックを作成するには:

Console

  1. Google Cloud コンソールで Pub/Sub トピックページに移動します。

    Pub/Sub トピックページを開く

  2. [トピックを作成] をクリックします。

  3. トピック ID を入力します。

    container-analysis-notes-v1
    

    名前が URI と一致するようにします。

    projects/PROJECT_ID/topics/container-analysis-notes-v1
    

    ここで PROJECT_ID は、Google Cloud プロジェクト ID です。

  4. [作成] をクリックします。

  5. トピック ID を入力します。

    container-analysis-occurrences-v1
    

    名前が URI と一致するようにします。

    projects/PROJECT_ID/topics/container-analysis-occurrences-v1
    

gcloud

シェルまたはターミナル ウィンドウで次のコマンドを実行します。

gcloud pubsub topics create projects/PROJECT_ID/topics/container-analysis-notes-v1
gcloud pubsub topics create projects/PROJECT_ID/topics/container-analysis-occurrences-v1

gcloud pubsub topics コマンドについて詳しくは、topics ドキュメントをご覧ください。

CMEK 暗号化を使用してトピックを作成するには、Pub/Sub のトピックを暗号化する手順をご覧ください。

メモまたはオカレンスが作成または更新されるたびに、それぞれのトピックにメッセージがパブリッシュされます。ただし、イベントをリッスンして Pub/Sub サービスからメッセージを受信するには、Pub/Sub サブスクリプションも作成する必要があります。

Pub/Sub サブスクリプションの作成

イベントをリッスンするには、そのトピックに関連付けられた Pub/Sub サブスクリプションを作成します。

Console

  1. Google Cloud コンソールで Pub/Sub サブスクリプション ページに移動します。

    Pub/Sub サブスクリプション ページを開く

  2. [サブスクリプションを作成] をクリックします。

  3. サブスクリプションの名前を入力します(例: notes)。

  4. メモに対応するトピックの URI を入力します。

    projects/PROJECT_ID/topics/container-analysis-notes-v1
    

    ここで PROJECT_ID は、Google Cloud プロジェクト ID です。

  5. [作成] をクリックします。

  6. URI を使用して、オカレンスの別のサブスクリプションを作成します。

    projects/PROJECT_ID/topics/container-analysis-occurrences-v1
    

gcloud

Pub/Sub イベントを受信するには、container-analysis-occurrences-v1 トピックに関連付けられたサブスクリプションを作成する必要があります。

gcloud pubsub subscriptions create \
    --topic container-analysis-occurrences-v1 occurrences

新しいサブスクリプションを使用して、オカレンスに関するメッセージを pull できるようになります。

gcloud pubsub subscriptions pull \
    --auto-ack occurrences

Java

Artifact Analysis のクライアント ライブラリをインストールして使用する方法については、Artifact Analysis のクライアント ライブラリをご覧ください。 詳細については、Artifact Analysis Java API のリファレンス ドキュメントをご覧ください。

Artifact Analysis に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.lang.InterruptedException;
import java.util.concurrent.TimeUnit;

public class Subscriptions {
  // Handle incoming Occurrences using a Cloud Pub/Sub subscription
  public static int pubSub(String subId, long timeoutSeconds, String projectId)
      throws InterruptedException {
    // String subId = "my-occurrence-subscription";
    // long timeoutSeconds = 20;
    // String projectId = "my-project-id";
    Subscriber subscriber = null;
    MessageReceiverExample receiver = new MessageReceiverExample();

    try {
      // Subscribe to the requested Pub/Sub channel
      ProjectSubscriptionName subName = ProjectSubscriptionName.of(projectId, subId);
      subscriber = Subscriber.newBuilder(subName, receiver).build();
      subscriber.startAsync().awaitRunning();
      // Sleep to listen for messages
      TimeUnit.SECONDS.sleep(timeoutSeconds);
    } finally {
      // Stop listening to the channel
      if (subscriber != null) {
        subscriber.stopAsync();
      }
    }
    // Print and return the number of Pub/Sub messages received
    System.out.println(receiver.messageCount);
    return receiver.messageCount;
  }

  // Custom class to handle incoming Pub/Sub messages
  // In this case, the class will simply log and count each message as it comes in
  static class MessageReceiverExample implements MessageReceiver {
    public int messageCount = 0;

    @Override
    public synchronized void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
      // Every time a Pub/Sub message comes in, print it and count it
      System.out.println("Message " + messageCount + ": " + message.getData().toStringUtf8());
      messageCount += 1;
      // Acknowledge the message
      consumer.ack();
    }
  }

  // Creates and returns a Pub/Sub subscription object listening to the Occurrence topic
  public static Subscription createOccurrenceSubscription(String subId, String projectId) 
      throws IOException, StatusRuntimeException, InterruptedException {
    // This topic id will automatically receive messages when Occurrences are added or modified
    String topicId = "container-analysis-occurrences-v1";
    TopicName topicName = TopicName.of(projectId, topicId);
    SubscriptionName subName = SubscriptionName.of(projectId, subId);

    SubscriptionAdminClient client = SubscriptionAdminClient.create();
    PushConfig config = PushConfig.getDefaultInstance();
    Subscription sub = client.createSubscription(subName, topicName, config, 0);
    return sub;
  }
}

Go

Artifact Analysis のクライアント ライブラリをインストールして使用する方法については、Artifact Analysis のクライアント ライブラリをご覧ください。 詳細については、Artifact Analysis Go API のリファレンス ドキュメントをご覧ください。

Artifact Analysis に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。


import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"

	pubsub "cloud.google.com/go/pubsub"
)

// occurrencePubsub handles incoming Occurrences using a Cloud Pub/Sub subscription.
func occurrencePubsub(w io.Writer, subscriptionID string, timeout time.Duration, projectID string) (int, error) {
	// subscriptionID := fmt.Sprintf("my-occurrences-subscription")
	// timeout := time.Duration(20) * time.Second
	ctx := context.Background()

	var mu sync.Mutex
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return -1, fmt.Errorf("pubsub.NewClient: %w", err)
	}
	// Subscribe to the requested Pub/Sub channel.
	sub := client.Subscription(subscriptionID)
	count := 0

	// Listen to messages for 'timeout' seconds.
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		mu.Lock()
		count = count + 1
		fmt.Fprintf(w, "Message %d: %q\n", count, string(msg.Data))
		msg.Ack()
		mu.Unlock()
	})
	if err != nil {
		return -1, fmt.Errorf("sub.Receive: %w", err)
	}
	// Print and return the number of Pub/Sub messages received.
	fmt.Fprintln(w, count)
	return count, nil
}

// createOccurrenceSubscription creates a new Pub/Sub subscription object listening to the Occurrence topic.
func createOccurrenceSubscription(subscriptionID, projectID string) error {
	// subscriptionID := fmt.Sprintf("my-occurrences-subscription")
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// This topic id will automatically receive messages when Occurrences are added or modified
	topicID := "container-analysis-occurrences-v1"
	topic := client.Topic(topicID)
	config := pubsub.SubscriptionConfig{Topic: topic}
	_, err = client.CreateSubscription(ctx, subscriptionID, config)
	return fmt.Errorf("client.CreateSubscription: %w", err)
}

Node.js

Artifact Analysis のクライアント ライブラリをインストールして使用する方法については、Artifact Analysis のクライアント ライブラリをご覧ください。 詳細については、Artifact Analysis Node.js API のリファレンス ドキュメントをご覧ください。

Artifact Analysis に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample
 */
// const projectId = 'your-project-id', // Your GCP Project ID
// const subscriptionId = 'my-sub-id', // A user-specified subscription to the 'container-analysis-occurrences-v1' topic
// const timeoutSeconds = 30 // The number of seconds to listen for the new Pub/Sub Messages

// Import the pubsub library and create a client, topic and subscription
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub({projectId});
const subscription = pubsub.subscription(subscriptionId);

// Handle incoming Occurrences using a Cloud Pub/Sub subscription
let count = 0;
const messageHandler = message => {
  count++;
  message.ack();
};

// Listen for new messages until timeout is hit
subscription.on('message', messageHandler);

setTimeout(() => {
  subscription.removeListener('message', messageHandler);
  console.log(`Polled ${count} occurrences`);
}, timeoutSeconds * 1000);

Ruby

Artifact Analysis のクライアント ライブラリをインストールして使用する方法については、Artifact Analysis のクライアント ライブラリをご覧ください。 詳細については、Artifact Analysis Ruby API のリファレンス ドキュメントをご覧ください。

Artifact Analysis に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

# subscription_id = "A user-specified identifier for the new subscription"
# timeout_seconds = "The number of seconds to listen for new Pub/Sub messages"
# project_id      = "Your Google Cloud project ID"

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id
topic = pubsub.topic "container-analysis-occurrences-v1"
subscription = topic.subscribe subscription_id

count = 0
subscriber = subscription.listen do |received_message|
  count += 1
  # Process incoming occurrence here
  puts "Message #{count}: #{received_message.data}"
  received_message.acknowledge!
end
subscriber.start
# Wait for incomming occurrences
sleep timeout_seconds
subscriber.stop.wait!
subscription.delete
# Print and return the total number of Pub/Sub messages received
puts "Total Messages Received: #{count}"
count

Python

Artifact Analysis のクライアント ライブラリをインストールして使用する方法については、Artifact Analysis のクライアント ライブラリをご覧ください。 詳細については、Artifact Analysis Python API のリファレンス ドキュメントをご覧ください。

Artifact Analysis に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

import time

from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub import SubscriberClient
from google.cloud.pubsub_v1.subscriber.message import Message


def pubsub(subscription_id: str, timeout_seconds: int, project_id: str) -> int:
    """Respond to incoming occurrences using a Cloud Pub/Sub subscription."""
    # subscription_id := 'my-occurrences-subscription'
    # timeout_seconds = 20
    # project_id = 'my-gcp-project'

    client = SubscriberClient()
    subscription_name = client.subscription_path(project_id, subscription_id)
    receiver = MessageReceiver()
    client.subscribe(subscription_name, receiver.pubsub_callback)

    # listen for 'timeout' seconds
    for _ in range(timeout_seconds):
        time.sleep(1)
    # print and return the number of pubsub messages received
    print(receiver.msg_count)
    return receiver.msg_count


class MessageReceiver:
    """Custom class to handle incoming Pub/Sub messages."""

    def __init__(self) -> None:
        # initialize counter to 0 on initialization
        self.msg_count = 0

    def pubsub_callback(self, message: Message) -> None:
        # every time a pubsub message comes in, print it and count it
        self.msg_count += 1
        print(f"Message {self.msg_count}: {message.data}")
        message.ack()


def create_occurrence_subscription(subscription_id: str, project_id: str) -> bool:
    """Creates a new Pub/Sub subscription object listening to the
    Container Analysis Occurrences topic."""
    # subscription_id := 'my-occurrences-subscription'
    # project_id = 'my-gcp-project'

    topic_id = "container-analysis-occurrences-v1"
    client = SubscriberClient()
    topic_name = f"projects/{project_id}/topics/{topic_id}"
    subscription_name = client.subscription_path(project_id, subscription_id)
    success = True
    try:
        client.create_subscription({"name": subscription_name, "topic": topic_name})
    except AlreadyExists:
        # if subscription already exists, do nothing
        pass
    else:
        success = False
    return success

サブスクライバー アプリケーションは、サブスクリプションの作成後にトピックにパブリッシュされたメッセージのみを受信します。

Pub/Sub ペイロードは JSON 形式で、スキーマは次のとおりです。

注記:

{
    "name": "projects/PROJECT_ID/notes/NOTE_ID",
    "kind": "NOTE_KIND",
    "notificationTime": "NOTIFICATION_TIME",
}

オカレンス:

{
    "name": "projects/PROJECT_ID/occurrences/OCCURRENCE_ID",
    "kind": "NOTE_KIND",
    "notificationTime": "NOTIFICATION_TIME",
}

ここで

  • NOTE_KIND は、NoteKind の値の 1 つです。
  • NOTIFICATION_TIME は、RFC 3339 UTC Zulu 形式のタイムスタンプ(精度はナノ秒)です。

詳細を表示

メモや発生について詳しくは、Artifact Analysis に保存されているメタデータにアクセスしてください。たとえば、特定の発生に関するすべての詳細をリクエストできます。脆弱性の調査の手順をご覧ください。

次のステップ

  • Artifact Analysis を使用してカスタム メタデータを保存および管理する方法については、カスタムメモとオカレンスを作成するをご覧ください。

  • 脆弱性スキャンで証明書を使用すると、既知のセキュリティ問題を伴うイメージがデプロイ環境で実行されないようにできます。これを行う手順については、Kritis Signer で証明書を作成するをご覧ください。