クイックスタート: gcloud コマンドライン ツールの使用

Pub/Sub は、アプリケーションやサービスの間でイベントデータを交換するためのメッセージング サービスです。データのプロデューサーは Pub/Sub のトピックにメッセージをパブリッシュします。コンシューマは、そのトピックにサブスクリプションを作成します。その時点から、Pub/Sub は、そのメッセージがすべてのコンシューマに少なくとも 1 回配信されることを保証します。コンシューマはサブスクリプションからメッセージを pull するか、push サブスクリプションの webhook として設定されます。すべてのサブスクライバーは設定可能な時間の範囲内で各メッセージに確認応答を返信する必要があります。確認応答がなかったメッセージは再配信されます。Pub/Sub は地理的にグローバルであり、シャーディングや、需要に応じてスケールするための追加設定の必要がありません。

このページでは、Google Cloud Pub/Sub の基本的なタスクを行う方法を説明します。

始める前に

  1. Google アカウントへのログイン

    Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

  2. Cloud Platform Console プロジェクトを選択または作成します。

    [プロジェクト] ページに移動

  3. プロジェクトの課金を有効にします。

    課金の有効化

  4. Pub/Sub API を有効にする。

    Enable the API

    API を有効にすると、認証情報を得る必要はありません。

  5. Google Cloud SDK をインストールし、初期化します

gcloud コマンドライン ツールを使用する

gcloud コマンドライン ツールを使用して Google Cloud Pub/Sub でオペレーションを実行できます。Google Cloud Pub/Sub gcloud コマンドの一覧については、gcloud pubsub リファレンスをご覧ください。

この完結した例は、トピックの作成、トピックへのサブスクリプション、メッセージのパブリッシュと受信に必要な手順を示すもので、bash で実行します。

gcloud init
gcloud components install beta
gcloud beta pubsub topics create myTopic
gcloud beta pubsub subscriptions create --topic myTopic mySubscription
gcloud beta pubsub topics publish myTopic "hello"
gcloud beta pubsub subscriptions pull --auto-ack mySubscription

この例では、pull サブスクリプションを使用しています。Pub/Sub はサブスクライバー ガイドに記載されているように push サブスクリプションもサポートしています。

あらゆる管理作業は API を使用して行うことができ、そのほとんどは gcloud ツールと Cloud Platform Console の UI からも行うことができます。

独自のコードを書く

メッセージのパブリッシュやコンシュームなどのデータ処理のために、下記のようなコードを書くことが必要になることがあります。このセクションでは、メッセージをプログラムによってパブリッシュしたりコンシュームしたりする方法を示します。

トピックへのメッセージのパブリッシュ

C#

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

TopicName topicName = new TopicName(projectId, topicId);
PubsubMessage message = new PubsubMessage
{
    // The data is any arbitrary ByteString. Here, we're using text.
    Data = ByteString.CopyFromUtf8(messageText),
    // The attributes provide metadata in a string-to-string
    // dictionary.
    Attributes =
    {
        { attributesKey, attributesValue }
    }
};
publisher.Publish(topicName, new[] { message });
Console.WriteLine("Topic message created.");

Go

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

t := client.Topic(topic)
result := t.Publish(ctx, &pubsub.Message{
	Data: []byte(msg),
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published a message; msg ID: %v\n", id)

Java

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

TopicName topicName = TopicName.create("my-project-id", "my-topic-id");
Publisher publisher = null;
List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

try {
  // Create a publisher instance with default settings bound to the topic
  publisher = Publisher.defaultBuilder(topicName).build();

  List<String> messages = Arrays.asList("first message", "second message");

  // schedule publishing one message at a time : messages get automatically batched
  for (String message : messages) {
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

    // Once published, returns a server-assigned message id (unique within the topic)
    ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
    messageIdFutures.add(messageIdFuture);
  }
} finally {
  // wait on any pending publish requests.
  List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

  for (String messageId : messageIds) {
    System.out.println("published with message ID: " + messageId);
  }

  if (publisher != null) {
    // When finished with the publisher, shutdown to free up resources.
    publisher.shutdown();
  }
}

Node.js

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

function publishMessage (topicName, data) {
  // Instantiates a client
  const pubsub = PubSub();

  // References an existing topic, e.g. "my-topic"
  const topic = pubsub.topic(topicName);

  // Publishes the message, e.g. "Hello, world!" or { amount: 599.00, status: 'pending' }
  return topic.publish(data)
    .then((results) => {
      const messageIds = results[0];

      console.log(`Message ${messageIds[0]} published.`);

      return messageIds;
    });
}

PHP

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message  The message to publish.
 */
function publish_message($projectId, $topicName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $topic->publish(['data' => $message]);
    print('Message published' . PHP_EOL);
}

Python

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

def publish_message(topic_name, data):
    """Publishes a message to a Pub/Sub topic with the given data."""
    pubsub_client = pubsub.Client()
    topic = pubsub_client.topic(topic_name)

    # Data must be a bytestring
    data = data.encode('utf-8')

    message_id = topic.publish(data)

    print('Message {} published.'.format(message_id))

Ruby

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

pubsub = Google::Cloud::Pubsub.new project: "my-gcp-project-id"
topic  = pubsub.topic "my-topic"

topic.publish "A Message"

メッセージの受信(pull の場合)

C#

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
PullResponse response = subscriber.Pull(subscriptionName,
    returnImmediately: true, maxMessages: 10);
subscriber.Acknowledge(subscriptionName,
    response.ReceivedMessages.Select(m => m.AckId));

Go

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

// Consume 10 messages.
var mu sync.Mutex
received := 0
sub := client.Subscription(name)
cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
	mu.Lock()
	defer mu.Unlock()
	received++
	if received >= 10 {
		cancel()
		msg.Nack()
		return
	}
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})
if err != nil {
	return err
}

Java

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

String projectId = "my-project-id";
String subscriptionId = "my-subscription-id";

SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId);
// Instantiate an asynchronous message receiver
MessageReceiver receiver = new MessageReceiver() {
      @Override
      public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        // handle incoming message, then ack/nack the received message
        System.out.println("Id : " + message.getMessageId());
        System.out.println("Data : " + message.getData().toStringUtf8());
        consumer.ack();
      }
    };

Subscriber subscriber = null;
try {
  // Create a subscriber for "my-subscription-id" bound to the message receiver
  subscriber = Subscriber.defaultBuilder(subscriptionName, receiver).build();
  subscriber.startAsync();
  // ...
} finally {
  // stop receiving messages
  if (subscriber != null) {
    subscriber.stopAsync();
  }
}

Node.js

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

function pullMessages (subscriptionName) {
  // Instantiates a client
  const pubsub = PubSub();

  // References an existing subscription, e.g. "my-subscription"
  const subscription = pubsub.subscription(subscriptionName);

  // Pulls messages. Set returnImmediately to false to block until messages are
  // received.
  return subscription.pull()
    .then((results) => {
      const messages = results[0];

      console.log(`Received ${messages.length} messages.`);

      messages.forEach((message) => {
        console.log(`* %d %j %j`, message.id, message.data, message.attributes);
      });

      // Acknowledges received messages. If you do not acknowledge, Pub/Sub will
      // redeliver the message.
      return subscription.ack(messages.map((message) => message.ackId));
    });
}

PHP

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

Python

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

def receive_message(topic_name, subscription_name):
    """Receives a message from a pull subscription."""
    pubsub_client = pubsub.Client()
    topic = pubsub_client.topic(topic_name)
    subscription = topic.subscription(subscription_name)

    # Change return_immediately=False to block until messages are
    # received.
    results = subscription.pull(return_immediately=True)

    print('Received {} messages.'.format(len(results)))

    for ack_id, message in results:
        print('* {}: {}, {}'.format(
            message.message_id, message.data, message.attributes))

    # Acknowledge received messages. If you do not acknowledge, Pub/Sub will
    # redeliver the message.
    if results:
        subscription.acknowledge([ack_id for ack_id, message in results])

Ruby

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

pubsub       = Google::Cloud::Pubsub.new project: "my-gcp-project-id"
subscription = pubsub.subscription "my-subscription"

puts "Messages pulled:"
subscription.pull.each do |message|
  puts message.data
  message.acknowledge!
end

次のステップ

このページで取り上げた概念の詳細については、パブリッシャー ガイドサブスクライバー ガイドをご覧ください。

Cloud Pub/Sub クライアント ライブラリで選択した言語で Cloud Pub/Sub の使用を開始します。

Google Cloud Pub/Sub の詳細については、Google Cloud Pub/Sub とはをご覧ください。

外出先でもリソースをモニタリング

Google Cloud Console アプリを入手して、プロジェクトの管理にお役立てください。

フィードバックを送信...

Cloud Pub/Sub のドキュメント