推送订阅

使用集合让一切井井有条 根据您的偏好保存内容并对其进行分类。

在推送传送模式下,Pub/Sub 向订阅者应用发起请求来传送消息。

准备工作

在阅读本文档之前,请确保您熟悉以下内容:

推送订阅的属性

配置推送订阅时,您可以指定以下属性。

  • 端点网址(必填)。可公开访问的 HTTPS 地址。推送端点的服务器必须具有由证书授权机构签名的有效 SSL 证书。Pub/Sub 服务会将同一 Google Cloud 地区中的消息传送至 Pub/Sub 服务存储消息所在的推送端点。Pub/Sub 服务会尽最大努力传送来自同一 Google Cloud 地区的消息。

    Pub/Sub 不再要求提供推送订阅网址网域的所有权证明。如果您的网域收到来自 Pub/Sub 的意外 POST 请求,您可以举报疑似滥用行为

  • 启用身份验证。启用后,Pub/Sub 向推送端点传送的消息将包含一个授权标头,以允许该端点对请求进行身份验证。自动身份验证和授权机制适用于与订阅在同一项目中托管的 App Engine 标准环境和 Cloud Functions 端点。

经过身份验证的推送订阅的身份验证配置包含用户管理的服务帐号,以及 createpatchModifyPushConfig 调用中指定的受众群体参数。您还必须向特殊由 Google 管理的服务帐号授予特定角色,如下一部分所述。

  • 用户管理的服务帐号(必需)。与推送订阅关联的服务帐号。此帐号会用作所生成 JSON Web 令牌 (JWT) 的 email 声明。下面列出了服务帐号的要求:

  • 受众群体。网络钩子用于验证此特定令牌的目标受众群体的单个字符串,不区分大小写。

  • Google 管理的服务帐号(必需)。

    • Pub/Sub 会自动为您创建格式为 service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com 的服务帐号。

    • 此服务帐号必须被授予 iam.serviceAccounts.getOpenIdToken 权限(包含在 roles/iam.serviceAccountTokenCreator 角色中),以允许 Pub/Sub 为经过身份验证的推送请求创建 JWT 令牌。

推送订阅和 VPC Service Controls

对于受 VPC Service Controls 保护的项目,请注意推送订阅的以下限制:

  • 您只能创建将推送端点设置为具有默认 run.app 网址的 Cloud Run 服务的新推送订阅。自定义网域不起作用。

  • 通过 Eventarc 将事件路由到 Workflows 目标(为其推送工作流设置为 Workflows 执行)时,您只能通过 Eventarc 创建新的推送订阅。

  • 您无法更新现有推送订阅。这些推送订阅将继续运行,但不受 VPC Service Controls 的保护。

接收消息

当 Pub/Sub 将消息传送到推送端点时,Pub/Sub 会在 POST 请求的正文中发送消息。请求的正文是 JSON 对象,消息数据位于 message.data 字段中。消息数据采用 base64 编码。

以下示例展示了向推送端点发出的 POST 请求的正文:

{
    "message": {
        "attributes": {
            "key": "value"
        },
        "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
        "messageId": "2070443601311540",
        "message_id": "2070443601311540",
        "publishTime": "2021-02-26T19:13:55.749Z",
        "publish_time": "2021-02-26T19:13:55.749Z"
    },
   "subscription": "projects/myproject/subscriptions/mysubscription"
}

如需接收来自推送订阅的消息,请使用网络钩子并处理 Pub/Sub 发送到推送端点的 POST 请求。如需详细了解如何在 App Engine 中处理这些 POST 请求,请参阅编写和响应 Pub/Sub 消息

收到推送请求后,返回 HTTP 状态代码。要确认该消息,请返回以下状态代码之一:

  • 102
  • 200
  • 201
  • 202
  • 204

要发送此消息的否定确认,请返回其他任何状态代码。如果您发送否定确认或确认时限到期,Pub/Sub 会重新发送该消息。您无法修改从推送订阅收到的个别消息的确认时限。

推送订阅的身份验证

如果推送订阅使用身份验证,Pub/Sub 服务会对 JWT 进行签名,并在推送请求的授权标头中发送 JWT。JWT 包含声明和签名。

订阅者可以验证 JWT 并验证以下内容:

  • 声明准确无误。
  • Pub/Sub 服务签署了声明。

如果订阅者使用防火墙,则无法接收推送请求。要接收推送请求,您必须关闭防火墙并验证 JWT。

JWT 格式

JWT 是一个 OpenIDConnect JWT,包含标头、声明集和签名三个部分。Pub/Sub 服务将 JWT 编码为具有英文句点分隔符的 base64 字符串。

例如,以下授权标头包含已编码的 JWT:

"Authorization" : "Bearer
eyJhbGciOiJSUzI1NiIsImtpZCI6IjdkNjgwZDhjNzBkNDRlOTQ3MTMzY2JkNDk5ZWJjMWE2MWMzZDVh
YmMiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwczovL2V4YW1wbGUuY29tIiwiYXpwIjoiMTEzNzc0M
jY0NDYzMDM4MzIxOTY0IiwiZW1haWwiOiJnYWUtZ2NwQGFwcHNwb3QuZ3NlcnZpY2VhY2NvdW50LmNvb
SIsImVtYWlsX3ZlcmlmaWVkIjp0cnVlLCJleHAiOjE1NTAxODU5MzUsImlhdCI6MTU1MDE4MjMzNSwia
XNzIjoiaHR0cHM6Ly9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTEzNzc0MjY0NDYzMDM4MzIxO
TY0In0.QVjyqpmadTyDZmlX2u3jWd1kJ68YkdwsRZDo-QxSPbxjug4ucLBwAs2QePrcgZ6hhkvdc4UHY
4YF3fz9g7XHULNVIzX5xh02qXEH8dK6PgGndIWcZQzjSYfgO-q-R2oo2hNM5HBBsQN4ARtGK_acG-NGG
WM3CQfahbEjZPAJe_B8M7HfIu_G5jOLZCw2EUcGo8BvEwGcLWB2WqEgRM0-xt5-UPzoa3-FpSPG7DHk7
z9zRUeq6eB__ldb-2o4RciJmjVwHgnYqn3VvlX9oVKEgXpNFhKuYA-mWh5o7BCwhujSMmFoBOh6mbIXF
cyf5UiVqKjpqEbqPGo_AvKvIQ9VTQ" 

标头和声明集是 JSON 字符串。解码后,它们将采用以下格式:

{"alg":"RS256","kid":"7d680d8c70d44e947133cbd499ebc1a61c3d5abc","typ":"JWT"}

{
   "aud":"https://example.com",
   "azp":"113774264463038321964",
   "email":"gae-gcp@appspot.gserviceaccount.com",
   "sub":"113774264463038321964",
   "email_verified":true,
   "exp":1550185935,
   "iat":1550182335,
   "iss":"https://accounts.google.com"
  }

附加到发送到推送端点的请求的令牌可具有最长达一个小时的生命周期。

为推送身份验证配置 Pub/Sub

以下示例展示了如何将推送身份验证服务帐号设置为您选择的服务帐号,以及如何向 Google 管理的服务帐号 service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com 授予 iam.serviceAccountTokenCreator 角色。

控制台

  1. 转到 Pub/Sub 订阅页面。

    转到“订阅”页面

  2. 点击创建订阅

  3. 订阅 ID 字段中,输入一个名称。

  4. 选择主题。

  5. 选择推送作为递送类型

  6. 输入端点网址。

  7. 选中 Enable authentication(启用身份验证)。

  8. 选择服务帐号。

  9. 确保 Google 管理的服务帐号 service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com 在项目的 IAM 信息中心中具有 iam.serviceAccountTokenCreator 角色。如果尚未向服务帐号授予此角色,请在 IAM 信息中心内点击授予以执行此操作。

  10. 可选:输入受众群体。

  11. 点击创建

gcloud

# Configure the push subscription
gcloud pubsub subscriptions (create|update|modify-push-config) ${SUBSCRIPTION} \
 --topic=${TOPIC} \
 --push-endpoint=${PUSH_ENDPOINT_URI} \
 --push-auth-service-account=${SERVICE_ACCOUNT_EMAIL} \
 --push-auth-token-audience=${OPTIONAL_AUDIENCE_OVERRIDE}

# Your Google-managed service account
# `service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com` needs to have the
# `iam.serviceAccountTokenCreator` role.
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
 --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}"\
 --role='roles/iam.serviceAccountTokenCreator'

如果您将经过身份验证的推送订阅与受 Identity-Aware Proxy 保护的 App Engine 应用一起使用,则必须提供 IAP 客户端 ID 作为推送身份验证令牌受众群体。如需在 App Engine 应用上启用 IAP,请参阅启用 IAP。如需查找 IAP 客户端 ID,请在凭据页面上查找 IAP-App-Engine-app 客户端 ID。

声明

JWT 可用于验证由 Google 签名的声明(包括 emailaud 声明)。如需详细了解 Google 的 OAuth 2.0 API 如何用于身份验证和授权,请参阅 OpenID Connect

可通过两种机制使声明变得有意义。首先,Pub/Sub 要求进行 CreateSubscription、UpdateSubscription 或 ModifyPushConfig 调用的用户或服务帐号必须具有对推送身份验证服务帐号具有 iam.serviceAccounts.actAs 权限的角色。例如,roles/iam.serviceAccountUser 角色就属于此类角色。

其次,严格控制对用于为令牌签名的证书的访问。 如需创建令牌,Pub/Sub 必须使用单独的签名服务帐号身份(即 Google 管理的服务帐号 service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com)调用内部 Google 服务。此签名服务帐号必须对推送身份验证服务帐号(或推送身份验证服务帐号的任何祖先资源,例如项目)具有 iam.serviceAccounts.getOpenIdToken 权限或 Service Account Token Creator 角色 (roles/iam.serviceAccountTokenCreator)。

验证令牌

验证 Pub/Sub 发送到推送端点的令牌涉及:

  • 通过使用签名验证检查令牌完整性。
  • 确保令牌中的 emailaudience 声明与推送订阅配置中设置的值匹配。

以下示例说明如何对不受 Identity-Aware Proxy 保护的 App Engine 应用的推送请求进行身份验证。如果您的 App Engine 应用受 IAP 保护,则包含 IAP JWT 的 HTTP 请求标头为 x-goog-iap-jwt-assertion,必须进行相应的验证

协议

请求:

GET https://oauth2.googleapis.com/tokeninfo?id_token={BEARER_TOKEN}

响应:

200 OK
{
    "alg": "RS256",
    "aud": "example.com",
    "azp": "104176025330667568672",
    "email": "{SERVICE_ACCOUNT_NAME}@{YOUR_PROJECT_NAME}.iam.gserviceaccount.com",
    "email_verified": "true",
    "exp": "1555463097",
    "iat": "1555459497",
    "iss": "https://accounts.google.com",
    "kid": "3782d3f0bc89008d9d2c01730f765cfb19d3b70e",
    "sub": "104176025330667568672",
    "typ": "JWT"
}

C#

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档

        /// <summary>
        /// Extended JWT payload to match the pubsub payload format.
        /// </summary>
        public class PubSubPayload : JsonWebSignature.Payload
        {
            [JsonProperty("email")]
            public string Email { get; set; }
            [JsonProperty("email_verified")]
            public string EmailVerified { get; set; }
        }
        /// <summary>
        /// Handle authenticated push request coming from pubsub.
        /// </summary>
        [HttpPost]
        [Route("/AuthPush")]
        public async Task<IActionResult> AuthPushAsync([FromBody] PushBody body, [FromQuery] string token)
        {
            // Get the Cloud Pub/Sub-generated "Authorization" header.
            string authorizaionHeader = HttpContext.Request.Headers["Authorization"];
            string verificationToken = token ?? body.message.attributes["token"];
            // JWT token comes in `Bearer <JWT>` format substring 7 specifies the position of first JWT char.
            string authToken = authorizaionHeader.StartsWith("Bearer ") ? authorizaionHeader.Substring(7) : null;
            if (verificationToken != _options.VerificationToken || authToken is null)
            {
                return new BadRequestResult();
            }
            // Verify and decode the JWT.
            // Note: For high volume push requests, it would save some network
            // overhead if you verify the tokens offline by decoding them using
            // Google's Public Cert; caching already seen tokens works best when
            // a large volume of messages have prompted a single push server to
            // handle them, in which case they would all share the same token for
            // a limited time window.
            var payload = await JsonWebSignature.VerifySignedTokenAsync<PubSubPayload>(authToken);

            // IMPORTANT: you should validate payload details not covered
            // by signature and audience verification above, including:
            //   - Ensure that `payload.Email` is equal to the expected service
            //     account set up in the push subscription settings.
            //   - Ensure that `payload.Email_verified` is set to true.

            var messageBytes = Convert.FromBase64String(body.message.data);
            string message = System.Text.Encoding.UTF8.GetString(messageBytes);
            s_authenticatedMessages.Add(message);
            return new OkResult();
        }

Go

// receiveMessagesHandler validates authentication token and caches the Pub/Sub
// message received.
func (a *app) receiveMessagesHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != "POST" {
		http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
		return
	}

	// Verify that the request originates from the application.
	// a.pubsubVerificationToken = os.Getenv("PUBSUB_VERIFICATION_TOKEN")
	if token, ok := r.URL.Query()["token"]; !ok || len(token) != 1 || token[0] != a.pubsubVerificationToken {
		http.Error(w, "Bad token", http.StatusBadRequest)
		return
	}

	// Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
	authHeader := r.Header.Get("Authorization")
	if authHeader == "" || len(strings.Split(authHeader, " ")) != 2 {
		http.Error(w, "Missing Authorization header", http.StatusBadRequest)
		return
	}
	token := strings.Split(authHeader, " ")[1]
	// Verify and decode the JWT.
	// If you don't need to control the HTTP client used you can use the
	// convenience method idtoken.Validate instead of creating a Validator.
	v, err := idtoken.NewValidator(r.Context(), option.WithHTTPClient(a.defaultHTTPClient))
	if err != nil {
		http.Error(w, "Unable to create Validator", http.StatusBadRequest)
		return
	}
	// Please change http://example.com to match with the value you are
	// providing while creating the subscription.
	payload, err := v.Validate(r.Context(), token, "http://example.com")
	if err != nil {
		http.Error(w, fmt.Sprintf("Invalid Token: %v", err), http.StatusBadRequest)
		return
	}
	if payload.Issuer != "accounts.google.com" && payload.Issuer != "https://accounts.google.com" {
		http.Error(w, "Wrong Issuer", http.StatusBadRequest)
		return
	}

	// IMPORTANT: you should validate claim details not covered by signature
	// and audience verification above, including:
	//   - Ensure that `payload.Claims["email"]` is equal to the expected service
	//     account set up in the push subscription settings.
	//   - Ensure that `payload.Claims["email_verified"]` is set to true.
	if payload.Claims["email"] != "test-service-account-email@example.com" || payload.Claims["email_verified"] != true {
		http.Error(w, "Unexpected email identity", http.StatusBadRequest)
		return
	}

	var pr pushRequest
	if err := json.NewDecoder(r.Body).Decode(&pr); err != nil {
		http.Error(w, fmt.Sprintf("Could not decode body: %v", err), http.StatusBadRequest)
		return
	}

	a.messagesMu.Lock()
	defer a.messagesMu.Unlock()
	// Limit to ten.
	a.messages = append(a.messages, pr.Message.Data)
	if len(a.messages) > maxMessages {
		a.messages = a.messages[len(a.messages)-maxMessages:]
	}

	fmt.Fprint(w, "OK")
}

Java

@WebServlet(value = "/pubsub/authenticated-push")
public class PubSubAuthenticatedPush extends HttpServlet {
  private final String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
  private final MessageRepository messageRepository;
  private final GoogleIdTokenVerifier verifier =
      new GoogleIdTokenVerifier.Builder(new NetHttpTransport(), new JacksonFactory())
          /**
           * Please change example.com to match with value you are providing while creating
           * subscription as provided in @see <a
           * href="https://github.com/GoogleCloudPlatform/java-docs-samples/tree/main/appengine-java8/pubsub">README</a>.
           */
          .setAudience(Collections.singletonList("example.com"))
          .build();
  private final Gson gson = new Gson();
  private final JsonParser jsonParser = new JsonParser();

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {

    // Verify that the request originates from the application.
    if (req.getParameter("token").compareTo(pubsubVerificationToken) != 0) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    // Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
    String authorizationHeader = req.getHeader("Authorization");
    if (authorizationHeader == null
        || authorizationHeader.isEmpty()
        || authorizationHeader.split(" ").length != 2) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    String authorization = authorizationHeader.split(" ")[1];

    try {
      // Verify and decode the JWT.
      // Note: For high volume push requests, it would save some network overhead
      // if you verify the tokens offline by decoding them using Google's Public
      // Cert; caching already seen tokens works best when a large volume of
      // messsages have prompted a single push server to handle them, in which
      // case they would all share the same token for a limited time window.
      GoogleIdToken idToken = verifier.verify(authorization);

      GoogleIdToken.Payload payload = idToken.getPayload();
      // IMPORTANT: you should validate claim details not covered by signature
      // and audience verification above, including:
      //   - Ensure that `payload.getEmail()` is equal to the expected service
      //     account set up in the push subscription settings.
      //   - Ensure that `payload.getEmailVerified()` is set to true.

      messageRepository.saveToken(authorization);
      messageRepository.saveClaim(payload.toPrettyString());
      // parse message object from "message" field in the request body json
      // decode message data from base64
      Message message = getMessage(req);
      messageRepository.save(message);
      // 200, 201, 204, 102 status codes are interpreted as success by the Pub/Sub system
      resp.setStatus(102);
      super.doPost(req, resp);
    } catch (Exception e) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
    }
  }

  private Message getMessage(HttpServletRequest request) throws IOException {
    String requestBody = request.getReader().lines().collect(Collectors.joining("\n"));
    JsonElement jsonRoot = jsonParser.parse(requestBody);
    String messageStr = jsonRoot.getAsJsonObject().get("message").toString();
    Message message = gson.fromJson(messageStr, Message.class);
    // decode from base64
    String decoded = decode(message.getData());
    message.setData(decoded);
    return message;
  }

  private String decode(String data) {
    return new String(Base64.getDecoder().decode(data));
  }

  PubSubAuthenticatedPush(MessageRepository messageRepository) {
    this.messageRepository = messageRepository;
  }

  public PubSubAuthenticatedPush() {
    this(MessageRepositoryImpl.getInstance());
  }
}

Node.js

app.post('/pubsub/authenticated-push', jsonBodyParser, async (req, res) => {
  // Verify that the request originates from the application.
  if (req.query.token !== PUBSUB_VERIFICATION_TOKEN) {
    res.status(400).send('Invalid request');
    return;
  }

  // Verify that the push request originates from Cloud Pub/Sub.
  try {
    // Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
    const bearer = req.header('Authorization');
    const [, token] = bearer.match(/Bearer (.*)/);
    tokens.push(token);

    // Verify and decode the JWT.
    // Note: For high volume push requests, it would save some network
    // overhead if you verify the tokens offline by decoding them using
    // Google's Public Cert; caching already seen tokens works best when
    // a large volume of messages have prompted a single push server to
    // handle them, in which case they would all share the same token for
    // a limited time window.
    const ticket = await authClient.verifyIdToken({
      idToken: token,
      audience: 'example.com',
    });

    const claim = ticket.getPayload();

    // IMPORTANT: you should validate claim details not covered
    // by signature and audience verification above, including:
    //   - Ensure that `claim.email` is equal to the expected service
    //     account set up in the push subscription settings.
    //   - Ensure that `claim.email_verified` is set to true.

    claims.push(claim);
  } catch (e) {
    res.status(400).send('Invalid token');
    return;
  }

  // The message is a unicode string encoded in base64.
  const message = Buffer.from(req.body.message.data, 'base64').toString(
    'utf-8'
  );

  messages.push(message);

  res.status(200).send();
});

Python

@app.route('/push-handlers/receive_messages', methods=['POST'])
def receive_messages_handler():
    # Verify that the request originates from the application.
    if (request.args.get('token', '') !=
            current_app.config['PUBSUB_VERIFICATION_TOKEN']):
        return 'Invalid request', 400

    # Verify that the push request originates from Cloud Pub/Sub.
    try:
        # Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
        bearer_token = request.headers.get('Authorization')
        token = bearer_token.split(' ')[1]
        TOKENS.append(token)

        # Verify and decode the JWT. `verify_oauth2_token` verifies
        # the JWT signature, the `aud` claim, and the `exp` claim.
        # Note: For high volume push requests, it would save some network
        # overhead if you verify the tokens offline by downloading Google's
        # Public Cert and decode them using the `google.auth.jwt` module;
        # caching already seen tokens works best when a large volume of
        # messages have prompted a single push server to handle them, in which
        # case they would all share the same token for a limited time window.
        claim = id_token.verify_oauth2_token(token, requests.Request(),
                                             audience='example.com')

        # IMPORTANT: you should validate claim details not covered by signature
        # and audience verification above, including:
        #   - Ensure that `claim["email"]` is equal to the expected service
        #     account set up in the push subscription settings.
        #   - Ensure that `claim["email_verified"]` is set to true.

        CLAIMS.append(claim)
    except Exception as e:
        return 'Invalid token: {}\n'.format(e), 400

    envelope = json.loads(request.data.decode('utf-8'))
    payload = base64.b64decode(envelope['message']['data'])
    MESSAGES.append(payload)
    # Returning any 2xx status indicates successful receipt of the message.
    return 'OK', 200

Ruby

post "/pubsub/authenticated-push" do
  halt 400 if params[:token] != PUBSUB_VERIFICATION_TOKEN

  begin
    bearer = request.env["HTTP_AUTHORIZATION"]
    token = /Bearer (.*)/.match(bearer)[1]
    claim = Google::Auth::IDTokens.verify_oidc token, aud: "example.com"

    # IMPORTANT: you should validate claim details not covered by signature
    # and audience verification above, including:
    #   - Ensure that `claim["email"]` is equal to the expected service
    #     account set up in the push subscription settings.
    #   - Ensure that `claim["email_verified"]` is set to true.

    claims.push claim
  rescue Google::Auth::IDTokens::VerificationError => e
    puts "VerificationError: #{e.message}"
    halt 400, "Invalid token"
  end

  message = JSON.parse request.body.read
  payload = Base64.decode64 message["message"]["data"]

  messages.push payload
end

如需了解上述代码示例中使用的环境变量 PUBSUB_VERIFICATION_TOKEN,请参阅编写和响应 Pub/Sub 消息

可在面向网站的 Google 登录指南中找到其他有关如何验证不记名 JWT 的示例。如需详细了解 OpenID 令牌,请参阅 OpenID Connect 指南,包括有助于验证 JWT 的客户端库列表。

从其他 Google Cloud 服务进行身份验证

Cloud Run、App Engine 和 Cloud Functions 通过验证 Pub/Sub 生成的令牌来验证来自 Pub/Sub 的 HTTP 调用。您需要的唯一配置是向调用者帐号授予必要的 IAM 角色。

如需了解这些服务的不同用例,请参阅以下指南和教程:

Cloud Run:

AppEngine:

Cloud Functions:

管理邮件递送

停止和恢复消息传送

要暂时阻止 Pub/Sub 将请求发送到推送端点,请将订阅更改为拉取。更改可能需要几分钟才能生效。

要恢复推送传送,请再次将网址设置为有效端点。要永久停止传送,请删除订阅

推送退避算法

如果推送订阅者发送的负面确认过多,Pub/Sub 可能会开始使用推送退避传送消息。Pub/Sub 使用推送退避时间时,会停止传送消息一段时间。时间跨度必须介于 100 毫秒到 60 秒之间。 此时间过后,Pub/Sub 会再次开始传送消息。

退避算法的工作原理

推送退避使用指数退避算法算法来确定 Pub/Sub 在发送消息之间使用的延迟时间。此时长根据推送订阅者发送的否定确认数计算。

例如,如果推送订阅者每秒接收 5 条消息,并且每秒发送一条否定确认,则 Pub/Sub 大约每 500 毫秒传送一次消息。或者,如果推送订阅者每秒发送五个否定确认,则 Pub/Sub 每 30 到 60 秒传送一次消息。

请注意有关退避的以下注意事项:

  • 无法开启或关闭推送退避。此外,您也无法修改用于计算延迟时间的值。
  • 对以下操作推送退避触发器:
    • 收到否定确认时。
    • 消息的确认截止期限。
  • 推送退避适用于订阅中的所有消息(全局)。

传送速率

Pub/Sub 使用慢启动算法调整并发推送请求的数量。允许的并发推送请求数量上限为推送窗口。推送成功后,所有成功传送都会增加,而失败时间则缩小。系统会从小位数的窗口大小开始。

当订阅者确认消息时,窗口会成倍增加。对于订阅者确认消息数量超过 99% 且推送请求延迟时间平均低于 1 秒的订阅,推送窗口应足够长以跟上任何发布吞吐量的步伐。

推送请求延迟时间包含以下内容:

每个区域有 3000 条未完成的消息后,窗口会线性增加,以防止推送端点接收过多消息。如果平均延迟时间超过 1 秒,或订阅者确认的请求所占的百分比低于 99%,则此窗口将减少到 3000 条未完成消息的下限。

如需详细了解可用于监控推送传送的指标,请参阅监控推送订阅

配额和限制

推送订阅受一组配额资源限制的约束。