クイックスタート: クライアント ライブラリの使用

Cloud Pub/Sub サービスを使用すると、アプリケーションはメッセージを非同期で、確実かつ迅速に交換できます。これを達成するために、データのプロデューサーは Cloud Pub/Sub トピックにメッセージをパブリッシュします。サブスクライバー クライアントは、そのトピックへのサブスクリプションを作成し、サブスクリプションからのメッセージを使用します。Cloud Pub/Sub は、最大 7 日間、確実に配信できなかったメッセージを維持します。このページでは、クライアント ライブラリを使用して Cloud Pub/Sub でメッセージをパブリッシュする方法について説明します。

始める前に

  1. Google アカウントにログインします。

    Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

  2. GCP Console プロジェクトをセットアップします。

    プロジェクトをセットアップする

    クリックして、以下を行います。

    • プロジェクトを作成または選択します。
    • プロジェクトにCloud Pub/Sub API を有効にします。
    • サービス アカウントを作成します。
    • JSON として秘密鍵をダウンロードします。

    これらのリソースは、GCP Console でいつでも表示および管理できます。

  3. 環境変数 GOOGLE_APPLICATION_CREDENTIALS をサービス アカウント キーが含まれる JSON ファイルのファイルパスに設定します。この変数は現在のシェル セッションにのみ適用されるため、新しいセッションを開く場合は、変数を再度設定してください。

  4. Cloud SDK をインストールして初期化します。

インストール

Cloud Shell ではなく SDK を使用している場合は、選択したプログラミング言語でクライアント ライブラリをインストールします。

C#

Install-Package Google.Cloud.PubSub.V1 -Pre

Go

go get -u cloud.google.com/go/pubsub

Java

Maven を使用している場合は、次のものを pom.xml ファイルに追加します。
<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-pubsub</artifactId>
  <version>1.87.0</version>
</dependency>
Gradle を使用している場合は、次のものを依存関係に追加します。
compile 'com.google.cloud:google-cloud-pubsub:1.87.0'
SBT を使用している場合は、次のものを依存関係に追加します。
libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.87.0"

IntelliJ または Eclipse を使用している場合は、次の IDE プラグインを使用してプロジェクトにクライアント ライブラリを追加できます。

プラグインでは、サービス アカウントのキー管理などの追加機能も提供されます。詳細は各プラグインのドキュメントをご覧ください。

Node.js

npm install --save @google-cloud/pubsub

PHP

composer require google/cloud-pubsub

Python

Python 開発環境の設定の詳細については、Python 開発環境設定ガイドをご覧ください。

# ensure that you are using virtualenv
# as described in the python dev setup guide

pip install --upgrade google-cloud-pubsub

Ruby

gem install google-cloud-pubsub

トピックとサブスクリプションを作成する

トピックを作成すると、トピックのサブスクライブやパブリッシュができます。

トピックを作成するには、gcloud pubsub topics create コマンドを使用します。

gcloud pubsub topics create my-topic

サブスクリプションを作成するには、gcloud pubsub subscriptions create コマンドを使用します。サブスクライバー アプリケーションで使用できるのは、サブスクリプションの作成後にトピックにパブリッシュしたメッセージだけです。

gcloud pubsub subscriptions create my-sub --topic my-topic

トピックとサブスクリプションの命名の詳細については、リソース名をご覧ください。

メッセージをパブリッシュする

トピックにメッセージをパブリッシュする準備ができました。

C#

// PublisherClient collects messages into appropriately sized
// batches.
var publishTasks =
    messageTexts.Select(text => publisher.PublishAsync(text));
foreach (Task<string> task in publishTasks)
{
    string message = await task;
    await Console.Out.WriteLineAsync($"Published message {message}");
}

Go

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Go の設定手順に従ってください。 詳細については、Cloud Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

t := client.Topic(topic)
result := t.Publish(ctx, &pubsub.Message{
	Data: []byte(msg),
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published a message; msg ID: %v\n", id)

Java

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PublisherExample {

  // use the default project id
  private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();

  /** Publish messages to a topic.
   * @param args topic name, number of messages
   */
  public static void main(String... args) throws Exception {
    // topic id, eg. "my-topic"
    String topicId = args[0];
    int messageCount = Integer.parseInt(args[1]);
    ProjectTopicName topicName = ProjectTopicName.of(PROJECT_ID, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).build();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // convert message to bytes
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
            .setData(data)
            .build();

        // Schedule a message to be published. Messages are automatically batched.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      // Wait on any pending requests
      List<String> messageIds = ApiFutures.allAsList(futures).get();

      for (String messageId : messageIds) {
        System.out.println(messageId);
      }

      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
      }
    }
  }
}

Node.js

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

const messageId = await pubsub.topic(topicName).publish(dataBuffer);
console.log(`Message ${messageId} published.`);

PHP

use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message  The message to publish.
 */
function publish_message($projectId, $topicName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $topic->publish(['data' => $message]);
    print('Message published' . PHP_EOL);
}

Python

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_name}`
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data=data)
    print('Published {} of message ID {}.'.format(data, future.result()))

print('Published messages.')

Ruby

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name
topic.publish "This is a test message."

puts "Message published."

メッセージの受信

パブリッシュしたばかりのメッセージを pull するようにサブスクライバーを設定します。すべてのサブスクライバーは構成可能な時間の範囲内で各メッセージに確認応答を返信する必要があります。確認応答がなかったメッセージは再配信されます。Cloud Pub/Sub は、すべてのメッセージがサブスクライバーに対して少なくとも 1 回は送信されるように、メッセージを複数回配信することがあります。メッセージを受信して確認する方法の例を次に示します。

C#

// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SubscriberClient.Reply.Ack
            : SubscriberClient.Reply.Nack;
    });
// Run for 3 seconds.
await Task.Delay(3000);
await subscriber.StopAsync(CancellationToken.None);

Go

// Consume 10 messages.
var mu sync.Mutex
received := 0
sub := client.Subscription(subName)
cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
	msg.Ack()
	fmt.Printf("Got message: %q\n", string(msg.Data))
	mu.Lock()
	defer mu.Unlock()
	received++
	if received == 10 {
		cancel()
	}
})
if err != nil {
	return err
}

Java

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class SubscriberExample {

  // use the default project id
  private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();

  private static final BlockingQueue<PubsubMessage> messages = new LinkedBlockingDeque<>();

  static class MessageReceiverExample implements MessageReceiver {

    @Override
    public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
      messages.offer(message);
      consumer.ack();
    }
  }

  /** Receive messages over a subscription. */
  public static void main(String... args) throws Exception {
    // set subscriber id, eg. my-sub
    String subscriptionId = args[0];
    ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(
        PROJECT_ID, subscriptionId);
    Subscriber subscriber = null;
    try {
      // create a subscriber bound to the asynchronous message receiver
      subscriber =
          Subscriber.newBuilder(subscriptionName, new MessageReceiverExample()).build();
      subscriber.startAsync().awaitRunning();
      // Continue to listen to messages
      while (true) {
        PubsubMessage message = messages.take();
        System.out.println("Message Id: " + message.getMessageId());
        System.out.println("Data: " + message.getData().toStringUtf8());
      }
    } finally {
      if (subscriber != null) {
        subscriber.stopAsync();
      }
    }
  }
}

Node.js

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'my-sub';
// const timeout = 60;

// References an existing subscription
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = message => {
  console.log(`Received message ${message.id}:`);
  console.log(`\tData: ${message.data}`);
  console.log(`\tAttributes: ${message.attributes}`);
  messageCount += 1;

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

// Listen for new messages until timeout is hit
subscription.on(`message`, messageHandler);

setTimeout(() => {
  subscription.removeListener('message', messageHandler);
  console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);

PHP

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

Python

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_name}`
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    print('Received message: {}'.format(message))
    message.ack()

subscriber.subscribe(subscription_path, callback=callback)

# The subscriber is non-blocking. We must keep the main thread from
# exiting to allow it to process messages asynchronously in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

Ruby

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

いかがでしたか

クリーンアップ(オプション)

このガイドで使用したリソースの Google Cloud Platform アカウントが課金されないようにするため、トピックとサブスクリプションを削除できます。
  gcloud pubsub subscriptions delete my-sub
  gcloud pubsub topics delete my-topic

次のステップ

構築および実行が可能な例については、Cloud Pub/Sub のチュートリアルをご覧ください。

使用する言語のクライアント ライブラリの詳細については、クライアント ライブラリをご覧ください。

Cloud Pub/Sub の概要については、Cloud Pub/Sub とはをご覧ください。

このページで取り上げたコンセプトの詳細については、パブリッシャー ガイドサブスクライバー ガイドをご覧ください。

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

Cloud Pub/Sub ドキュメント