Configure Pub/Sub notifications

This document describes how to set up notifications for updates to notes and occurrences.

Artifact Analysis provides notifications via Pub/Sub for vulnerabilities found by automated scanning and for other metadata. When a note or occurrence is created or updated, a message is published to the corresponding topic for each API version. Use the topic for the API version you are using.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Enable the Container Analysis API.

    Enable the API

  4. Install the Google Cloud CLI.
  5. To initialize the gcloud CLI, run the following command:

    gcloud init
  6. Learn how to set up access control for metadata in your project. Skip this step if you only consume metadata from vulnerability occurrences created by Artifact Analysis container scanning.

Create Pub/Sub topics

After you activate the Artifact Analysis API, Artifact Analysis automatically creates Pub/Sub topics with the following topic IDs:

  • container-analysis-notes-v1
  • container-analysis-occurrences-v1

If the topics were accidentally deleted or are missing, you can add them yourself. For example, the topics might be missing if your Google Cloud organization has an organization policy constraint that requires encryption with customer-managed encryption keys (CMEK). When the Pub/Sub API is in the deny list of this constraint, services cannot automatically create topics with Google-managed encryption keys.
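As a rough illustration of checking for missing topics, the following Python sketch compares a list of existing topic names against the two expected topic IDs and reports which ones are absent. The helper names (`expected_topic_paths`, `missing_topics`) and the sample project ID are hypothetical. The list of existing topics is assumed to come from elsewhere, for example from the output of `gcloud pubsub topics list --format="value(name)"`.

```python
from typing import Iterable, List

# Topic IDs that Artifact Analysis publishes to (from the list above).
EXPECTED_TOPIC_IDS = (
    "container-analysis-notes-v1",
    "container-analysis-occurrences-v1",
)


def expected_topic_paths(project_id: str) -> List[str]:
    """Return the full resource names of the expected topics."""
    return [
        f"projects/{project_id}/topics/{topic_id}"
        for topic_id in EXPECTED_TOPIC_IDS
    ]


def missing_topics(project_id: str, existing_topics: Iterable[str]) -> List[str]:
    """Return the expected topic paths that are absent from existing_topics."""
    existing = set(existing_topics)
    return [
        path for path in expected_topic_paths(project_id) if path not in existing
    ]


if __name__ == "__main__":
    # Hypothetical input: only the notes topic exists in this project.
    existing = ["projects/my-project/topics/container-analysis-notes-v1"]
    for path in missing_topics("my-project", existing):
        print(f"Missing topic: {path}")
```

Any path that this sketch reports as missing can then be created with one of the methods that follow.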

To create the topics with Google-managed encryption keys:

Console

  1. Go to the Pub/Sub topics page in the Google Cloud console.

    Open the Pub/Sub topics page

  2. Click Create Topic.

  3. Enter a Topic ID:

    container-analysis-notes-v1
    

    so that the topic name matches the URI:

    projects/PROJECT_ID/topics/container-analysis-notes-v1
    

    where PROJECT_ID is your Google Cloud project ID.

  4. Click Create.

  5. Click Create Topic again and enter a Topic ID:

    container-analysis-occurrences-v1

    so that the topic name matches the URI:

    projects/PROJECT_ID/topics/container-analysis-occurrences-v1

  6. Click Create.

gcloud

Run the following commands in your shell or terminal window:

gcloud pubsub topics create projects/PROJECT_ID/topics/container-analysis-notes-v1
gcloud pubsub topics create projects/PROJECT_ID/topics/container-analysis-occurrences-v1

To learn more about the gcloud pubsub topics command, see the topics documentation.

To create the topics with CMEK encryption, see the Pub/Sub instructions for encrypting topics.

Whenever a note or an occurrence is created or updated, a message is published to the respective topic. To listen for these events and receive the messages, you must also create a Pub/Sub subscription.

Create Pub/Sub subscriptions

To listen to events, create a Pub/Sub subscription associated with the topic:

Console

  1. Go to the Pub/Sub subscriptions page in the Google Cloud console.

    Open the Pub/Sub subscriptions page

  2. Click Create Subscription.

  3. Type a name for the subscription. For example, notes.

  4. Enter the URI of the topic for notes:

    projects/PROJECT_ID/topics/container-analysis-notes-v1
    

    where PROJECT_ID is your Google Cloud project ID.

  5. Click Create.

  6. Create another subscription for occurrences with the URI:

    projects/PROJECT_ID/topics/container-analysis-occurrences-v1
    

gcloud

To receive Pub/Sub events, you must first create a subscription associated with the container-analysis-occurrences-v1 topic:

gcloud pubsub subscriptions create \
    --topic container-analysis-occurrences-v1 occurrences

You can then pull messages about your occurrences by using the new subscription:

gcloud pubsub subscriptions pull \
    --auto-ack occurrences

Java

To learn how to install and use the client library for Artifact Analysis, see Artifact Analysis client libraries. For more information, see the Artifact Analysis Java API reference documentation.

To authenticate to Artifact Analysis, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.lang.InterruptedException;
import java.util.concurrent.TimeUnit;

public class Subscriptions {
  // Handle incoming Occurrences using a Cloud Pub/Sub subscription
  public static int pubSub(String subId, long timeoutSeconds, String projectId)
      throws InterruptedException {
    // String subId = "my-occurrence-subscription";
    // long timeoutSeconds = 20;
    // String projectId = "my-project-id";
    Subscriber subscriber = null;
    MessageReceiverExample receiver = new MessageReceiverExample();

    try {
      // Subscribe to the requested Pub/Sub channel
      ProjectSubscriptionName subName = ProjectSubscriptionName.of(projectId, subId);
      subscriber = Subscriber.newBuilder(subName, receiver).build();
      subscriber.startAsync().awaitRunning();
      // Sleep to listen for messages
      TimeUnit.SECONDS.sleep(timeoutSeconds);
    } finally {
      // Stop listening to the channel
      if (subscriber != null) {
        subscriber.stopAsync();
      }
    }
    // Print and return the number of Pub/Sub messages received
    System.out.println(receiver.messageCount);
    return receiver.messageCount;
  }

  // Custom class to handle incoming Pub/Sub messages
  // In this case, the class will simply log and count each message as it comes in
  static class MessageReceiverExample implements MessageReceiver {
    public int messageCount = 0;

    @Override
    public synchronized void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
      // Every time a Pub/Sub message comes in, print it and count it
      System.out.println("Message " + messageCount + ": " + message.getData().toStringUtf8());
      messageCount += 1;
      // Acknowledge the message
      consumer.ack();
    }
  }

  // Creates and returns a Pub/Sub subscription object listening to the Occurrence topic
  public static Subscription createOccurrenceSubscription(String subId, String projectId) 
      throws IOException, StatusRuntimeException, InterruptedException {
    // This topic id will automatically receive messages when Occurrences are added or modified
    String topicId = "container-analysis-occurrences-v1";
    TopicName topicName = TopicName.of(projectId, topicId);
    SubscriptionName subName = SubscriptionName.of(projectId, subId);

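    // Close the admin client when done to release its underlying resources
    try (SubscriptionAdminClient client = SubscriptionAdminClient.create()) {
      // An empty push config creates a pull subscription;
      // an ack deadline of 0 selects the service default
      PushConfig config = PushConfig.getDefaultInstance();
      return client.createSubscription(subName, topicName, config, 0);
    }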
  }
}

Go

To learn how to install and use the client library for Artifact Analysis, see Artifact Analysis client libraries. For more information, see the Artifact Analysis Go API reference documentation.

To authenticate to Artifact Analysis, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"

	pubsub "cloud.google.com/go/pubsub"
)

// occurrencePubsub handles incoming Occurrences using a Cloud Pub/Sub subscription.
func occurrencePubsub(w io.Writer, subscriptionID string, timeout time.Duration, projectID string) (int, error) {
	// subscriptionID := fmt.Sprintf("my-occurrences-subscription")
	// timeout := time.Duration(20) * time.Second
	ctx := context.Background()

	var mu sync.Mutex
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return -1, fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()
	// Subscribe to the requested Pub/Sub channel.
	sub := client.Subscription(subscriptionID)
	count := 0

	// Listen to messages for 'timeout' seconds.
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		mu.Lock()
		count = count + 1
		fmt.Fprintf(w, "Message %d: %q\n", count, string(msg.Data))
		msg.Ack()
		mu.Unlock()
	})
	if err != nil {
		return -1, fmt.Errorf("sub.Receive: %w", err)
	}
	// Print and return the number of Pub/Sub messages received.
	fmt.Fprintln(w, count)
	return count, nil
}

// createOccurrenceSubscription creates a new Pub/Sub subscription object listening to the Occurrence topic.
func createOccurrenceSubscription(subscriptionID, projectID string) error {
	// subscriptionID := fmt.Sprintf("my-occurrences-subscription")
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// This topic id will automatically receive messages when Occurrences are added or modified
	topicID := "container-analysis-occurrences-v1"
	topic := client.Topic(topicID)
	config := pubsub.SubscriptionConfig{Topic: topic}
	_, err = client.CreateSubscription(ctx, subscriptionID, config)
	if err != nil {
		return fmt.Errorf("client.CreateSubscription: %w", err)
	}
	return nil
}

Node.js

To learn how to install and use the client library for Artifact Analysis, see Artifact Analysis client libraries. For more information, see the Artifact Analysis Node.js API reference documentation.

To authenticate to Artifact Analysis, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

/**
 * TODO(developer): Uncomment these variables before running the sample
 */
// const projectId = 'your-project-id'; // Your GCP Project ID
// const subscriptionId = 'my-sub-id'; // A user-specified subscription to the 'container-analysis-occurrences-v1' topic
// const timeoutSeconds = 30; // The number of seconds to listen for the new Pub/Sub Messages

// Import the pubsub library and create a client, topic and subscription
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub({projectId});
const subscription = pubsub.subscription(subscriptionId);

// Handle incoming Occurrences using a Cloud Pub/Sub subscription
let count = 0;
const messageHandler = message => {
  count++;
  message.ack();
};

// Listen for new messages until timeout is hit
subscription.on('message', messageHandler);

setTimeout(() => {
  subscription.removeListener('message', messageHandler);
  console.log(`Polled ${count} occurrences`);
}, timeoutSeconds * 1000);

Ruby

To learn how to install and use the client library for Artifact Analysis, see Artifact Analysis client libraries. For more information, see the Artifact Analysis Ruby API reference documentation.

To authenticate to Artifact Analysis, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

# subscription_id = "A user-specified identifier for the new subscription"
# timeout_seconds = "The number of seconds to listen for new Pub/Sub messages"
# project_id      = "Your Google Cloud project ID"

require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id
topic = pubsub.topic "container-analysis-occurrences-v1"
subscription = topic.subscribe subscription_id

count = 0
subscriber = subscription.listen do |received_message|
  count += 1
  # Process incoming occurrence here
  puts "Message #{count}: #{received_message.data}"
  received_message.acknowledge!
end
subscriber.start
# Wait for incoming occurrences
sleep timeout_seconds
subscriber.stop.wait!
subscription.delete
# Print and return the total number of Pub/Sub messages received
puts "Total Messages Received: #{count}"
count

Python

To learn how to install and use the client library for Artifact Analysis, see Artifact Analysis client libraries. For more information, see the Artifact Analysis Python API reference documentation.

To authenticate to Artifact Analysis, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import time

from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub import SubscriberClient
from google.cloud.pubsub_v1.subscriber.message import Message


def pubsub(subscription_id: str, timeout_seconds: int, project_id: str) -> int:
    """Respond to incoming occurrences using a Cloud Pub/Sub subscription."""
    # subscription_id = 'my-occurrences-subscription'
    # timeout_seconds = 20
    # project_id = 'my-gcp-project'

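    client = SubscriberClient()
    subscription_name = client.subscription_path(project_id, subscription_id)
    receiver = MessageReceiver()
    # subscribe() returns a future that manages the background streaming pull
    streaming_pull_future = client.subscribe(
        subscription_name, receiver.pubsub_callback
    )

    # listen for 'timeout_seconds' seconds, then stop receiving messages
    time.sleep(timeout_seconds)
    streaming_pull_future.cancel()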
    # print and return the number of pubsub messages received
    print(receiver.msg_count)
    return receiver.msg_count


class MessageReceiver:
    """Custom class to handle incoming Pub/Sub messages."""

    def __init__(self) -> None:
        # initialize counter to 0 on initialization
        self.msg_count = 0

    def pubsub_callback(self, message: Message) -> None:
        # every time a pubsub message comes in, print it and count it
        self.msg_count += 1
        print(f"Message {self.msg_count}: {message.data}")
        message.ack()


def create_occurrence_subscription(subscription_id: str, project_id: str) -> bool:
    """Creates a new Pub/Sub subscription object listening to the
    Container Analysis Occurrences topic."""
    # subscription_id = 'my-occurrences-subscription'
    # project_id = 'my-gcp-project'

    topic_id = "container-analysis-occurrences-v1"
    client = SubscriberClient()
    topic_name = f"projects/{project_id}/topics/{topic_id}"
    subscription_name = client.subscription_path(project_id, subscription_id)
    try:
        client.create_subscription({"name": subscription_name, "topic": topic_name})
    except AlreadyExists:
        # the subscription already exists, so there is nothing to create
        pass
    # the subscription exists at this point; any other error propagates
    return True

Subscriber applications only receive messages that are published to the topic after the subscription is created.

Pub/Sub payloads are in JSON and their schema is as follows:

Notes:

{
    "name": "projects/PROJECT_ID/notes/NOTE_ID",
    "kind": "NOTE_KIND",
    "notificationTime": "NOTIFICATION_TIME"
}

Occurrences:

{
    "name": "projects/PROJECT_ID/occurrences/OCCURRENCE_ID",
    "kind": "NOTE_KIND",
    "notificationTime": "NOTIFICATION_TIME"
}

where:

  • NOTE_KIND is one of the values in NoteKind.
  • NOTIFICATION_TIME is a timestamp in RFC 3339 UTC "Zulu" format, accurate to nanoseconds.
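To make the schema above concrete, here is a minimal Python sketch that decodes a message payload into a small record. It assumes the payload has exactly the three fields shown above. The `Notification` class, the `parse_notification` function, and the sample values are hypothetical and are not part of any client library. Because the timestamp can carry nanosecond precision and Python's `datetime` stores microseconds, the sketch truncates the fractional part to six digits.

```python
import json
import re
from dataclasses import dataclass
from datetime import datetime, timezone

# Matches projects/PROJECT_ID/notes/NOTE_ID and
# projects/PROJECT_ID/occurrences/OCCURRENCE_ID.
_NAME_RE = re.compile(
    r"^projects/(?P<project>[^/]+)/(?P<collection>notes|occurrences)/(?P<id>[^/]+)$"
)
# Matches an RFC 3339 UTC "Zulu" timestamp with optional fractional seconds.
_TIME_RE = re.compile(
    r"^(?P<base>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(?:\.(?P<frac>\d+))?Z$"
)


@dataclass
class Notification:
    project_id: str
    collection: str  # "notes" or "occurrences"
    resource_id: str
    kind: str
    notification_time: datetime


def parse_notification(data: bytes) -> Notification:
    """Decode the JSON body of a Pub/Sub message into a Notification."""
    payload = json.loads(data.decode("utf-8"))

    name_match = _NAME_RE.match(payload["name"])
    if name_match is None:
        raise ValueError(f"unexpected resource name: {payload['name']!r}")

    time_match = _TIME_RE.match(payload["notificationTime"])
    if time_match is None:
        raise ValueError(f"unexpected timestamp: {payload['notificationTime']!r}")

    # Truncate nanoseconds to microseconds, padding short fractions with zeros.
    fraction = (time_match.group("frac") or "")[:6].ljust(6, "0")
    when = datetime.strptime(time_match.group("base"), "%Y-%m-%dT%H:%M:%S")
    when = when.replace(microsecond=int(fraction), tzinfo=timezone.utc)

    return Notification(
        project_id=name_match.group("project"),
        collection=name_match.group("collection"),
        resource_id=name_match.group("id"),
        kind=payload["kind"],
        notification_time=when,
    )


if __name__ == "__main__":
    # Hypothetical payload for illustration only.
    sample = (
        b'{"name": "projects/my-project/occurrences/abc-123", '
        b'"kind": "VULNERABILITY", '
        b'"notificationTime": "2024-01-02T03:04:05.123456789Z"}'
    )
    print(parse_notification(sample))
```

In a subscriber callback like the ones in the samples above, the message data bytes could be passed to `parse_notification` before the message is acknowledged.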

View details

To learn more about a note or occurrence, you can access metadata stored in Artifact Analysis. For example, you can request all the details for a specific occurrence. See instructions in Investigating Vulnerabilities.
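For instance, the `name` field of a notification can be turned into a URL for requesting the full note or occurrence. The sketch below assumes the Container Analysis v1 REST endpoint at `https://containeranalysis.googleapis.com/v1` and leaves authentication and the HTTP request itself to the caller. The `resource_url` function name and the sample resource name are hypothetical.

```python
from urllib.parse import quote

# Assumed base URL of the Container Analysis v1 REST API.
API_ROOT = "https://containeranalysis.googleapis.com/v1"


def resource_url(name: str) -> str:
    """Build the GET URL for a note or occurrence from its resource name."""
    parts = name.split("/")
    if (
        len(parts) != 4
        or parts[0] != "projects"
        or parts[2] not in ("notes", "occurrences")
        or not parts[1]
        or not parts[3]
    ):
        raise ValueError(f"unexpected resource name: {name!r}")
    # Percent-encode each path segment separately so the slashes are kept.
    return API_ROOT + "/" + "/".join(quote(part, safe="") for part in parts)


if __name__ == "__main__":
    # Hypothetical resource name taken from a notification payload.
    print(resource_url("projects/my-project/occurrences/abc-123"))
```

A request to the resulting URL needs credentials with permission to view the metadata, as described in the access control step under Before you begin.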

What's next