Pub/Sub メッセージを通知メッセージに変換する方法を説明します。
もっと見る
このコードサンプルを含む詳細なドキュメントについては、以下をご覧ください。
コードサンプル
Go
Security Command Center で認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。
import (
"bytes"
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/securitycenter/apiv1/securitycenterpb"
"github.com/golang/protobuf/jsonpb"
)
// receiveMessages listens on the given Pub/Sub subscription, decodes a
// received message as a Security Command Center NotificationMessage, writes
// its finding to w, and then stops receiving.
//
// It returns an error if the Pub/Sub client cannot be created or if Receive
// fails. A payload that cannot be parsed is reported on w and still
// acknowledged, so it is not redelivered.
func receiveMessages(w io.Writer, projectID string, subscriptionName string) error {
	// projectID := "your-project-id"
	// subscriptionName := "your-subscription-name"
	ctx := context.Background()

	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subscriptionName)
	cctx, cancel := context.WithCancel(ctx)
	// Release the derived context even when Receive returns without the
	// callback ever having run (calling cancel more than once is harmless).
	defer cancel()

	err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
		notificationMessage := new(securitycenterpb.NotificationMessage)
		// msg.Data carries the notification as JSON; surface parse failures
		// instead of silently printing an empty finding.
		if err := jsonpb.Unmarshal(bytes.NewReader(msg.Data), notificationMessage); err != nil {
			fmt.Fprintf(w, "Could not parse message: %v\n", err)
		} else {
			fmt.Fprintln(w, "Got finding: ", notificationMessage.GetFinding())
		}
		msg.Ack()
		// Cancelling the context passed to Receive makes Receive return.
		cancel()
	})
	if err != nil {
		return fmt.Errorf("Receive: %w", err)
	}
	return nil
}
Java
Security Command Center で認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.securitycenter.v1.NotificationMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * Sample that receives Security Command Center notifications from a Pub/Sub subscription and
 * prints them to standard output.
 */
public class NotificationReceiver {

  private NotificationReceiver() {
    // Only static members; not meant to be instantiated.
  }

  /**
   * Subscribes to the given subscription, waits up to 10 seconds while messages are handled by
   * {@link NotificationMessageReceiver}, and then stops the subscriber.
   *
   * @param projectId the project that owns the subscription
   * @param subscriptionId the ID of the Pub/Sub subscription to read from
   */
  public static void receiveNotificationMessages(String projectId, String subscriptionId) {
    // String projectId = "{your-project}";
    // String subscriptionId = "{your-subscription}";
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    Subscriber subscriber = null;
    try {
      subscriber =
          Subscriber.newBuilder(subscriptionName, new NotificationMessageReceiver()).build();
      subscriber.startAsync().awaitRunning();
      // This sets the timeout value of the subscriber to 10s.
      subscriber.awaitTerminated(10_000, TimeUnit.MILLISECONDS);
    } catch (IllegalStateException | TimeoutException e) {
      System.out.println("Subscriber stopped: " + e);
    } finally {
      // Without an explicit stop the subscriber would keep pulling messages in the background
      // after this method returns.
      if (subscriber != null) {
        subscriber.stopAsync();
      }
    }
  }

  /** Parses each Pub/Sub message as a JSON-encoded {@link NotificationMessage} and prints it. */
  static class NotificationMessageReceiver implements MessageReceiver {

    /**
     * Prints the notification config name and finding carried by {@code message}.
     *
     * <p>The message is acknowledged whether or not parsing succeeds.
     *
     * @param message the received Pub/Sub message whose data is the notification as JSON
     * @param consumer the handle used to acknowledge the message
     */
    @Override
    public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
      NotificationMessage.Builder notificationMessageBuilder = NotificationMessage.newBuilder();
      try {
        String jsonString = message.getData().toStringUtf8();
        JsonFormat.parser().merge(jsonString, notificationMessageBuilder);

        NotificationMessage notificationMessage = notificationMessageBuilder.build();
        System.out.println(
            String.format("Config id: %s", notificationMessage.getNotificationConfigName()));
        System.out.println(String.format("Finding: %s", notificationMessage.getFinding()));
      } catch (InvalidProtocolBufferException e) {
        System.out.println("Could not parse message: " + e);
      } finally {
        consumer.ack();
      }
    }
  }
}
Node.js
Security Command Center で認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。
const {PubSub} = require('@google-cloud/pubsub');
const {StringDecoder} = require('string_decoder');
// projectId = 'your-project-id'
// subscriptionId = 'your-subscription-id'
// Fully-qualified resource name of the subscription to listen on
const subscriptionName = `projects/${projectId}/subscriptions/${subscriptionId}`;

// Pub/Sub client created with default options
const pubSubClient = new PubSub();
/**
 * Listens on the subscription for 10 seconds and logs every notification
 * received during that time.
 */
function listenForMessages() {
  const subscription = pubSubClient.subscription(subscriptionName);

  // message.data is a buffer holding JSON text:
  // 1. Convert the buffer to a normal string
  // 2. Parse the JSON into a notification message object
  const messageHandler = message => {
    try {
      const jsonString = new StringDecoder('utf-8').write(message.data);
      const parsedNotificationMessage = JSON.parse(jsonString);

      console.log(parsedNotificationMessage);
      console.log(parsedNotificationMessage.finding);
    } catch (err) {
      // A malformed payload must not escape the event handler as an
      // uncaught exception; report it and carry on.
      console.error('Could not parse message:', err);
    } finally {
      // ACK when done with the message, including unparseable ones, so
      // they are not delivered again
      message.ack();
    }
  };

  subscription.on('message', messageHandler);

  // Stop handling messages after 10 seconds
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
  }, 10000);
}

listenForMessages();
PHP
Security Command Center で認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。
use Google\Cloud\PubSub\PubSubClient;
/**
 * Pulls messages from a Pub/Sub subscription, prints the data of each one,
 * and acknowledges it.
 *
 * @param string $projectId Your Cloud Project ID
 * @param string $subscriptionId Your subscription ID
 */
function receive_notification(string $projectId, string $subscriptionId): void
{
    $client = new PubSubClient(['projectId' => $projectId]);
    $notifications = $client->subscription($subscriptionId);
    $pulled = $notifications->pull();

    foreach ($pulled as $notification) {
        $payload = $notification->data();
        echo sprintf('Message: %s' . PHP_EOL, $payload);

        // Acknowledge the message so that it will not be pulled again.
        $notifications->acknowledge($notification);
    }
}
Python
Security Command Center で認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。
# Requires https://cloud.google.com/pubsub/docs/quickstart-client-libraries#pubsub-client-libraries-python
import concurrent
# `import concurrent` alone does not load the `futures` submodule used below
import concurrent.futures

from google.cloud import pubsub_v1
from google.cloud.securitycenter_v1 import NotificationMessage
# TODO: project_id = "your-project-id"
# TODO: subscription_name = "your-subscription-name"
def callback(message):
    """Print the notification carried by a Pub/Sub message, then ack it.

    The message data is parsed as a JSON-encoded NotificationMessage.
    """
    # Print the raw data first; useful for debugging if parsing fails
    raw_payload = message.data
    print(f"Received message: {raw_payload}")

    notification = NotificationMessage.from_json(raw_payload)

    config_name = notification.notification_config_name
    print("Notification config name: {}".format(config_name))
    print(f"Finding: {notification.finding}")

    # Ack the message to prevent it from being pulled again
    message.ack()
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_name)

# subscribe() returns a future for the background streaming pull; received
# messages are passed to `callback`.
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}...\n")

# Using the client as a context manager closes it when the block exits,
# instead of leaving the client open after the script is done listening.
with subscriber:
    try:
        streaming_pull_future.result(timeout=1)  # Block for 1 second
    except concurrent.futures.TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown
        streaming_pull_future.result()  # Block until the shutdown is complete
次のステップ
他の Google Cloud プロダクトに関連するコードサンプルの検索およびフィルタ検索を行うには、Google Cloud のサンプルをご覧ください。