プッシュ サブスクリプション

push 配信では、Pub/Sub がサブスクライバー アプリケーションに対してリクエストを開始し、メッセージを配信します。

始める前に

このドキュメントを読む前に、次の内容を理解しておいてください。

push サブスクリプションのプロパティ

push サブスクリプションを構成する際には、次のプロパティを指定できます。

  • エンドポイント URL(必須)。 一般公開されている HTTPS アドレス。push エンドポイントのサーバーには、認証局が署名した有効な SSL 証明書が必要です。Pub/Sub サービスでは、Pub/Sub サービスがメッセージを格納するのと同じ Google Cloud リージョンから push エンドポイントにメッセージを配信します。Pub/Sub サービスでは、同じ Google Cloud リージョンからベストエフォート方式でメッセージを配信します。

    Pub/Sub では、push サブスクリプションの URL ドメインの所有権の証明が不要になりました。ドメインが Pub/Sub からの予期しない POST リクエストを受信した場合は、不正行為を報告できます。

  • 認証を有効にする。有効にすると、Pub/Sub によって push エンドポイントに配信されるメッセージに承認ヘッダーが含まれ、エンドポイントでリクエストを認証できるようになります。サブスクリプションと同じプロジェクトでホストされる App Engine Standard エンドポイントと Cloud Functions エンドポイントでは、自動認証と自動承認のメカニズムを使用できます。

認証済み push サブスクリプションの認証構成は、ユーザーが管理するサービス アカウントと、createpatch または ModifyPushConfig 呼び出しで指定されるオーディエンス パラメータで構成されます。また、次のセクションで説明するように、特別な Google が管理するサービス アカウントに特定のロールを付与する必要もあります。

  • ユーザーが管理するサービス アカウント(必須)。push サブスクリプションに関連付けられているサービス アカウント。このアカウントは、生成された JSON ウェブトークン(JWT)の email クレームとして使用されます。 サービス アカウントの要件は次のとおりです。

  • オーディエンス。Webhook が特定のトークンの対象オーディエンスの検証に使用する、大文字と小文字が区別されない単一の文字列。

  • Google が管理するサービス アカウント(必須)。

    • Pub/Sub により、service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com 形式のサービス アカウントが自動的に作成されます。

    • このサービス アカウントに、Pub/Sub が認証された push リクエスト用の JWT トークンを作成できるように、iam.serviceAccounts.getOpenIdToken 権限(roles/iam.serviceAccountTokenCreator ロールに含まれる)が付与されている必要があります。

push サブスクリプションと VPC Service Controls

VPC Service Controls で保護されているプロジェクトの場合は、push サブスクリプションに関する次の制限事項に注意してください。

  • push エンドポイントがデフォルトの run.app URL を持つ Cloud Run サービスに設定されている新しい push サブスクリプションのみを作成できます。カスタム ドメインは使用できません。

  • push エンドポイントが Workflows 実行に設定されている Workflows の宛先に Eventarc を介してイベントをルーティングする場合は、Eventarc を介して新しい push サブスクリプションのみを作成できます。

  • 既存の push サブスクリプションを更新することはできません。これらの push サブスクリプションは、VPC Service Controls によって保護されていませんが、引き続き機能します。

  • Cloud Functions は、VPC Service Controls 境界内のエンドポイントを使用して push サブスクリプションを作成できます。

メールの受信

Pub/Sub が push エンドポイントにメッセージを配信すると、Pub/Sub は POST リクエストの本文にメッセージを送信します。リクエストの本文は JSON オブジェクトであり、メッセージ データは message.data フィールドにあります。メッセージ データは base64 でエンコードされています。

push エンドポイントへの POST リクエストの本文の例を次に示します。

{
    "message": {
        "attributes": {
            "key": "value"
        },
        "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
        "messageId": "2070443601311540",
        "message_id": "2070443601311540",
        "publishTime": "2021-02-26T19:13:55.749Z",
        "publish_time": "2021-02-26T19:13:55.749Z"
    },
   "subscription": "projects/myproject/subscriptions/mysubscription"
}

push サブスクリプションからメッセージを受信するには、Webhook を使用して、Pub/Sub によって push エンドポイントに送信される POST リクエストを処理します。App Engine でのこれらの POST リクエストの処理の詳細については、Pub/Sub メッセージの作成とレスポンスをご覧ください。

push リクエストの受信後、HTTP ステータス コードを返します。メッセージへ確認応答するには、次のいずれかのステータス コードを返します。

  • 102
  • 200
  • 201
  • 202
  • 204

メッセージに対し否定応答を送信するには、他のステータス コードを返します。否定応答を送信するか、確認応答期限が切れた場合、Pub/Sub はメッセージを再送信します。push サブスクリプションから受信する個々のメッセージの確認応答期限は、変更できません。

push サブスクリプションの認証

push サブスクリプションが認証を使用する場合、Pub/Sub サービスは JWT に署名し、push リクエストの認可ヘッダーで JWT を送信します。JWT にはクレームと署名が含まれます。

サブスクライバーは JWT を検証し、次のことを確認できます。

  • クレームが正確である。
  • Pub/Sub サービスがクレームに署名した。

サブスクライバーがファイアウォールを使用している場合、push リクエストを受信できません。push リクエストを受信するには、ファイアウォールをオフにして JWT を検証する必要があります。

JWT の形式

JWT は、ヘッダー、クレームセット、署名で構成される OpenIDConnect JWT です。Pub/Sub サービスは、JWT をピリオド区切りの base64 文字列としてエンコードします。

たとえば、次の認証ヘッダーにはエンコードされた JWT が含まれています。

"Authorization" : "Bearer
eyJhbGciOiJSUzI1NiIsImtpZCI6IjdkNjgwZDhjNzBkNDRlOTQ3MTMzY2JkNDk5ZWJjMWE2MWMzZDVh
YmMiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwczovL2V4YW1wbGUuY29tIiwiYXpwIjoiMTEzNzc0M
jY0NDYzMDM4MzIxOTY0IiwiZW1haWwiOiJnYWUtZ2NwQGFwcHNwb3QuZ3NlcnZpY2VhY2NvdW50LmNvb
SIsImVtYWlsX3ZlcmlmaWVkIjp0cnVlLCJleHAiOjE1NTAxODU5MzUsImlhdCI6MTU1MDE4MjMzNSwia
XNzIjoiaHR0cHM6Ly9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTEzNzc0MjY0NDYzMDM4MzIxO
TY0In0.QVjyqpmadTyDZmlX2u3jWd1kJ68YkdwsRZDo-QxSPbxjug4ucLBwAs2QePrcgZ6hhkvdc4UHY
4YF3fz9g7XHULNVIzX5xh02qXEH8dK6PgGndIWcZQzjSYfgO-q-R2oo2hNM5HBBsQN4ARtGK_acG-NGG
WM3CQfahbEjZPAJe_B8M7HfIu_G5jOLZCw2EUcGo8BvEwGcLWB2WqEgRM0-xt5-UPzoa3-FpSPG7DHk7
z9zRUeq6eB__ldb-2o4RciJmjVwHgnYqn3VvlX9oVKEgXpNFhKuYA-mWh5o7BCwhujSMmFoBOh6mbIXF
cyf5UiVqKjpqEbqPGo_AvKvIQ9VTQ" 

ヘッダーとクレームセットは JSON 文字列です。デコードされると次の形式になります。

{"alg":"RS256","kid":"7d680d8c70d44e947133cbd499ebc1a61c3d5abc","typ":"JWT"}

{
   "aud":"https://example.com",
   "azp":"113774264463038321964",
   "email":"gae-gcp@appspot.gserviceaccount.com",
   "sub":"113774264463038321964",
   "email_verified":true,
   "exp":1550185935,
   "iat":1550182335,
   "iss":"https://accounts.google.com"
  }

push エンドポイントに送信されるリクエストに添付されるトークンは、最大 1 時間前のものである可能性があります。

Pub/Sub での push 認証の構成

次の例は、push 認証サービス アカウントを任意のサービス アカウントに設定する方法と、Google が管理するサービス アカウント service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.comiam.serviceAccountTokenCreator ロールを付与する方法を示しています。

Console

  1. Pub/Sub サブスクリプション ページに移動します。

    [サブスクリプション] ページに移動

  2. [サブスクリプションを作成] をクリックします。

  3. [サブスクリプション ID] フィールドに名前を入力します。

  4. トピックを選択します。

  5. [配信タイプ] として [push] を選択します。

  6. エンドポイント URL を入力します。

  7. [認証を有効にする] にチェックを入れます。

  8. サービス アカウントを選択します。

  9. プロジェクトの IAM ダッシュボードで、Google が管理するサービス アカウント service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.comiam.serviceAccountTokenCreator ロールがあることを確認します。サービス アカウントにそのロールが付与されていない場合、IAM ダッシュボードで [付与] をクリックして付与します。

  10. 省略可: オーディエンスを入力します。

  11. [作成] をクリックします。

gcloud

# Configure the push subscription
gcloud pubsub subscriptions (create|update|modify-push-config) ${SUBSCRIPTION} \
 --topic=${TOPIC} \
 --push-endpoint=${PUSH_ENDPOINT_URI} \
 --push-auth-service-account=${SERVICE_ACCOUNT_EMAIL} \
 --push-auth-token-audience=${OPTIONAL_AUDIENCE_OVERRIDE}

# Your Google-managed service account
# `service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com` needs to have the
# `iam.serviceAccountTokenCreator` role.
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
 --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}"\
 --role='roles/iam.serviceAccountTokenCreator'

Identity-Aware Proxy で保護されている App Engine アプリケーションで認証済み push サブスクリプションを使用する場合、push 認証トークン オーディエンスとして IAP クライアント ID を指定する必要があります。 App Engine アプリケーションで IAP を有効にするには、IAP の有効化をご覧ください。IAP クライアント ID を確認するには、[認証情報] ページで IAP-App-Engine-app クライアント ID を探します。

請求

JWT を使用して、クレーム(email クレームと aud クレームを含む)が Google によって署名されていることを検証できます。認証と承認の両方で Google の OAuth 2.0 API を使用する方法の詳細については、OpenID Connect をご覧ください。

こうしたクレームを有用にするメカニズムが 2 つあります。まず、Pub/Sub では、CreateSubscription、UpdateSubscription、または ModifyPushConfig 呼び出しを行うユーザーまたはサービス アカウントは、push 認証サービス アカウントに対する iam.serviceAccounts.actAs 権限を持つ必要があります。このようなロールの例は roles/iam.serviceAccountUser です。

次に、トークンの署名に使用する証明書に対するアクセス権を厳密に管理する必要があります。トークンを作成するには、Pub/Sub が個別の署名サービス アカウント ID(Google が管理するサービス アカウント service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com)を使用して、内部 Google サービスを呼び出す必要があります。この署名サービス アカウントは、push 認証サービス アカウント(またはプロジェクトなど、push 認証サービス アカウントの祖先リソース)に対する iam.serviceAccounts.getOpenIdToken 権限、またはサービス アカウント トークン作成者ロール(roles/iam.serviceAccountTokenCreator)を持つ必要があります。

トークンの検証

Pub/Sub によって push エンドポイントに送信されたトークンを検証するには、次のことを行います。

  • 署名の検証を使用したトークンの整合性のチェック。
  • トークンの email クレームと audience クレームが push サブスクリプション構成の設定値と一致することの確認。

次の例は、Identity-Aware Proxy で保護されていない App Engine アプリケーションに対する push リクエストを認証する方法を示しています。App Engine アプリケーションが IAP で保護されている場合、IAP JWT を含む HTTP リクエスト ヘッダーは x-goog-iap-jwt-assertion であるため、それに応じて検証する必要があります。

プロトコル

リクエスト:

GET https://oauth2.googleapis.com/tokeninfo?id_token={BEARER_TOKEN}

レスポンス:

200 OK
{
    "alg": "RS256",
    "aud": "example.com",
    "azp": "104176025330667568672",
    "email": "{SERVICE_ACCOUNT_NAME}@{YOUR_PROJECT_NAME}.iam.gserviceaccount.com",
    "email_verified": "true",
    "exp": "1555463097",
    "iat": "1555459497",
    "iss": "https://accounts.google.com",
    "kid": "3782d3f0bc89008d9d2c01730f765cfb19d3b70e",
    "sub": "104176025330667568672",
    "typ": "JWT"
}

C#

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。

        /// <summary>
        /// Extended JWT payload to match the pubsub payload format.
        /// </summary>
        public class PubSubPayload : JsonWebSignature.Payload
        {
            [JsonProperty("email")]
            public string Email { get; set; }
            [JsonProperty("email_verified")]
            public string EmailVerified { get; set; }
        }
        /// <summary>
        /// Handle authenticated push request coming from pubsub.
        /// </summary>
        [HttpPost]
        [Route("/AuthPush")]
        public async Task<IActionResult> AuthPushAsync([FromBody] PushBody body, [FromQuery] string token)
        {
            // Get the Cloud Pub/Sub-generated "Authorization" header.
            string authorizaionHeader = HttpContext.Request.Headers["Authorization"];
            string verificationToken = token ?? body.message.attributes["token"];
            // JWT token comes in `Bearer <JWT>` format substring 7 specifies the position of first JWT char.
            string authToken = authorizaionHeader.StartsWith("Bearer ") ? authorizaionHeader.Substring(7) : null;
            if (verificationToken != _options.VerificationToken || authToken is null)
            {
                return new BadRequestResult();
            }
            // Verify and decode the JWT.
            // Note: For high volume push requests, it would save some network
            // overhead if you verify the tokens offline by decoding them using
            // Google's Public Cert; caching already seen tokens works best when
            // a large volume of messages have prompted a single push server to
            // handle them, in which case they would all share the same token for
            // a limited time window.
            var payload = await JsonWebSignature.VerifySignedTokenAsync<PubSubPayload>(authToken);

            // IMPORTANT: you should validate payload details not covered
            // by signature and audience verification above, including:
            //   - Ensure that `payload.Email` is equal to the expected service
            //     account set up in the push subscription settings.
            //   - Ensure that `payload.Email_verified` is set to true.

            var messageBytes = Convert.FromBase64String(body.message.data);
            string message = System.Text.Encoding.UTF8.GetString(messageBytes);
            s_authenticatedMessages.Add(message);
            return new OkResult();
        }

Go

// receiveMessagesHandler validates authentication token and caches the Pub/Sub
// message received.
func (a *app) receiveMessagesHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != "POST" {
		http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
		return
	}

	// Verify that the request originates from the application.
	// a.pubsubVerificationToken = os.Getenv("PUBSUB_VERIFICATION_TOKEN")
	if token, ok := r.URL.Query()["token"]; !ok || len(token) != 1 || token[0] != a.pubsubVerificationToken {
		http.Error(w, "Bad token", http.StatusBadRequest)
		return
	}

	// Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
	authHeader := r.Header.Get("Authorization")
	if authHeader == "" || len(strings.Split(authHeader, " ")) != 2 {
		http.Error(w, "Missing Authorization header", http.StatusBadRequest)
		return
	}
	token := strings.Split(authHeader, " ")[1]
	// Verify and decode the JWT.
	// If you don't need to control the HTTP client used you can use the
	// convenience method idtoken.Validate instead of creating a Validator.
	v, err := idtoken.NewValidator(r.Context(), option.WithHTTPClient(a.defaultHTTPClient))
	if err != nil {
		http.Error(w, "Unable to create Validator", http.StatusBadRequest)
		return
	}
	// Please change http://example.com to match with the value you are
	// providing while creating the subscription.
	payload, err := v.Validate(r.Context(), token, "http://example.com")
	if err != nil {
		http.Error(w, fmt.Sprintf("Invalid Token: %v", err), http.StatusBadRequest)
		return
	}
	if payload.Issuer != "accounts.google.com" && payload.Issuer != "https://accounts.google.com" {
		http.Error(w, "Wrong Issuer", http.StatusBadRequest)
		return
	}

	// IMPORTANT: you should validate claim details not covered by signature
	// and audience verification above, including:
	//   - Ensure that `payload.Claims["email"]` is equal to the expected service
	//     account set up in the push subscription settings.
	//   - Ensure that `payload.Claims["email_verified"]` is set to true.
	if payload.Claims["email"] != "test-service-account-email@example.com" || payload.Claims["email_verified"] != true {
		http.Error(w, "Unexpected email identity", http.StatusBadRequest)
		return
	}

	var pr pushRequest
	if err := json.NewDecoder(r.Body).Decode(&pr); err != nil {
		http.Error(w, fmt.Sprintf("Could not decode body: %v", err), http.StatusBadRequest)
		return
	}

	a.messagesMu.Lock()
	defer a.messagesMu.Unlock()
	// Limit to ten.
	a.messages = append(a.messages, pr.Message.Data)
	if len(a.messages) > maxMessages {
		a.messages = a.messages[len(a.messages)-maxMessages:]
	}

	fmt.Fprint(w, "OK")
}

Java

@WebServlet(value = "/pubsub/authenticated-push")
public class PubSubAuthenticatedPush extends HttpServlet {
  private final String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
  private final MessageRepository messageRepository;
  private final GoogleIdTokenVerifier verifier =
      new GoogleIdTokenVerifier.Builder(new NetHttpTransport(), new GsonFactory())
          /**
           * Please change example.com to match with value you are providing while creating
           * subscription as provided in @see <a
           * href="https://github.com/GoogleCloudPlatform/java-docs-samples/tree/main/appengine-java8/pubsub">README</a>.
           */
          .setAudience(Collections.singletonList("example.com"))
          .build();
  private final Gson gson = new Gson();

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {

    // Verify that the request originates from the application.
    if (req.getParameter("token").compareTo(pubsubVerificationToken) != 0) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    // Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
    String authorizationHeader = req.getHeader("Authorization");
    if (authorizationHeader == null
        || authorizationHeader.isEmpty()
        || authorizationHeader.split(" ").length != 2) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    String authorization = authorizationHeader.split(" ")[1];

    try {
      // Verify and decode the JWT.
      // Note: For high volume push requests, it would save some network overhead
      // if you verify the tokens offline by decoding them using Google's Public
      // Cert; caching already seen tokens works best when a large volume of
      // messsages have prompted a single push server to handle them, in which
      // case they would all share the same token for a limited time window.
      GoogleIdToken idToken = verifier.verify(authorization);

      GoogleIdToken.Payload payload = idToken.getPayload();
      // IMPORTANT: you should validate claim details not covered by signature
      // and audience verification above, including:
      //   - Ensure that `payload.getEmail()` is equal to the expected service
      //     account set up in the push subscription settings.
      //   - Ensure that `payload.getEmailVerified()` is set to true.

      messageRepository.saveToken(authorization);
      messageRepository.saveClaim(payload.toPrettyString());
      // parse message object from "message" field in the request body json
      // decode message data from base64
      Message message = getMessage(req);
      messageRepository.save(message);
      // 200, 201, 204, 102 status codes are interpreted as success by the Pub/Sub system
      resp.setStatus(102);
      super.doPost(req, resp);
    } catch (Exception e) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
    }
  }

  private Message getMessage(HttpServletRequest request) throws IOException {
    String requestBody = request.getReader().lines().collect(Collectors.joining("\n"));
    JsonElement jsonRoot = JsonParser.parseString(requestBody).getAsJsonObject();
    String messageStr = jsonRoot.getAsJsonObject().get("message").toString();
    Message message = gson.fromJson(messageStr, Message.class);
    // decode from base64
    String decoded = decode(message.getData());
    message.setData(decoded);
    return message;
  }

  private String decode(String data) {
    return new String(Base64.getDecoder().decode(data));
  }

  PubSubAuthenticatedPush(MessageRepository messageRepository) {
    this.messageRepository = messageRepository;
  }

  public PubSubAuthenticatedPush() {
    this(MessageRepositoryImpl.getInstance());
  }
}

Node.js

app.post('/pubsub/authenticated-push', jsonBodyParser, async (req, res) => {
  // Verify that the request originates from the application.
  if (req.query.token !== PUBSUB_VERIFICATION_TOKEN) {
    res.status(400).send('Invalid request');
    return;
  }

  // Verify that the push request originates from Cloud Pub/Sub.
  try {
    // Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
    const bearer = req.header('Authorization');
    const [, token] = bearer.match(/Bearer (.*)/);
    tokens.push(token);

    // Verify and decode the JWT.
    // Note: For high volume push requests, it would save some network
    // overhead if you verify the tokens offline by decoding them using
    // Google's Public Cert; caching already seen tokens works best when
    // a large volume of messages have prompted a single push server to
    // handle them, in which case they would all share the same token for
    // a limited time window.
    const ticket = await authClient.verifyIdToken({
      idToken: token,
      audience: 'example.com',
    });

    const claim = ticket.getPayload();

    // IMPORTANT: you should validate claim details not covered
    // by signature and audience verification above, including:
    //   - Ensure that `claim.email` is equal to the expected service
    //     account set up in the push subscription settings.
    //   - Ensure that `claim.email_verified` is set to true.

    claims.push(claim);
  } catch (e) {
    res.status(400).send('Invalid token');
    return;
  }

  // The message is a unicode string encoded in base64.
  const message = Buffer.from(req.body.message.data, 'base64').toString(
    'utf-8'
  );

  messages.push(message);

  res.status(200).send();
});

Python

@app.route('/push-handlers/receive_messages', methods=['POST'])
def receive_messages_handler():
    # Verify that the request originates from the application.
    if (request.args.get('token', '') !=
            current_app.config['PUBSUB_VERIFICATION_TOKEN']):
        return 'Invalid request', 400

    # Verify that the push request originates from Cloud Pub/Sub.
    try:
        # Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
        bearer_token = request.headers.get('Authorization')
        token = bearer_token.split(' ')[1]
        TOKENS.append(token)

        # Verify and decode the JWT. `verify_oauth2_token` verifies
        # the JWT signature, the `aud` claim, and the `exp` claim.
        # Note: For high volume push requests, it would save some network
        # overhead if you verify the tokens offline by downloading Google's
        # Public Cert and decode them using the `google.auth.jwt` module;
        # caching already seen tokens works best when a large volume of
        # messages have prompted a single push server to handle them, in which
        # case they would all share the same token for a limited time window.
        claim = id_token.verify_oauth2_token(token, requests.Request(),
                                             audience='example.com')

        # IMPORTANT: you should validate claim details not covered by signature
        # and audience verification above, including:
        #   - Ensure that `claim["email"]` is equal to the expected service
        #     account set up in the push subscription settings.
        #   - Ensure that `claim["email_verified"]` is set to true.

        CLAIMS.append(claim)
    except Exception as e:
        return 'Invalid token: {}\n'.format(e), 400

    envelope = json.loads(request.data.decode('utf-8'))
    payload = base64.b64decode(envelope['message']['data'])
    MESSAGES.append(payload)
    # Returning any 2xx status indicates successful receipt of the message.
    return 'OK', 200

Ruby

post "/pubsub/authenticated-push" do
  halt 400 if params[:token] != PUBSUB_VERIFICATION_TOKEN

  begin
    bearer = request.env["HTTP_AUTHORIZATION"]
    token = /Bearer (.*)/.match(bearer)[1]
    claim = Google::Auth::IDTokens.verify_oidc token, aud: "example.com"

    # IMPORTANT: you should validate claim details not covered by signature
    # and audience verification above, including:
    #   - Ensure that `claim["email"]` is equal to the expected service
    #     account set up in the push subscription settings.
    #   - Ensure that `claim["email_verified"]` is set to true.

    claims.push claim
  rescue Google::Auth::IDTokens::VerificationError => e
    puts "VerificationError: #{e.message}"
    halt 400, "Invalid token"
  end

  message = JSON.parse request.body.read
  payload = Base64.decode64 message["message"]["data"]

  messages.push payload
end

上のコードサンプルで使用されている環境変数 PUBSUB_VERIFICATION_TOKEN については、Pub/Sub メッセージの作成とレスポンスをご覧ください。

署名なし JWT の検証方法の例は、ウェブサイト用の Google ログインガイドで確認できます。OpenID トークンの概要については、JWT の検証に役立つクライアント ライブラリのリストが含まれている OpenID Connect ガイドをご覧ください。

他の Google Cloud サービスからの認証

Cloud Run、App Engine、Cloud Functions は、Pub/Sub で生成されたトークンを確認して、Pub/Sub からの HTTP 呼び出しを認証します。必要な構成は、呼び出し元のアカウントに必要な IAM ロールを付与することのみです。

これらのサービスのさまざまなユースケースについては、以下のガイドやチュートリアルをご覧ください。

Cloud Run:

App Engine:

Cloud Functions:

  • HTTP トリガー: 関数への HTTP トリガーとして Pub/Sub push リクエストを使用する場合、関数を呼び出すには、push 認証サービス アカウントに roles/cloudfunctions.invoker のロールが必要です。
  • Google Cloud Pub/Sub トリガー: Pub/Sub トリガーを使用して関数を呼び出す場合、IAM のロールと権限が自動的に構成されます。

メッセージ配信を管理する

メッセージ配信の停止と再開

Pub/Sub で push エンドポイントに対するリクエストの送信を一時的に停止するには、サブスクリプションを pull に変更します。変更が適用されるまで、数分かかることがあります。

push 配信を再開するには、URL を有効なエンドポイントにもう一度設定します。配信を完全に停止するには、サブスクリプションを削除します。

push バックオフ

push サブスクライバーが送信する否定確認応答が多すぎる場合、Pub/Sub は push バックオフを使用してメッセージ配信を開始する可能性があります。Pub/Sub が push バックオフを使用すると、事前に設定された期間、メッセージの配信を停止します。この期間は 100 ミリ秒から 60 秒の範囲です。この時間が経過すると、Pub/Sub はメッセージの配信を開始します。

プッシュバックのしくみ

push バックオフは、指数バックオフ アルゴリズムを使用して、メッセージの送信間で使用する Pub/Sub の遅延を決定します。この期間は、サブスクライバーが送信する否定応答の数に基づいて計算されます。

たとえば、push サブスクライバーが 1 秒あたり 5 件のメッセージを受信して、1 秒あたり 1 件の否定確認応答を送信する場合は、Pub/Sub は約 500 ミリ秒ごとにメッセージを配信します。push サブスクライバーが毎秒 5 つの否定確認応答を送信する場合、Pub/Sub は 30~60 秒ごとにメッセージを配信します。

push バックオフに関する次の考慮事項に注意してください。

  • push バックオフはオンまたはオフにできません。また、遅延の計算に使用される値は変更できません。
  • push バックオフは、次のアクションに対してトリガーされます。
    • 否定確認応答を受け取った場合。
    • メッセージの確認応答期限が切れた場合。
  • push バックオフは、サブスクリプション(グローバル)内のすべてのメッセージに適用されます。

配信レート

スロースタート アルゴリズムを使用して同時 push リクエストの数を調整します。push ウィンドウは、同時 push リクエストの最大数です。push ウィンドウは、配信が成功すると増加し、失敗すると減少します。システムは、1 桁の小さなウィンドウ サイズで始まります。

サブスクライバーがメッセージを確認すると、ウィンドウが指数関数的に増加します。サブスクライバーがメッセージの 99% 以上を確認応答し、push リクエストのレイテンシが平均 1 秒未満であるサブスクリプションの場合、push ウィンドウはパブリッシュ スループットに対応できるように拡張する必要があります。

push リクエストのレイテンシには、次のものなどがあります。

リージョンあたりの未処理のメッセージが 3,000 件を超えると、ウィンドウは直線的に増加します。これにより、push エンドポイントが過剰なメッセージを受信することを防止します。平均レイテンシが 1 秒を超えるか、サブスクライバーの確認応答がリクエストの 99% 未満の場合、ウィンドウは未処理のメッセージの下限 3,000 件に減少します。

push 配信のモニタリングに使用できる指標の詳細については、push サブスクリプションのモニタリングをご覧ください。

割り当てと上限

push サブスクリプションには割り当てリソース上限があります。