使用推送订阅

Pub/Sub 支持推送消息传送和拉取消息传送。如需大致了解拉取订阅和推送订阅以及两者的对比情况,请参阅订阅者概览。本文档介绍了推送传送。如需了解有关拉取传送的讨论,请参阅拉取订阅者指南

Pub/Sub 订阅可以配置为将所有消息以 HTTP POST 请求的形式发送到 Webhook、推送端点和网址。一般来说,推送端点必须是可公开访问的 HTTPS 服务器,而且提供了由证书授权机构签名并且可以由 DNS 路由的有效 SSL 证书。

此外,推送订阅可以配置为提供身份验证标头,以允许端点对请求进行授权。如果 App Engine 标准应用和 Cloud Functions 端点托管在订阅所属的项目中,则可以使用其他可能更简单的身份验证和授权机制。

接收推送消息

Pub/Sub 推送请求如以下示例所示。请注意 message.data 字段使用 base64 编码

    POST https://www.example.com/my-push-endpoint 

   {
     "message": {
       "attributes": {
         "key": "value"
       },
       "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
       "messageId": "136969346945"
     },
     "subscription": "projects/myproject/subscriptions/mysubscription"
   }
推送端点需要处理传入的消息并返回 HTTP 状态代码来表示处理成功与否。success 响应等同于确认消息。Cloud Pub/Sub 系统解读为消息确认的状态代码包括 200201202204102。成功响应可能会如下所示:

204 No Content

对于推送订阅,Pub/Sub 不会发送否定确认(有时称为 NACK)。如果 Webhook 未返回成功代码,则 Pub/Sub 会重试传送,直到消息在订阅的消息保留期限过后失效为止。您可以为推送订阅配置默认确认时限。不过,与拉取订阅不同的是,您无法为个别消息延长时限。时限实际上是端点对推送请求作出响应的时长。

身份验证和授权

使用 JSON Web 令牌 (JWT)

推送订阅可以配置为将服务帐号身份与推送请求关联,从而使推送端点能够对推送订阅进行身份验证。对推送订阅启用身份验证后,来自该订阅的推送请求会在 Authorization 标头中添加已签名的 OpenIDConnect JWT。推送端点可以使用该令牌来验证该请求是否代表与此订阅相关联的服务帐号发出,并做出授权决策。

OpenIDConnect JWT 是一组以英文句点分隔的采用 base64 编码的字符串,包含三个部分:标头、声明集和签名。以下是授权标头示例:

"Authorization" : "Bearer
eyJhbGciOiJSUzI1NiIsImtpZCI6IjdkNjgwZDhjNzBkNDRlOTQ3MTMzY2JkNDk5ZWJjMWE2MWMzZDVh
YmMiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwczovL2V4YW1wbGUuY29tIiwiYXpwIjoiMTEzNzc0M
jY0NDYzMDM4MzIxOTY0IiwiZW1haWwiOiJnYWUtZ2NwQGFwcHNwb3QuZ3NlcnZpY2VhY2NvdW50LmNvb
SIsImVtYWlsX3ZlcmlmaWVkIjp0cnVlLCJleHAiOjE1NTAxODU5MzUsImlhdCI6MTU1MDE4MjMzNSwia
XNzIjoiaHR0cHM6Ly9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTEzNzc0MjY0NDYzMDM4MzIxO
TY0In0.QVjyqpmadTyDZmlX2u3jWd1kJ68YkdwsRZDo-QxSPbxjug4ucLBwAs2QePrcgZ6hhkvdc4UHY
4YF3fz9g7XHULNVIzX5xh02qXEH8dK6PgGndIWcZQzjSYfgO-q-R2oo2hNM5HBBsQN4ARtGK_acG-NGG
WM3CQfahbEjZPAJe_B8M7HfIu_G5jOLZCw2EUcGo8BvEwGcLWB2WqEgRM0-xt5-UPzoa3-FpSPG7DHk7
z9zRUeq6eB__ldb-2o4RciJmjVwHgnYqn3VvlX9oVKEgXpNFhKuYA-mWh5o7BCwhujSMmFoBOh6mbIXF
cyf5UiVqKjpqEbqPGo_AvKvIQ9VTQ" 

标头和声明集是 JSON 字符串。解码后,它们将采用以下格式:

{"alg":"RS256","kid":"7d680d8c70d44e947133cbd499ebc1a61c3d5abc","typ":"JWT"}

{
   "aud":"https://example.com",
   "azp":"113774264463038321964",
   "email":"gae-gcp@appspot.gserviceaccount.com",
   "sub":"113774264463038321964",
   "email_verified":true,
   "exp":1550185935,
   "iat":1550182335,
   "iss":"https://accounts.google.com"
  }

该令牌的生命周期为一小时。

使用 App Engine 标准应用进行授权

Google 为 App Engine 标准应用端点提供了更简单的授权机制。您可以将 App Engine 标准应用配置为要求调用者以管理员用户身份进行身份验证。如需要求以管理员身份登录到您的端点,请在 app.yaml(如此 Python 2 示例所示)或 <security-constraint>(如此 Java 示例所示)中添加 login: admin 选项。

向 App Engine 端点网址发出的 Pub/Sub 推送请求(路径格式为 /_ah/push-handlers/.*)始终会被授权为来自“管理员”用户的请求。为了保护端点,Pub/Sub 会要求 App Engine 端点网址与订阅所属的项目中的 App Engine 相关联。否则,系统会拒绝设置端点网址的请求。

请注意,login: admin 不适用于 Python 3 应用;您需要在应用代码中实现身份验证逻辑。如需了解详情,请参阅了解访问权限控制

为推送身份验证设置 Pub/Sub

订阅的身份验证配置包含以下两个参数:

  • 服务帐号:与推送订阅关联的 GCP 服务帐号。例如,它可以是角色为 roles/run.invoker 且绑定到特定 Cloud Run(全托管式)服务的服务帐号;使用此服务帐号配置的推送订阅可以调用 Cloud Run(全托管式)服务。
  • 令牌受众群体(可选):一个不区分大小写的字符串,可供 Webhook 用于验证此特定令牌的目标受众群体。

除了配置以上字段之外,您还必须授予 Pub/Sub 为您的服务帐号创建令牌所需的权限。Pub/Sub 会为您的项目创建并保留一个特殊服务帐号:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com.。此服务帐号需要 Service Account Token Creator 角色。如果您使用 Cloud Console 为推送身份验证设置订阅,则系统会自动授予该角色。否则,您必须向此帐号明确授予该角色。

请注意,我们将逐步在旧项目中推出此功能,因此在此功能发布后,您的项目可能不会立即存在 Pub/Sub 服务帐号。此功能可立即用于所有新创建的项目。如果您迫切需要在特定项目中启用该功能,请联系 cloud-pubsub@google.com

命令行

# grant Cloud Pub/Sub the permission to create tokens
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
 --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}"\
 --role='roles/iam.serviceAccountTokenCreator'

# configure the subscription push identity
gcloud pubsub subscriptions (create|update|modify-push-config) ${SUBSCRIPTION} \
 --topic=${TOPIC} \
 --push-endpoint=${PUSH_ENDPOINT_URI} \
 --push-auth-service-account=${SERVICE_ACCOUNT_EMAIL} \
 --push-auth-token-audience=${OPTIONAL_AUDIENCE_OVERRIDE}

控制台

  1. 转到 Pub/Sub 主题页面。

    转到“主题”页面

  2. 点击主题名称。

  3. 创建或更新订阅。

  4. 输入身份和(可选)受众群体。

使用推送端点执行身份验证和授权

声明

JWT 可用于验证由 Google 签名的声明(包括 emailaud 声明)。如需详细了解 Google 的 OAuth 2.0 API 如何用于身份验证和授权,请参阅 OpenID Connect

可通过两种机制使声明变得有意义。首先,Pub/Sub 要求,用于将服务帐号身份与推送订阅关联的用户或服务帐号具有项目或服务帐号的 Service Account User 角色。

其次,严格控制对用于为令牌签名的证书的访问。要创建令牌,Pub/Sub 必须使用单独的签名服务帐号身份调用内部 Google 服务。该签名服务帐号必须获得授权才能为已声明的服务帐号或此帐号所属的项目创建令牌。 您可以使用 iam.serviceAccounts.getOpenIdToken 权限或 Service Account Token Creator 角色执行此操作。

此角色或权限可以授予任何帐号。但是,您可以使用 Cloud IAM 服务来确保 Pub/Sub 签名帐号具有此权限。具体来说,Pub/Sub 使用如下所示的服务帐号:

service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com
  • {project_number}:订阅所属的 GCP 项目。
  • gcp-sa-pubsub:签名服务帐号所属的 Google 拥有的项目。

验证令牌

以下示例展示了如何对发送到 App Engine 应用的推送请求进行身份验证。

协议

请求:

GET https://oauth2.googleapis.com/tokeninfo?id_token={BEARER_TOKEN}

响应:

200 OK
{
    "alg": "RS256",
    "aud": "example.com",
    "azp": "104176025330667568672",
    "email": "{SERVICE_ACCOUNT_NAME}@{YOUR_PROJECT_NAME}.iam.gserviceaccount.com",
    "email_verified": "true",
    "exp": "1555463097",
    "iat": "1555459497",
    "iss": "https://accounts.google.com",
    "kid": "3782d3f0bc89008d9d2c01730f765cfb19d3b70e",
    "sub": "104176025330667568672",
    "typ": "JWT"
}

Java

@WebServlet(value = "/pubsub/authenticated-push")
public class PubSubAuthenticatedPush extends HttpServlet {
  private final String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
  private final MessageRepository messageRepository;
  private final GoogleIdTokenVerifier verifier =
      new GoogleIdTokenVerifier.Builder(new NetHttpTransport(), new JacksonFactory())
          /**
           * Please change example.com to match with value you are providing while creating
           * subscription as provided in @see <a
           * href="https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/appengine-java8/pubsub">README</a>.
           */
          .setAudience(Collections.singletonList("example.com"))
          .build();
  private final Gson gson = new Gson();
  private final JsonParser jsonParser = new JsonParser();

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {

    // Verify that the request originates from the application.
    if (req.getParameter("token").compareTo(pubsubVerificationToken) != 0) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    // Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
    String authorizationHeader = req.getHeader("Authorization");
    if (authorizationHeader == null
        || authorizationHeader.isEmpty()
        || authorizationHeader.split(" ").length != 2) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    String authorization = authorizationHeader.split(" ")[1];

    try {
      // Verify and decode the JWT.
      // Note: For high volume push requests, it would save some network overhead
      // if you verify the tokens offline by decoding them using Google's Public
      // Cert; caching already seen tokens works best when a large volume of
      // messsages have prompted a singple push server to handle them, in which
      // case they would all share the same token for a limited time window.
      GoogleIdToken idToken = verifier.verify(authorization);
      messageRepository.saveToken(authorization);
      messageRepository.saveClaim(idToken.getPayload().toPrettyString());
      // parse message object from "message" field in the request body json
      // decode message data from base64
      Message message = getMessage(req);
      messageRepository.save(message);
      // 200, 201, 204, 102 status codes are interpreted as success by the Pub/Sub system
      resp.setStatus(102);
      super.doPost(req, resp);
    } catch (Exception e) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
    }
  }

  private Message getMessage(HttpServletRequest request) throws IOException {
    String requestBody = request.getReader().lines().collect(Collectors.joining("\n"));
    JsonElement jsonRoot = jsonParser.parse(requestBody);
    String messageStr = jsonRoot.getAsJsonObject().get("message").toString();
    Message message = gson.fromJson(messageStr, Message.class);
    // decode from base64
    String decoded = decode(message.getData());
    message.setData(decoded);
    return message;
  }

  private String decode(String data) {
    return new String(Base64.getDecoder().decode(data));
  }

  PubSubAuthenticatedPush(MessageRepository messageRepository) {
    this.messageRepository = messageRepository;
  }

  public PubSubAuthenticatedPush() {
    this(MessageRepositoryImpl.getInstance());
  }
}

Node.js

app.post('/pubsub/authenticated-push', jsonBodyParser, async (req, res) => {
  // Verify that the request originates from the application.
  if (req.query.token !== PUBSUB_VERIFICATION_TOKEN) {
    res.status(400).send('Invalid request');
    return;
  }

  // Verify that the push request originates from Cloud Pub/Sub.
  try {
    // Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
    const bearer = req.header('Authorization');
    const [, token] = bearer.match(/Bearer (.*)/);
    tokens.push(token);

    // Verify and decode the JWT.
    // Note: For high volume push requests, it would save some network
    // overhead if you verify the tokens offline by decoding them using
    // Google's Public Cert; caching already seen tokens works best when
    // a large volume of messsages have prompted a singple push server to
    // handle them, in which case they would all share the same token for
    // a limited time window.
    const ticket = await authClient.verifyIdToken({
      idToken: token,
      audience: 'example.com',
    });

    const claim = ticket.getPayload();
    claims.push(claim);
  } catch (e) {
    res.status(400).send('Invalid token');
    return;
  }

  // The message is a unicode string encoded in base64.
  const message = Buffer.from(req.body.message.data, 'base64').toString(
    'utf-8'
  );

  messages.push(message);

  res.status(200).send();
});

Python

@app.route('/_ah/push-handlers/receive_messages', methods=['POST'])
def receive_messages_handler():
    # Verify that the request originates from the application.
    if (request.args.get('token', '') !=
            current_app.config['PUBSUB_VERIFICATION_TOKEN']):
        return 'Invalid request', 400

    # Verify that the push request originates from Cloud Pub/Sub.
    try:
        # Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
        bearer_token = request.headers.get('Authorization')
        token = bearer_token.split(' ')[1]
        TOKENS.append(token)

        # Verify and decode the JWT. `verify_oauth2_token` verifies
        # the JWT signature, the `aud` claim, and the `exp` claim.
        # Note: For high volume push requests, it would save some network
        # overhead if you verify the tokens offline by downloading Google's
        # Public Cert and decode them using the `google.auth.jwt` module;
        # caching already seen tokens works best when a large volume of
        # messages have prompted a single push server to handle them, in which
        # case they would all share the same token for a limited time window.
        claim = id_token.verify_oauth2_token(token, requests.Request(),
                                             audience='example.com')
        # Must also verify the `iss` claim.
        if claim['iss'] not in [
            'accounts.google.com',
            'https://accounts.google.com'
        ]:
            raise ValueError('Wrong issuer.')
        CLAIMS.append(claim)
    except Exception as e:
        return 'Invalid token: {}\n'.format(e), 400

    envelope = json.loads(request.data.decode('utf-8'))
    payload = base64.b64decode(envelope['message']['data'])
    MESSAGES.append(payload)
    # Returning any 2xx status indicates successful receipt of the message.
    return 'OK', 200

您可以在面向网站的 Google 登录指南中找到其他有关如何验证不记名 JWT 的示例。如需详细了解 OpenID 令牌,请参阅 OpenID Connect 指南

Cloud Run

通过验证 Pub/Sub 生成的令牌,Cloud Run 服务会自动对 HTTP 调用进行身份验证。用户所需的唯一配置是向调用者帐号授予必要的 Cloud IAM 角色。例如,您可以授予或撤消用于为帐号调用特定 Cloud Run 端点的权限。如需了解详情,请参阅以下教程:

停止和恢复传送

要暂时阻止 Pub/Sub 将请求发送到推送端点,请将订阅更改为拉取。请注意,此转换可能需要几分钟才能生效。

要恢复推送传送,请再次将网址设置为有效端点。要永久停止传送,请删除订阅

配额、限制和传送速率

请注意,推送订阅受一组配额资源限制的约束。

此外,系统会自动调整推送传送的速率,以最大限度地提高传送速率,同时不会使推送端点过载。可以使用慢启动算法实现这一目的:

  • 系统开始一次发送一条消息。
  • 每次成功传送后,并行发送的消息数量将加倍。
  • 系统传送并行消息的速率将继续加倍,直到传送失败或者系统达到配额或资源限制为止。
  • 每当出现传送失败时,向端点发送的并发请求数量将减半,直到达到一次最少一个请求为止。

此算法假定队列中有足够的已发布消息来维持此吞吐量。最终,推送传送速率会受限于发布消息的速率。