サブスクライバー ガイド

すべてを設定し、Pubsub クライアントをインスタンス化すると、リクエストを API に送信できるようになります。この操作を実行する前に、サブスクリプションでプッシュ配信またはプル配信のどちらを使用するか決定する必要があります。詳細については、サブスクリプションの概要をご覧ください。

サブスクライバー アプリケーションの一般的な流れは次のとおりです。

  1. サブスクリプションを新たに作成し、サブスクライブ先のトピックを指定します。プッシュ配信の場合は、サブスクリプションのプッシュ エンドポイントを設定します。
  2. プル配信を使用している場合は、メッセージを取得します。
  3. 受信メッセージを処理します。

次のコードサンプルは、簡単なリクエストを送信する方法を示しています。

メソッドの全リストについては、リファレンス ドキュメントをご覧ください。

サブスクリプションの概要

  1. プッシュ配信とプル配信のフロー
  2. メッセージの確認応答期限

プッシュ配信とプル配信のフロー

Pub/Sub はプッシュとプルの両方のメッセージ配信をサポートします。プッシュ配信では、Pub/Sub がサブスクライバー アプリケーションへのリクエストを開始して、メッセージを配信します。プル配信では、サブスクライバー アプリケーションが Pub/Sub サーバーへのリクエストを開始して、メッセージを取得します。

プッシュ サブスクリプションでは、Pub/Sub サーバーは、事前設定されたエンドポイントで、サブスクライバー アプリケーションにリクエストを送信します。サブスクライバーの HTTP レスポンスは暗黙の確認応答として機能します。成功のレスポンスは、メッセージが正常に処理されたことを示し、Pub/Sub システムは、このレスポンスをサブスクリプションから削除することができます。非成功のレスポンスは、Pub/Sub サーバーがこれを再送信する必要があることを示しています(暗黙の「nack」)。サブスクライバーがメッセージ フローを処理できるようにするには、Pub/Sub はリクエストのフローを動的に調整し、再試行のレートを制限するアルゴリズムを使用します。

プル サブスクリプションでは、サブスクライブ アプリケーションは、サブスクリプション キュー内のメッセージの配信をリクエストする API pull メソッドを明示的に呼び出します。Pub/Sub サーバーはメッセージ(キューが空の場合はエラー)と確認応答 ID で応答します。次に、サブスクライバーは戻された確認応答 ID を使用して acknowledge メソッドを明示的に呼び出し、受信を確認します。

サブスクリプションの作成時にプッシュ配信またはプル配信を指定できます。この配信はあとからいつでも変更できます(ただし、サブスクリプションが使用できるのは一度に 1 種類の配信だけです)。

プッシュ配信とプル配信の比較

  プル プッシュ
エンドポイント エンドポイントには、ウェブサーバー、モバイル端末、ブラウザなど、どんなものでも指定できます。各エンドポイントは、Pub/Sub API を呼び出せる必要があります。 現時点でサポートされているエンドポイントは、Webhook 配信を受け入れる HTTPS サーバーのみです。受信エンドポイントは、Pub/Sub サブスクライバーから切り離すことができますので、複数のサブスクリプションからのメッセージを単一のエンドポイントに送信できます。
負荷分散 複数のサブスクライバーが、同じ「共有」サブスクリプションへのプル呼び出しを作成できます。サブスクライバーごとにメッセージのサブセットを受信します。 プッシュ エンドポイントにはロードバランサを指定できます。
設定 設定は不要です。 サブスクライバーと同じプロジェクトでは、App Engine アプリへの設定が不要です。
他のすべてのエンドポイントには、Google Cloud Platform Console でプッシュ エンドポイントの設定(および確認)が必要です。エンドポイントには DNS 名から到達可能で、SSL 証明書がインストールされている必要があります。
レイテンシ メッセージのパブリケーションと配信の間で遅延が発生します。 すぐに配信されます(エンドポイントに負担をかけないように配信レートに制限がある場合を除きます)。プルリクエストから追加されるレイテンシはありません。
メッセージ処理とフロー制御 メッセージの確認応答は明示的に行われます。クライアント側にさらに制御がありますが、これはクライアント側により複雑なコードがあることを意味します。メッセージはオンデマンドでのみ配信されます。サブスクライバーは確認応答期限を動的に変更し、メッセージ処理期間を必要に応じて長くすることができます。 Pub/Sub サーバーはフロー制御を自動的に実装します。クライアント側でメッセージ フローを処理する必要はありません(クライアントが HTTP エラーを戻すことで現在のメッセージ ロードを処理できないことを示すことはできます)。
再試行ポリシー 受信が確認されなかったメッセージは、確認応答期限に達すると、配信から最長 7 日間は再度プルできるようになります。 プッシュ エンドポイントからエラーコードが戻ると、メッセージは指数バックオフ ポリシーで最長 7 日間再試行されます。
ネットワーク使用量 / 課金 ポーリングを使用している場合、頻繁に接続を開いてすぐに閉じるとネットワーク使用量が増加することがあります。 ネットワーク トラフィックが少ない場合に、ネットワーク使用効率が高まります。

まとめとして、配信の種類ごとの推奨する使用例を以下に示します。

  推奨する用途
プル
  • サブスクライバ―の数が多く、動的に作成され、手動設定できない
  • SSL 証明書による設定と、プッシュ サブスクリプション用のウェブサーバーを必要とする、Google Compute Engine で実行されるサブスクライバーなどの App Engine 以外のサブスクライバー
プッシュ
  • トラフィックの少ないサブスクライバー
  • オフラインの頻度が高いエフェメラル サブスクライバー
  • リアルタイムに近いパフォーマンスの必要なサブスクライバー

メッセージの確認応答期限

確認応答期限は配信後の秒数で表わされ、サブスクライバーがプル メッセージまたはプッシュ メッセージの受信を確認する必要がある期間です。サブスクライバーが明示的な acknowledge(プル サブスクライバーの場合)または成功のレスポンスコード(プッシュ サブスクライバーの場合)で、この期限内に応答しなかった場合、サーバーはメッセージの再送信を試行します。デフォルトでこの期限は 10 秒ですが、サブスクリプションを作成するときにカスタム値を設定できます。プル サブスクライバーは、指定されたメッセージの処理時間を短縮または延長できるよう、メッセージごとに期限をオンザフライで変更することもできます。次の例は、サブスクリプションの確認応答期限を 60 秒に変更した場合の効果を示しています。

サブスクリプションの作成

サブスクリプションを作成すると、システムは同期点を設定します。つまり、サブスクライバーは、この点の後にパブリッシュされたメッセージをすべて受信することが保証されます。この点の前にパブリッシュされたメッセージは配信されない場合があります。

サブスクリプションを作成するときには以下を実行する必要があります。

  • サブスクライブするトピックを指定します。個々のサブスクリプションは 1 つのトピックのみ指定できます。
  • プッシュ配信メソッドを使用している場合は、pushConfig.pushEndpoint フィールドにプッシュ エンドポイントを指定します。これは HTTPS の URL にする必要があります。
注: プッシュ メッセージングを使用する場合は、以下を実行している必要があります。
  1. Webhook HTTPS コールバック受信側アプリケーション インスタンス用に URL(複数可)を設定します。
  2. Google Cloud Platform Console にプッシュ エンドポイントを設定します(メッセージを受信するサービスの URL(複数可))。Pub/Sub サブスクリプションと同じプロジェクトの App Engine エンドポイントには、この設定が不要です。

以下を指定することもできます。

  • カスタム「確認応答」期限subscription.ackDeadlineSeconds フィールドに設定できます。コードがこの期間内にメッセージに確認応答しなかった場合、メッセージが再送信されます。この期限が指定されていない場合、デフォルト値は 10 秒です。指定できる最大カスタム期限は 600 秒(10 分)です。

次に、サブスクリプションを作成するサンプルコードをいくつか示します。

プロトコル

リクエスト:

PUT https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription

{
 "topic": "projects/someproject/topics/sometopic"
 // Only needed if you are using push delivery
 "pushConfig": {
  "pushEndpoint": "https://myproject.appspot.com/myhandler"
 },
}
  

レスポンス:

200 OK

{
 "name": "projects/myproject/subscriptions/mysubscription",
 "topic": "projects/someproject/topics/sometopic",

 "pushConfig": {
  "pushEndpoint": "https://myproject.appspot.com/myhandler"
 },
 "ackDeadlineSeconds": 10
}

Java

Pub/Sub クライアントの作成方法については、アプリケーションの設定をご覧ください。

import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.model.PushConfig;
import com.google.api.services.pubsub.model.Subscription;

import java.io.IOException;

// Only needed if you are using push delivery
String pushEndpoint = "https://myapp.appspot.com/myhandler";
PushConfig pushConfig = new PushConfig().setPushEndpoint(pushEndpoint);
Pubsub pubsub = createPubsubClient();
Subscription subscription = new Subscription()
        // The name of the topic from which this subscription
        // receives messages
        .setTopic("projects/someproject/topics/sometopic")
        // Ackowledgement deadline in second
        .setAckDeadlineSeconds(10)
        // Only needed if you are using push delivery
        .setPushConfig(pushConfig);
Subscription newSubscription = pubsub.projects().subscriptions().create(
        "projects/myproject/subscriptions/mysubscription", subscription)
        .execute();
System.out.println("Created: " + newSubscription.getName());
   

Python

Pub/Sub クライアントの作成方法については、アプリケーションの設定をご覧ください。

client = create_pubsub_client()

# Only needed if you are using push delivery
push_endpoint = 'https://someproject.appspot.com/myhandler'

# Create a POST body for the Pub/Sub request
body = {
    # The name of the topic from which this subscription receives messages
    'topic': 'projects/someproject/topics/sometopic',
    # Only needed if you are using push delivery
    'pushConfig': {
        'pushEndpoint': push_endpoint
    }
}

subscription = client.projects().subscriptions().create(
    name='projects/myproject/subscriptions/mysubscription',
    body=body).execute()

print 'Created: %s' % subscription.get('name')

Node.js

gcloud-node を使用して Pub/Sub に接続する方法については、gcloud-node ドキュメントをご覧ください。

/**
 * Create a new subscription.
 *
 * @param {string} topicName Name of the topic for the new subscription.
 * @param {string} subscriptionName Name for the new subscription.
 * @param {Function} callback Callback function.
 */
function createSubscription (topicName, subscriptionName, callback) {
  if (!topicName) {
    return callback(new Error('"topicName" is required!'));
  } else if (!subscriptionName) {
    return callback(new Error('"subscriptionName" is required!'));
  }

  var options = {
    reuseExisting: true
  };
  pubsub.subscribe(topicName, subscriptionName, options, function (err, subscription) {
    if (err) {
      return callback(err);
    }

    console.log('Created subscription %s to topic %s', subscriptionName, topicName);
    return callback(null, subscription);
  });
}

Ruby

Pub/Sub クライアントの作成方法については、アプリケーションの設定をご覧ください。

def create_subscription
  gcloud = Gcloud.new "my-gcp-project-id"
  pubsub = gcloud.pubsub
  topic = pubsub.topic "my-topic"

  subscription = topic.subscribe "my-subscription"

  puts "Subscription created #{subscription.name}"
end

プッシュ配信とプル配信の切り替え

サブスクリプションでの配信方法はいつでも変更できます。

  • プッシュ配信からプル配信に切り替えるには、modifyPushConfigRequest を呼び出してプッシュ エンドポイントを空の文字列にリセットします。次に pull を呼び出してメッセージを取得します。
  • プル配信からプッシュ配信に切り替えるには、上述のようにプッシュ エンドポイントを定義し、modifyPushConfig を呼び出します。

メッセージの受信

配信契約

多くの場合、Pub/Sub ではメッセージがパブリッシュされた順序で 1 回配信されます。ただし、この 1 回だけ、順序どおりという配信方法は保証されているわけではありません。メッセージが複数回、順不同で配信される可能性もあります。そのため、サブスクライバーはメッセージを処理するときにべき等である必要があります。また、必要に応じて、順不同で受信したメッセージを処理できるようにしておく必要があります。配信の順序を重視する場合、サブスクライブ先のトピックのパブリッシャーに、メッセージの順序情報を組み込んでおくことを推奨します。メッセージの順序の詳細についてはこのページをご覧ください。受信が確認されなかったメッセージは最長 7 日間、回数の制限なく再試行されます。

メッセージの形式

Pub/Sub は HTTP リクエスト(プッシュの場合)または HTTP レスポンス(プルの場合)の本文でメッセージを返します。本文は以下のような JSON データ構造となります。

プルレスポンスの本文:
{
 "receivedMessages":
 [
  {
   "message": {
    "data": "base64-no-line-feeds-variant-representation-of-payload",
    "attributes": {
     "string-value": "string-value",
     // ... more attributes
    },
    "message_id": "string-value"
   },
   "ack_id": "string-value"
  },
  // ... more messages
 ]
}
プッシュ リクエストの本文:
{
 "message": {
  "attributes": {
   "string-value": "string-value",
   // ... more attributes
  },
  "data": "base64-no-line-feeds-variant-representation-of-payload",
  "message_id": "string-value"
 },
 "subscription": "string-value"
}

プルメッセージの受信

プル配信では、サブスクライバーが次の 2 つのステップを実行する必要があります。
  1. pull メソッドを呼び出し、サブスクリプション キュー内の最新のメッセージを取得します。maxMessages フィールドを設定すると、1 回のコールで戻るメッセージの最大数を指定できます。デフォルトでは、1 つ以上のメッセージを受信するまでサーバーは接続を維持しますが、returnImmediately フィールドを true に設定して、キューが現時点で空の場合、サブスクライバーが受信まで待機することを防ぐこともできます。
  2. acknowledge メソッドを呼び出し、サブスクライバーがメッセージの処理を完了していることがわかると、Pub/Sub サーバーがサブスクリプション キューからそのメッセージを削除することができます。メッセージで確認応答期限を変更して確認応答を遅らせることもできます。

次に、メッセージを一度にプルして確認応答するサンプルコードをいくつか示します。

プロトコル

リクエスト:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull

{
 "returnImmediately": "false",
 "maxMessages": "1"
}
  

レスポンス:

200 OK

{
 "receivedMessages": [
  {
   "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
   "message": {
    "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
    "messageId": "19917247034"
   }
  }
 ]
}

リクエスト:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge

{
 "ackIds": [
  "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
 ],
}
   

レスポンス:

   200 OK

Java

Pub/Sub クライアントの作成方法については、アプリケーションの設定をご覧ください。

import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.model.AcknowledgeRequest;
import com.google.api.services.pubsub.model.PubsubMessage;
import com.google.api.services.pubsub.model.PullRequest;
import com.google.api.services.pubsub.model.PullResponse;
import com.google.api.services.pubsub.model.ReceivedMessage;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

Pubsub pubsub = createPubsubClient();
String subscriptionName =
        "projects/myproject/subscriptions/mysubscription";
// You can fetch multiple messages with a single API call.
int batchSize = 10;
PullRequest pullRequest = new PullRequest()
        // Setting ReturnImmediately to false instructs the API to
        // wait to collect the message up to the size of
        // MaxEvents, or until the timeout.
        .setReturnImmediately(false)
        .setMaxMessages(batchSize);
do {
    PullResponse pullResponse = pubsub.projects().subscriptions()
            .pull(subscriptionName, pullRequest).execute();
    List<String> ackIds = new ArrayList<>(batchSize);
    List<ReceivedMessage> receivedMessages =
            pullResponse.getReceivedMessages();
    if (receivedMessages == null || receivedMessages.isEmpty()) {
        // The result was empty.
        System.out.println("There were no messages.");
        continue;
    }
    for (ReceivedMessage receivedMessage : receivedMessages) {
        PubsubMessage pubsubMessage = receivedMessage.getMessage();
        if (pubsubMessage != null) {
            System.out.print("Message: ");
            System.out.println(
                    new String(pubsubMessage.decodeData(), "UTF-8"));
        }
        ackIds.add(receivedMessage.getAckId());
    }
    // Ack can be done asynchronously if you care about throughput.
    AcknowledgeRequest ackRequest =
            new AcknowledgeRequest().setAckIds(ackIds);
    pubsub.projects().subscriptions()
            .acknowledge(subscriptionName, ackRequest).execute();
    // You can keep pulling messages by changing the condition below.
} while (false);
   

Python

Pub/Sub クライアントの作成方法については、アプリケーションの設定をご覧ください。

import base64

client = create_pubsub_client()

# You can fetch multiple messages with a single API call.
batch_size = 100

subscription = 'projects/myproject/subscriptions/mysubscription'

# Create a POST body for the Pub/Sub request
body = {
    # Setting ReturnImmediately to false instructs the API to wait
    # to collect the message up to the size of MaxEvents, or until
    # the timeout.
    'returnImmediately': False,
    'maxMessages': batch_size,
}
while True:

    resp = client.projects().subscriptions().pull(
        subscription=subscription, body=body).execute()

    received_messages = resp.get('receivedMessages')
    if received_messages is not None:
        ack_ids = []
        for received_message in received_messages:
            pubsub_message = received_message.get('message')
            if pubsub_message:
                # Process messages
                print base64.b64decode(str(pubsub_message.get('data')))
                # Get the message's ack ID
                ack_ids.append(received_message.get('ackId'))

        # Create a POST body for the acknowledge request
        ack_body = {'ackIds': ack_ids}

        # Acknowledge the message.
        client.projects().subscriptions().acknowledge(
            subscription=subscription, body=ack_body).execute()

Node.js

gcloud-node を使用して Pub/Sub に接続する方法については、gcloud-node のドキュメントをご覧ください。

function handleMessage (message) {
  console.log('received message: ' + message.data);
}
/**
 * Pull messages from a topic's subscription.
 *
 * @param {string} subscriptionName The name of the subscription.
 * @param {function} callback The callback function.
 */
function pullMessages (subscriptionName, callback) {
  if (!subscriptionName) {
    return callback(new Error('"subscriptionName" is required!'));
  }

  var subscription = pubsub.subscription(subscriptionName);
  var options = {
    // Limit the amount of messages pulled.
    maxResults: 100,
    // If set, the system will respond immediately. Otherwise, wait until
    // new messages are available. Returns if timeout is reached.
    returnImmediately: false
  };
  // Pull any messages on the subscription
  subscription.pull(options, function (err, messages) {
    if (err) {
      return callback(err);
    }
    // Do something for each message
    messages.forEach(handleMessage);

    console.log('Pulled %d messages!', messages.length);

    // Acknowledge messages
    var subscription = pubsub.subscription(subscriptionName);
    subscription.ack(messages.map(function (message) {
      return message.ackId;
    }), function (err) {
      if (err) {
        return callback(err);
      }

      console.log('Acked %d messages!', messages.length);
      return callback(null, messages);
    });
  });
}

Ruby

Pub/Sub クライアントの作成方法については、アプリケーションの設定をご覧ください。

def pull_messages
  gcloud = Gcloud.new "my-gcp-project-id"
  pubsub = gcloud.pubsub
  subscription = pubsub.subscription "my-subscription"

  puts "Messages pulled:"
  subscription.pull.each do |message|
    puts message.data
    message.acknowledge!
  end
end

プッシュ メッセージの受信

サブスクリプションに対するすべてのメッセージが、Pub/Sub サーバーから設定した Webhook アドレスに自動的に送信されます。Webhook アプリケーションは受信メッセージを処理し、処理の成功を示す HTTP ステータス コードを返す必要があります。次の HTTP ステータス コード(200201204102)のいずれも、Pub/Sub システムで成功と解釈されます。サービスからこれ以外のコードが返った場合、Google Cloud Pub/Sub は指数バックオフ アルゴリズムを使用して無期限に再試行します。その間、他のメッセージの配信は継続されます。Google Cloud Pub/Sub では配信順序が保証されていないことに注意してください。

プッシュ メッセージ配信のレートを制御するには、Google Cloud Pub/Sub はサブスクリプション単位で slow-start アルゴリズムを使用します。Google Cloud Pub/Sub は slow-start によって一度に 1 つのメッセージを送信することから開始し、保留中の同時メッセージの最大数に達するまで、配信が成功するたびに 2 倍のメッセージを送信します。配信が失敗した場合、サブスクリプションで許可される保留中のメッセージの数が半分になります。

具体的なハンドラのコーディングは、プラットフォームまたは環境に依存します。次に、HTTP サーブレット用のサンプルコードをいくつか紹介します。

プロトコル

リクエスト:

POST https://www.example.com/my-push-endpoint

{
 "message": {
  "attributes": {
   "key": "value"
  },
  "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
  "message_id": "136969346945"
 },
 "subscription": "projects/myproject/subscriptions/mysubscription"
}

レスポンス:

      204 No Content

ステータス コードが 204 のレスポンスは、暗黙的確認応答と見なされます。

Java

このサンプルでは、Java 用の Google API クライアント ライブラリを使用します。

import com.google.api.client.json.JsonParser;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.pubsub.model.PubsubMessage;

import java.io.IOException;

import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

// The following class is based on the HTTP Servlet 2.5 standard and
// should be registered to listen to your subscription endpoint URL
// and configured in your Web servlet environment (usually through
// your web.xml configuration).
public class ReceivePubsubMessageServlet extends HttpServlet {
    @Override
    public void doPost(HttpServletRequest req, HttpServletResponse resp)
            throws IOException {
        ServletInputStream reader = req.getInputStream();
        // Parse the JSON message to the POJO model class.
        JsonParser parser =
                JacksonFactory.getDefaultInstance().createJsonParser(reader);
        parser.skipToKey("message");
        PubsubMessage message = parser.parseAndClose(PubsubMessage.class);
        // Base64-decode the data and work with it.
        String data = new String(message.decodeData(), "UTF-8");
        // Work with your message
        // Respond with a 20X to acknowledge receipt of the message.
        resp.setStatus(HttpServletResponse.SC_NO_CONTENT);
        resp.getWriter().close();
    }
}

Python

import base64
import json
import urllib

import webapp2

# The following class is based on the webapp2 framework bundled with
# the App Engine SDK. How to extract the POST body may vary
# depending on your framework of choice. Also, this handler should be
# configured to listen to your subscription endpoint URL.
class ReceiveMessage(webapp2.RequestHandler):
    """A handler for push subscription endpoint.."""

    def post(self):
        # Handle the POST body as JSON
        message = json.loads(urllib.unquote(self.request.body).rstrip('='))

        # Decode the actual message
        message_body = base64.b64decode(str(message['message']['data']))

        # Work with your message_body
        # ...

        # Respond with a 20x to acknowledge receipt of the message
        self.response.status = 204

Node.js

gcloud-node を使用して Pub/Sub に接続する方法については、gcloud-node のドキュメントをご覧ください。

'use strict';

var express = require('express');
var bodyParser = require('body-parser');
var gcloud = require('gcloud');

var app = express();
app.set('view engine', 'jade');

var formBodyParser = bodyParser.urlencoded({extended: false});
var jsonBodyParser = bodyParser.json();

// List of all messages received by this instance
var messages = [];

// The following environment variables are set by app.yaml when running on GAE,
// but will need to be manually set when running locally.
var PUBSUB_VERIFICATION_TOKEN = process.env.PUBSUB_VERIFICATION_TOKEN;

var pubsub = gcloud.pubsub({
  projectId: process.env.GCLOUD_PROJECT
});

var topic = pubsub.topic(process.env.PUBSUB_TOPIC);

app.get('/', function (req, res) {
  res.render('index', { messages: messages });
});

app.post('/', formBodyParser, function (req, res, next) {
  if (!req.body.payload) {
    return res.status(400).send('Missing payload');
  }

  topic.publish({
    data: req.body.payload
  }, function (err) {
    if (err) {
      return next(err);
    }
    res.status(200).send('Message sent');
  });
});

app.post('/pubsub/push', jsonBodyParser, function (req, res) {
  if (req.query.token !== PUBSUB_VERIFICATION_TOKEN) {
    return res.status(400).send();
  }

  // The message is a unicode string encoded in base64.
  var message = new Buffer(req.body.message.data, 'base64').toString('utf-8');

  messages.push(message);

  res.status(200).send();
});

// Start the server
var server = app.listen(process.env.PORT || '8080', function () {
  console.log('App listening on port %s', server.address().port);
  console.log('Press Ctrl+C to quit.');
});

Ruby

Pub/Sub クライアントの作成方法については、アプリケーションの設定をご覧ください。

require "sinatra"
require "json"
require "base64"

post "/push" do
  message = JSON.parse request.body.read
  data = Base64.decode64 message["message"]["data"]
  logger.info "Pushed Message: #{data}"
  response.status = 204
end

プッシュ配信の停止 / 一時停止と再開

サブスクリプションのメッセージの受信を一時停止するには、modifyPushConfigRequest を送信してプッシュ エンドポイントに空の文字列を設定します。メッセージは蓄積されますが配信されません。メッセージの受信を再開するには、プッシュ エンドポイントが指定された別の modifyPushConfigRequest リクエストを送信します。

配信を完全に停止するには、サブスクリプションを削除する必要があります。

プロジェクトのサブスクリプションのリスト表示

デフォルトで、クエリあたり最大で 100 件の結果が返され、pageSize クエリ パラメータに最大で 1000 個の代替値を指定できます。

特定のプロジェクトのすべてのサブスクリプションのリストは、list メソッドを呼び出すことで取得できます。また、特定のトピックのすべてのサブスクリプションのリストは、projects().topics().subscriptions().list メソッドを呼び出すことで取得できます。

次に、サブスクリプションのリストを取得するサンプルコードをいくつか示します。

プロトコル

リクエスト:

GET https://pubsub.googleapis.com/v1/projects/myproject/subscriptions

レスポンス:

200 OK

{
 "subscriptions": [
 {
  "name": "projects/myproject/subscriptions/mysubscription",
  "topic": "projects/myproject/topics/mytopic",

  "pushConfig": {
  "pushEndpoint": "https://myproject.appspot.com/myhandler"
  },
  "ackDeadlineSeconds": 10
 }
 ]
}

Java

Pub/Sub クライアントの作成方法については、アプリケーションの設定をご覧ください。

import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.model.ListSubscriptionsResponse;
import com.google.api.services.pubsub.model.Subscription;

import java.io.IOException;
import java.util.List;

Pubsub pubsub = createPubsubClient();
Pubsub.Projects.Subscriptions.List listMethod = pubsub.projects()
        .subscriptions().list("projects/myproject");
String nextPageToken = null;
ListSubscriptionsResponse response;
do {
    if (nextPageToken != null) {
        listMethod.setPageToken(nextPageToken);
    }
    response = listMethod.execute();
    List<Subscription> subscriptions = response.getSubscriptions();
    if (subscriptions != null) {
        for (Subscription subscription : subscriptions) {
            System.out.println(
                    "Found subscription: " + subscription.getName());
        }
    }
    nextPageToken = response.getNextPageToken();
} while (nextPageToken != null);

Python

Pub/Sub クライアントの作成方法については、アプリケーションの設定をご覧ください。

client = create_pubsub_client()

next_page_token = None
while True:
    resp = client.projects().subscriptions().list(
        project='projects/myproject',
        pageToken=next_page_token).execute()

    # Process each subscription
    for subscription in resp['subscriptions']:
        print subscription['name']
    next_page_token = resp.get('nextPageToken')
    if not next_page_token:
        break

Node.js

gcloud-node を使用して Pub/Sub に接続する方法については、gcloud-node のドキュメントをご覧ください。

/**
 * List all subscriptions for the specified topic, or list all subcriptions for
 * all topics.
 *
 * @param {string} [topicName] The name of the topic. If omitted, list all
 * subscriptions.
 * @param {Function} callback The callback function.
 */
function listSubscriptions (topicName, callback) {
  var options = {};
  if (typeof topicName === 'string') {
    // Optionally find subscriptions for a specific topic
    options.topic = pubsub.topic(topicName);
  }
  pubsub.getSubscriptions(options, function (err, subscriptions) {
    if (err) {
      return callback(err);
    }

    console.log('Found %d subscriptions!', subscriptions.length);
    return callback(null, subscriptions);
  });
}

Ruby

Pub/Sub クライアントの作成方法については、アプリケーションの設定をご覧ください。

def list_subscriptions
  gcloud = Gcloud.new "my-gcp-project-id"
  pubsub = gcloud.pubsub
  subscriptions = pubsub.subscriptions

  puts "Subscriptions:"
  subscriptions.each do |subscription|
    puts subscription.name
  end
end

サブスクリプションの削除

次にサブスクリプションを削除するサンプルコードをいくつか紹介します。

Node.js

gcloud-node を使用して Pub/Sub に接続する方法については、gcloud-node のドキュメントをご覧ください。

/**
 * Delete a subscription.
 *
 * @param {string} subscriptionName Name of the subscription to delete.
 * @param {Function} callback The callback function.
 */
function deleteSubscription (subscriptionName, callback) {
  if (!subscriptionName) {
    return callback(new Error('"subscriptionName" is required!'));
  }

  var subscription = pubsub.subscription(subscriptionName);

  // Delete the subscription
  subscription.delete(function (err) {
    if (err) {
      return callback(err);
    }

    console.log('Deleted subscription: %s', subscriptionName);
    return callback(null);
  });
}

外出先でもリソースをモニタリング

Google Cloud Console アプリを入手して、プロジェクトの管理にお役立てください。

フィードバックを送信...