push サブスクリプションの使用

Pub/Sub は push と pull の両方のメッセージ配信をサポートします。pull サブスクリプションと push サブスクリプションの概要と比較については、サブスクライバーの概要をご覧ください。このドキュメントでは、push 配信について説明します。pull 配信の説明については、pull サブスクライバー ガイドをご覧ください。

Pub/Sub サブスクリプションでは、すべてのメッセージを Webhook(push エンドポイント)の URL に対する HTTP POST リクエストとして送信するように構成できます。通常、push エンドポイントは、一般公開された HTTPS サーバーにします。これは、認証局が署名した有効な SSL 証明書を提示し、DNS によるルーティングが可能なサーバーである必要があります。

さらに、エンドポイントでリクエストを承認する際に認証ヘッダーを使用するように push サブスクリプションを構成できます。サブスクリプションと同じプロジェクトでホストされている App Engine スタンダード環境と Cloud Functions エンドポイントでは、さらに簡単な認証および承認メカニズムを使用できます。

push メッセージの受信

Pub/Sub の push リクエストは以下の例のようになります。message.data フィールドは base64 でエンコードされています。

    POST https://www.example.com/my-push-endpoint 

   {
     "message": {
       "attributes": {
         "key": "value"
       },
       "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
       "messageId": "136969346945"
     },
     "subscription": "projects/myproject/subscriptions/mysubscription"
   }
push エンドポイントは受信メッセージを処理し、処理の成否を示す HTTP ステータス コードを返す必要があります。success レスポンスはメッセージの確認応答と同等です。Cloud Pub/Sub システムでメッセージの確認応答として解釈されるステータス コードは、200201202204102 です。成功した場合のレスポンスは次のようになります。

204 No Content

push サブスクリプションの場合、Pub/Sub は否定応答(nack とも呼ばれる)を送信しません。Webhook によって成功コードが返されない場合、サブスクリプションのメッセージ保持期間が終了し、メッセージの確認応答期限が切れるまで、Pub/Sub は配信を再試行します。push サブスクリプションのデフォルトの確認応答期限は構成可能です。ただし、pull サブスクリプションとは異なり、個々のメッセージの期限は延長できません。事実上、期限はエンドポイントが push リクエストに応答する必要がある時間です。

認証と承認

JSON Web Token(JWT)の使用

サービス アカウント ID と push リクエストを関連付けるように push サブスクリプションを構成することで、push エンドポイントによる認証が可能になります。push サブスクリプションで認証を有効にした場合、そのサブスクリプションからの push リクエストでは、認証ヘッダーに署名付きの OpenIDConnect JWT が含まれます。push エンドポイントではトークンを使用して、サブスクリプションに関連付けられたサービス アカウントに対してリクエストが発行されていることを検証し、承認を決定します。

OpenIDConnect JWT は、3 つの Base64 エンコード文字列(ヘッダー、クレームセット、署名)をピリオドで区切ってセットにしたものです。認証ヘッダーの例を次に示します。

"Authorization" : "Bearer
eyJhbGciOiJSUzI1NiIsImtpZCI6IjdkNjgwZDhjNzBkNDRlOTQ3MTMzY2JkNDk5ZWJjMWE2MWMzZDVh
YmMiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwczovL2V4YW1wbGUuY29tIiwiYXpwIjoiMTEzNzc0M
jY0NDYzMDM4MzIxOTY0IiwiZW1haWwiOiJnYWUtZ2NwQGFwcHNwb3QuZ3NlcnZpY2VhY2NvdW50LmNvb
SIsImVtYWlsX3ZlcmlmaWVkIjp0cnVlLCJleHAiOjE1NTAxODU5MzUsImlhdCI6MTU1MDE4MjMzNSwia
XNzIjoiaHR0cHM6Ly9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTEzNzc0MjY0NDYzMDM4MzIxO
TY0In0.QVjyqpmadTyDZmlX2u3jWd1kJ68YkdwsRZDo-QxSPbxjug4ucLBwAs2QePrcgZ6hhkvdc4UHY
4YF3fz9g7XHULNVIzX5xh02qXEH8dK6PgGndIWcZQzjSYfgO-q-R2oo2hNM5HBBsQN4ARtGK_acG-NGG
WM3CQfahbEjZPAJe_B8M7HfIu_G5jOLZCw2EUcGo8BvEwGcLWB2WqEgRM0-xt5-UPzoa3-FpSPG7DHk7
z9zRUeq6eB__ldb-2o4RciJmjVwHgnYqn3VvlX9oVKEgXpNFhKuYA-mWh5o7BCwhujSMmFoBOh6mbIXF
cyf5UiVqKjpqEbqPGo_AvKvIQ9VTQ" 

ヘッダーとクレームセットは JSON 文字列です。デコードされると次の形式になります。

{"alg":"RS256","kid":"7d680d8c70d44e947133cbd499ebc1a61c3d5abc","typ":"JWT"}

{
   "aud":"https://example.com",
   "azp":"113774264463038321964",
   "email":"gae-gcp@appspot.gserviceaccount.com",
   "sub":"113774264463038321964",
   "email_verified":true,
   "exp":1550185935,
   "iat":1550182335,
   "iss":"https://accounts.google.com"
  }

トークンの有効期間は 1 時間です。

App Engine スタンダードの URL の認証

App Engine スタンダードのアプリであるエンドポイントについては、シンプルな認証メカニズムを使用できます。App Engine スタンダードのアプリは、呼び出し元が管理者ユーザーとして認証するように構成できます。エンドポイントへの管理者ログインを必須にするには、login: admin オプションを app.yamlPython 2 の場合)または <security-constraint>Java の場合)に追加します。

/_ah/push-handlers/.* という形式のパスを使用した App Engine エンドポイントの URL に対する Pub/Sub の push リクエストは、常に、「管理者」ユーザーからのリクエストとして承認されます。エンドポイントを保護するには、App Engine エンドポイントの URL がサブスクリプションと同じプロジェクト内の App Engine に関連付けられている必要があります。それ以外の場合、エンドポイント URL の設定リクエストは拒否されます。

ただし、login: admin は Python 3 アプリケーションでは動作しません。アプリケーション コードに認証ロジックを実装する必要があります。詳しくは、アクセス制御についてをご覧ください。

Pub/Sub での push 認証の設定

サブスクリプションの認証構成は、次の 2 つのパラメータで構成されます。

  • サービス アカウント: push サブスクリプションに関連付けられている GCP サービス アカウント。たとえば、役割 roles/run.invoker を持ち、特定の Cloud Run(フルマネージド)サービスにバインドされたサービス アカウントがあります。このサービス アカウントで構成された push サブスクリプションは、Cloud Run(フルマネージド)サービスを呼び出すことができます。
  • トークン オーディエンス(省略可): Webhook で特定のトークンのオーディエンスの検証に使用する、大文字と小文字が区別されない単一の文字列。

これらのフィールドの構成に加えて、サービス アカウント用にトークンを作成する権限を Pub/Sub に付与する必要があります。Pub/Sub では、プロジェクト専用のサービス アカウント service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com. が作成、維持されます。このサービス アカウントは、サービス アカウント トークン作成者の役割を必要とします。Cloud Console を使用して push 認証用のサブスクリプションを設定する場合は、この役割が自動的に付与されます。その他の場合は、アカウントに対する役割を明示的に付与する必要があります。

段階的にロールアウトされるこの機能が古いプロジェクトに反映されるまでは時間がかかるため、機能のリリース直後はプロジェクトに Pub/Sub サービス アカウントが存在しない可能性があります。一方、新規に作成したプロジェクトでは、すぐにサービス アカウントを使用できます。この機能を特定のプロジェクトですぐに有効にする必要がある場合は、cloud-pubsub@google.com までお問い合わせください。

コマンドライン

# grant Cloud Pub/Sub the permission to create tokens
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
 --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}"\
 --role='roles/iam.serviceAccountTokenCreator'

# configure the subscription push identity
gcloud pubsub subscriptions (create|update|modify-push-config) ${SUBSCRIPTION} \
 --topic=${TOPIC} \
 --push-endpoint=${PUSH_ENDPOINT_URI} \
 --push-auth-service-account=${SERVICE_ACCOUNT_EMAIL} \
 --push-auth-token-audience=${OPTIONAL_AUDIENCE_OVERRIDE}

Console

  1. Pub/Sub トピックページに移動します。

    [トピック] ページに移動

  2. トピック名をクリックします。

  3. サブスクリプションを作成または更新します。

  4. ID を入力し、任意でオーディエンスを入力します。

push エンドポイントによる認証と承認

クレーム

JWT を使用して、クレーム(email クレームと aud クレームを含む)が Google によって署名されていることを検証できます。認証と承認の両方で Google の OAuth 2.0 API を使用する方法の詳細については、OpenID Connect をご覧ください。

こうしたクレームを有用にするメカニズムが 2 つあります。まず、Pub/Sub では、サービス アカウント ID と push サブスクリプションを関連付けるユーザー アカウントまたはサービス アカウントに、プロジェクトまたはサービス アカウント用のサービス アカウント ユーザーの役割を付与する必要があります。

次に、トークンの署名に使用する証明書に対するアクセス権を厳密に管理する必要があります。トークンを作成するには、Pub/Sub が個別の署名サービス アカウント ID を使用して、Google の内部サービスを呼び出す必要があります。申請されたサービス アカウント、またはアカウントを含むプロジェクト用のトークンを作成するには、署名サービス アカウントを承認する必要があります。それには、iam.serviceAccounts.getOpenIdToken 権限またはサービス アカウント トークン作成者の役割を使用します。

この役割または権限は、どのアカウントにも付与できます。Cloud IAM サービスを使用して、この権限を Pub/Sub 署名アカウントに付与するように指定することもできます。具体的には、Pub/Sub では次のようなサービス アカウントを使用します。

service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com
  • {project_number}: サブスクリプションを含む GCP プロジェクト。
  • gcp-sa-pubsub: 署名サービス アカウントを含む、Google 所有のプロジェクト。

トークンの検証

次の例は、App Engine アプリケーションに対する push リクエストを認証する方法を示しています。

プロトコル

リクエスト:

GET https://oauth2.googleapis.com/tokeninfo?id_token={BEARER_TOKEN}

レスポンス:

200 OK
{
    "alg": "RS256",
    "aud": "example.com",
    "azp": "104176025330667568672",
    "email": "{SERVICE_ACCOUNT_NAME}@{YOUR_PROJECT_NAME}.iam.gserviceaccount.com",
    "email_verified": "true",
    "exp": "1555463097",
    "iat": "1555459497",
    "iss": "https://accounts.google.com",
    "kid": "3782d3f0bc89008d9d2c01730f765cfb19d3b70e",
    "sub": "104176025330667568672",
    "typ": "JWT"
}

Java

@WebServlet(value = "/pubsub/authenticated-push")
public class PubSubAuthenticatedPush extends HttpServlet {
  private final String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
  private final MessageRepository messageRepository;
  private final GoogleIdTokenVerifier verifier =
      new GoogleIdTokenVerifier.Builder(new NetHttpTransport(), new JacksonFactory())
          /**
           * Please change example.com to match with value you are providing while creating
           * subscription as provided in @see <a
           * href="https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/appengine-java8/pubsub">README</a>.
           */
          .setAudience(Collections.singletonList("example.com"))
          .build();
  private final Gson gson = new Gson();
  private final JsonParser jsonParser = new JsonParser();

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {

    // Verify that the request originates from the application.
    if (req.getParameter("token").compareTo(pubsubVerificationToken) != 0) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    // Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
    String authorizationHeader = req.getHeader("Authorization");
    if (authorizationHeader == null
        || authorizationHeader.isEmpty()
        || authorizationHeader.split(" ").length != 2) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    String authorization = authorizationHeader.split(" ")[1];

    try {
      // Verify and decode the JWT.
      // Note: For high volume push requests, it would save some network overhead
      // if you verify the tokens offline by decoding them using Google's Public
      // Cert; caching already seen tokens works best when a large volume of
      // messsages have prompted a singple push server to handle them, in which
      // case they would all share the same token for a limited time window.
      GoogleIdToken idToken = verifier.verify(authorization);
      messageRepository.saveToken(authorization);
      messageRepository.saveClaim(idToken.getPayload().toPrettyString());
      // parse message object from "message" field in the request body json
      // decode message data from base64
      Message message = getMessage(req);
      messageRepository.save(message);
      // 200, 201, 204, 102 status codes are interpreted as success by the Pub/Sub system
      resp.setStatus(102);
      super.doPost(req, resp);
    } catch (Exception e) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
    }
  }

  private Message getMessage(HttpServletRequest request) throws IOException {
    String requestBody = request.getReader().lines().collect(Collectors.joining("\n"));
    JsonElement jsonRoot = jsonParser.parse(requestBody);
    String messageStr = jsonRoot.getAsJsonObject().get("message").toString();
    Message message = gson.fromJson(messageStr, Message.class);
    // decode from base64
    String decoded = decode(message.getData());
    message.setData(decoded);
    return message;
  }

  private String decode(String data) {
    return new String(Base64.getDecoder().decode(data));
  }

  PubSubAuthenticatedPush(MessageRepository messageRepository) {
    this.messageRepository = messageRepository;
  }

  public PubSubAuthenticatedPush() {
    this(MessageRepositoryImpl.getInstance());
  }
}

Node.js

app.post('/pubsub/authenticated-push', jsonBodyParser, async (req, res) => {
  // Verify that the request originates from the application.
  if (req.query.token !== PUBSUB_VERIFICATION_TOKEN) {
    res.status(400).send('Invalid request');
    return;
  }

  // Verify that the push request originates from Cloud Pub/Sub.
  try {
    // Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
    const bearer = req.header('Authorization');
    const [, token] = bearer.match(/Bearer (.*)/);
    tokens.push(token);

    // Verify and decode the JWT.
    // Note: For high volume push requests, it would save some network
    // overhead if you verify the tokens offline by decoding them using
    // Google's Public Cert; caching already seen tokens works best when
    // a large volume of messsages have prompted a singple push server to
    // handle them, in which case they would all share the same token for
    // a limited time window.
    const ticket = await authClient.verifyIdToken({
      idToken: token,
      audience: 'example.com',
    });

    const claim = ticket.getPayload();
    claims.push(claim);
  } catch (e) {
    res.status(400).send('Invalid token');
    return;
  }

  // The message is a unicode string encoded in base64.
  const message = Buffer.from(req.body.message.data, 'base64').toString(
    'utf-8'
  );

  messages.push(message);

  res.status(200).send();
});

Python

@app.route('/_ah/push-handlers/receive_messages', methods=['POST'])
def receive_messages_handler():
    # Verify that the request originates from the application.
    if (request.args.get('token', '') !=
            current_app.config['PUBSUB_VERIFICATION_TOKEN']):
        return 'Invalid request', 400

    # Verify that the push request originates from Cloud Pub/Sub.
    try:
        # Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
        bearer_token = request.headers.get('Authorization')
        token = bearer_token.split(' ')[1]
        TOKENS.append(token)

        # Verify and decode the JWT. `verify_oauth2_token` verifies
        # the JWT signature, the `aud` claim, and the `exp` claim.
        # Note: For high volume push requests, it would save some network
        # overhead if you verify the tokens offline by downloading Google's
        # Public Cert and decode them using the `google.auth.jwt` module;
        # caching already seen tokens works best when a large volume of
        # messages have prompted a single push server to handle them, in which
        # case they would all share the same token for a limited time window.
        claim = id_token.verify_oauth2_token(token, requests.Request(),
                                             audience='example.com')
        # Must also verify the `iss` claim.
        if claim['iss'] not in [
            'accounts.google.com',
            'https://accounts.google.com'
        ]:
            raise ValueError('Wrong issuer.')
        CLAIMS.append(claim)
    except Exception as e:
        return 'Invalid token: {}\n'.format(e), 400

    envelope = json.loads(request.data.decode('utf-8'))
    payload = base64.b64decode(envelope['message']['data'])
    MESSAGES.append(payload)
    # Returning any 2xx status indicates successful receipt of the message.
    return 'OK', 200

署名なし JWT の検証方法の例は、ウェブサイト用の Google ログインガイドで確認できます。OpenID トークンの概要については、OpenID Connect ガイドをご覧ください。

Cloud Run

Cloud Run サービスは、Pub/Sub で生成されたトークンを確認して、HTTP 呼び出しを自動的に認証します。ユーザーについて必要な構成は、呼び出し元アカウントに付与される必須の IAM 役割だけです。たとえば、アカウントの特定の Cloud Run エンドポイントを呼び出す権限の承認や取り消しができます。詳細については、次のチュートリアルをご覧ください。

配信の停止と再開

Pub/Sub で push エンドポイントに対するリクエストの送信を一時的に停止するには、サブスクリプションを pull に変更します。この変更が適用されるまで、数分かかることがあります。

push 配信を再開するには、URL を有効なエンドポイントにもう一度設定します。配信を完全に停止するには、サブスクリプションを削除します。

割り当て、上限、配信レート

push サブスクリプションには割り当てリソース上限があります。

さらに、push 配信のレートは、push エンドポイントの負荷が過大にならない範囲で配信レートが最大になるように、自動的に調整されます。これは Slow Start アルゴリズムを使用することで可能になります。

  • 最初は、システムが一度に送信するメッセージは 1 つです。
  • 配信が成功するたびに、システムが同時に送信するメッセージの数は 2 倍になります。
  • システムが同時にメッセージを配信する数は、配信が失敗するか、またはシステムが割り当てまたはリソース上限に達するまで倍増し続けます。
  • 配信が失敗するたびに、エンドポイントへの同時リクエスト数は半減し続け、最終的には一度に 1 つのリクエストという下限に達します。

このアルゴリズムでは、最適なスループットを維持できるだけの十分な数のパブリッシュ済みメッセージがキュー内にあることが前提となります。最終的に、push 配信レートは、メッセージがパブリッシュされるペースによって制約されます。