使用推送订阅

Cloud Pub/Sub 支持推送消息传送和拉取消息传送。如需大致了解拉取订阅和推送订阅并以及两者的对比情况,请参阅订阅者概览。本文档介绍了推送传送。如需了解有关拉取传送的讨论,请参阅拉取订阅者指南

Cloud Pub/Sub 订阅可以配置为将所有消息以 HTTP POST 请求的形式发送到 Webhook、推送端点和网址。一般来说,推送端点必须是可公开访问的 HTTPS 服务器,而且提供了由证书授权机构签名并且可以由 DNS 路由的有效 SSL 证书。您还必须验证自己是否拥有推送端点网域或网址路径的所有权(或者是否有同等级别的访问权限)。

此外,推送订阅可以配置为提供身份验证标头,以允许端点对请求进行授权。如果 App Engine 标准应用和 Cloud Functions 端点托管在订阅所属的项目中,则可以使用其他可能更简单的身份验证和授权机制。

接收推送消息

Cloud Pub/Sub 推送请求如以下示例所示。请注意 message.data 字段使用 base64 编码

    POST https://www.example.com/my-push-endpoint 

   {
     "message": {
       "attributes": {
         "key": "value"
       },
       "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
       "messageId": "136969346945"
     },
     "subscription": "projects/myproject/subscriptions/mysubscription"
   }
推送端点需要处理传入的消息并返回 HTTP 状态代码来表示处理成功与否。success 响应等同于确认消息。Cloud Pub/Sub 系统解读为消息确认的状态代码包括 200201202204102。成功响应可能会如下所示:

204 No Content

对于推送订阅,Cloud Pub/Sub 不会发送否定确认(有时称为 NACK)。如果网络钩子未返回成功代码,则 Cloud Pub/Sub 会重试传送,直到消息在订阅的消息保留期限过后失效为止。您可以为推送订阅配置默认确认时限。不过,与拉取订阅不同的是,您无法为个别消息延长时限。时限实际上是端点对推送请求作出响应的时长。

身份验证和授权

使用 JSON 网络令牌 (JWT)

推送订阅可以配置为将服务帐号身份与推送请求关联,从而使推送端点能够对推送订阅进行身份验证。对推送订阅启用身份验证后,来自该订阅的推送请求会在 Authorization 标头中添加已签名的 OpenIDConnect JWT。推送端点可以使用该令牌来验证该请求是否代表与此订阅相关联的服务帐号发出,并做出授权决策。

OpenIDConnect JWT 是一组以英文句点分隔的采用 base64 编码的字符串,包含三个部分:标头、声明集和签名。以下是授权标头示例:

"Authorization" : "Bearer
eyJhbGciOiJSUzI1NiIsImtpZCI6IjdkNjgwZDhjNzBkNDRlOTQ3MTMzY2JkNDk5ZWJjMWE2MWMzZDVh
YmMiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwczovL2V4YW1wbGUuY29tIiwiYXpwIjoiMTEzNzc0M
jY0NDYzMDM4MzIxOTY0IiwiZW1haWwiOiJnYWUtZ2NwQGFwcHNwb3QuZ3NlcnZpY2VhY2NvdW50LmNvb
SIsImVtYWlsX3ZlcmlmaWVkIjp0cnVlLCJleHAiOjE1NTAxODU5MzUsImlhdCI6MTU1MDE4MjMzNSwia
XNzIjoiaHR0cHM6Ly9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTEzNzc0MjY0NDYzMDM4MzIxO
TY0In0.QVjyqpmadTyDZmlX2u3jWd1kJ68YkdwsRZDo-QxSPbxjug4ucLBwAs2QePrcgZ6hhkvdc4UHY
4YF3fz9g7XHULNVIzX5xh02qXEH8dK6PgGndIWcZQzjSYfgO-q-R2oo2hNM5HBBsQN4ARtGK_acG-NGG
WM3CQfahbEjZPAJe_B8M7HfIu_G5jOLZCw2EUcGo8BvEwGcLWB2WqEgRM0-xt5-UPzoa3-FpSPG7DHk7
z9zRUeq6eB__ldb-2o4RciJmjVwHgnYqn3VvlX9oVKEgXpNFhKuYA-mWh5o7BCwhujSMmFoBOh6mbIXF
cyf5UiVqKjpqEbqPGo_AvKvIQ9VTQ" 

标头和声明集是 JSON 字符串。解码后,它们将采用以下格式:

{"alg":"RS256","kid":"7d680d8c70d44e947133cbd499ebc1a61c3d5abc","typ":"JWT"}

{
   "aud":"https://example.com",
   "azp":"113774264463038321964",
   "email":"gae-gcp@appspot.gserviceaccount.com",
   "sub":"113774264463038321964",
   "email_verified":true,
   "exp":1550185935,
   "iat":1550182335,
   "iss":"https://accounts.google.com"
  }

该令牌的生命周期为一小时。

对 App Engine 标准应用和 Cloud Functions 函数网址进行身份验证

如果 Webhook 是订阅所属的项目中的 App Engine 标准应用或 Cloud Functions 函数,则可以使用其他更简单的推送网址保护机制。

对于 App Engine 应用,您可以要求管理员登录,以处理指向使用以下模式的网址的推送请求:/_ah/push-handlers/.*

为此,您需要在 app.yaml(如此 Python 2 示例所示)或 <security-constraint>(如此 Java 示例所示)中添加 login: admin 选项。请注意,login: admin 不适用于 Python 3 应用。您需要在应用代码中实现身份验证逻辑。如需了解详情,请参阅了解访问权限控制

Cloud Functions 可使用类似机制,但不需要任何特定路径。

为推送身份验证设置 Cloud Pub/Sub

订阅的身份验证配置包含以下两个参数:

  • 服务帐号:与推送订阅关联的 GCP 服务帐号
  • 令牌受众群体(可选):一个不区分大小写的字符串,可供 Webhook 用于验证此特定令牌的目标受众群体。

除了配置以上字段之外,您还必须授予 Cloud Pub/Sub 为您的服务帐号创建令牌所需的权限。Cloud Pub/Sub 会为您的项目创建一个特殊服务帐号,并对该帐号进行维护:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com.此服务帐号需要 Service Account Token Creator 角色。如果您使用 Cloud Console 为推送身份验证设置订阅,则系统会自动授予该角色。否则,您必须向此帐号明确授予该角色。

请注意,我们将逐步在旧项目中推出此功能,因此在此功能发布后,您的项目可能不会立即存在 Cloud Pub/Sub 服务帐号。此功能可立即用于所有新创建的项目。如果您迫切需要在特定项目中启用该功能,请联系 cloud-pubsub@google.com

命令行

# grant Cloud Pub/Sub the permission to create tokens
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
 --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}"\
 --role='roles/iam.serviceAccountTokenCreator'

# configure the subscription push identity
gcloud beta pubsub subscriptions (create|update|modify-push-config) ${SUBSCRIPTION} \
 --topic=${TOPIC} \
 --push-endpoint=${PUSH_ENDPOINT_URI} \
 --push-auth-service-account=${SERVICE_ACCOUNT_EMAIL} \
 --push-auth-token-audience=${OPTIONAL_AUDIENCE_OVERRIDE}

Console

  1. 转到 Cloud Pub/Sub 主题页面。

    转到“主题”页面

  2. 点击主题名称。

  3. 创建或更新订阅。

  4. 输入身份和(可选)受众群体。

使用推送端点执行身份验证和授权

声明

JWT 可用于验证由 Google 签名的声明(包括 emailaud 声明)。如需详细了解 Google 的 OAuth 2.0 API 如何用于身份验证和授权,请参阅 OpenID Connect

可通过两种机制使声明变得有意义。首先,Cloud Pub/Sub 要求,用于将服务帐号身份与推送订阅关联的用户或服务帐号具有项目或服务帐号的 Service Account User 角色。

其次,严格控制对用于为令牌签名的证书的访问。要创建令牌,Cloud Pub/Sub 必须使用单独的签名服务帐号身份调用内部 Google 服务。该签名服务帐号必须获得授权才能为已声明的服务帐号或此帐号所属的项目创建令牌。您可以使用 iam.serviceAccounts.getOpenIdToken 权限或 Service Account Token Creator 角色执行此操作。

此角色或权限可以授予任何帐号。但是,您可以使用 Cloud IAM 服务来确保 Cloud Pub/Sub 签名帐号具有此权限。具体来说,Cloud Pub/Sub 使用如下所示的服务帐号:

service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com
  • {project_number}:订阅所属的 GCP 项目。
  • gcp-sa-pubsub:签名服务帐号所属的 Google 拥有的项目。

验证令牌

以下示例展示了如何对发送到 App Engine 应用的推送请求进行身份验证。

协议

请求:

GET https://oauth2.googleapis.com/tokeninfo?id_token={BEARER_TOKEN}

响应:

200 OK
{
    "alg": "RS256",
    "aud": "example.com",
    "azp": "104176025330667568672",
    "email": "{SERVICE_ACCOUNT_NAME}@{YOUR_PROJECT_NAME}.iam.gserviceaccount.com",
    "email_verified": "true",
    "exp": "1555463097",
    "iat": "1555459497",
    "iss": "https://accounts.google.com",
    "kid": "3782d3f0bc89008d9d2c01730f765cfb19d3b70e",
    "sub": "104176025330667568672",
    "typ": "JWT"
}

Java

@WebServlet(value = "/pubsub/authenticated-push")
public class PubSubAuthenticatedPush extends HttpServlet {
  private final String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
  private final MessageRepository messageRepository;
  private final GoogleIdTokenVerifier verifier =
      new GoogleIdTokenVerifier.Builder(new NetHttpTransport(), new JacksonFactory())
          /**
           * Please change example.com to match with value you are providing while creating
           * subscription as provided in @see <a
           * href="https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/appengine-java8/pubsub">README</a>.
           */
          .setAudience(Collections.singletonList("example.com"))
          .build();
  private final Gson gson = new Gson();
  private final JsonParser jsonParser = new JsonParser();

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {

    // Verify that the request originates from the application.
    if (req.getParameter("token").compareTo(pubsubVerificationToken) != 0) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    // Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
    String authorizationHeader = req.getHeader("Authorization");
    if (authorizationHeader == null
        || authorizationHeader.isEmpty()
        || authorizationHeader.split(" ").length != 2) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    String authorization = authorizationHeader.split(" ")[1];

    try {
      // Verify and decode the JWT.
      GoogleIdToken idToken = verifier.verify(authorization);
      messageRepository.saveToken(authorization);
      messageRepository.saveClaim(idToken.getPayload().toPrettyString());
      // parse message object from "message" field in the request body json
      // decode message data from base64
      Message message = getMessage(req);
      messageRepository.save(message);
      // 200, 201, 204, 102 status codes are interpreted as success by the Pub/Sub system
      resp.setStatus(102);
      super.doPost(req, resp);
    } catch (Exception e) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
    }
  }

  private Message getMessage(HttpServletRequest request) throws IOException {
    String requestBody = request.getReader().lines().collect(Collectors.joining("\n"));
    JsonElement jsonRoot = jsonParser.parse(requestBody);
    String messageStr = jsonRoot.getAsJsonObject().get("message").toString();
    Message message = gson.fromJson(messageStr, Message.class);
    // decode from base64
    String decoded = decode(message.getData());
    message.setData(decoded);
    return message;
  }

  private String decode(String data) {
    return new String(Base64.getDecoder().decode(data));
  }

  PubSubAuthenticatedPush(MessageRepository messageRepository) {
    this.messageRepository = messageRepository;
  }

  public PubSubAuthenticatedPush() {
    this(MessageRepositoryImpl.getInstance());
  }
}

Node.js

app.post('/pubsub/authenticated-push', jsonBodyParser, async (req, res) => {
  // Verify that the request originates from the application.
  if (req.query.token !== PUBSUB_VERIFICATION_TOKEN) {
    res.status(400).send('Invalid request');
    return;
  }

  // Verify that the push request originates from Cloud Pub/Sub.
  try {
    // Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
    const bearer = req.header('Authorization');
    const token = bearer.match(/Bearer (.*)/)[1];
    tokens.push(token);

    // Verify and decode the JWT.
    const ticket = await authClient.verifyIdToken({
      idToken: token,
      audience: 'example.com',
    });

    const claim = ticket.getPayload();
    claims.push(claim);
  } catch (e) {
    res.status(400).send('Invalid token');
    return;
  }

  // The message is a unicode string encoded in base64.
  const message = Buffer.from(req.body.message.data, 'base64').toString(
    'utf-8'
  );

  messages.push(message);

  res.status(200).send();
});

Python

@app.route('/_ah/push-handlers/receive_messages', methods=['POST'])
def receive_messages_handler():
    # Verify that the request originates from the application.
    if (request.args.get('token', '') !=
            current_app.config['PUBSUB_VERIFICATION_TOKEN']):
        return 'Invalid request', 400

    # Verify that the push request originates from Cloud Pub/Sub.
    try:
        # Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
        bearer_token = request.headers.get('Authorization')
        token = bearer_token.split(' ')[1]
        TOKENS.append(token)

        # Verify and decode the JWT. `verify_oauth2_token` verifies
        # the JWT signature, the `aud` claim, and the `exp` claim.
        claim = id_token.verify_oauth2_token(token, requests.Request(),
                                             audience='example.com')
        # Must also verify the `iss` claim.
        if claim['iss'] not in [
            'accounts.google.com',
            'https://accounts.google.com'
        ]:
            raise ValueError('Wrong issuer.')
        CLAIMS.append(claim)
    except Exception as e:
        return 'Invalid token: {}\n'.format(e), 400

    envelope = json.loads(request.data.decode('utf-8'))
    payload = base64.b64decode(envelope['message']['data'])
    MESSAGES.append(payload)
    # Returning any 2xx status indicates successful receipt of the message.
    return 'OK', 200

您可以在面向网站的 Google 登录指南中找到其他有关如何验证不记名 JWT 的示例。如需详细了解 OpenID 令牌,请参阅 OpenID Connect 指南

Cloud Run

Cloud Run 服务会自动对 HTTP 调用进行身份验证。用户所需的唯一配置是向调用者帐号授予的必要 IAM 角色。例如,您可以授予或撤销用于为帐号调用特定 Cloud Run 端点的权限。

网域所有权验证

为了防止产生不必要的流量,Cloud Pub/Sub 要求您验证推送端点的所有权。您需要对所有端点执行以下步骤,但不包括:

  • Cloud Functions
  • Cloud Run
  • 订阅所属项目中的 App Engine 标准应用

A. 验证您是否拥有相应网域的管理员权限。

使用 Search Console 完成网站验证流程。请务必注册网址的 https:// 版本。如需了解更多详细信息,请参阅网站所有权文档

B. 授予对订阅所属的项目的访问权限:

要使 GCP 项目能够生成发送到网域或端点网址的流量,您还必须授予网域对订阅所属的 GCP 项目的访问权限:

  1. 转到 API 和服务凭据 (APIs & Services Credentials) Console 页面。
    转到“API 和服务凭据”(APIs & Services Credentials) Console 页面
  2. 如有必要,请选择您的项目。
  3. 选择网域验证标签页。
  4. 选择添加网域
  5. 输入相应网域,然后选择添加网域

GCP Console 会根据您在 Search Console 中验证的网域来检查您的网域。假设您已经正确验证了网域,页面将更新以显示新的允许的网域列表。

您现在可以使用以上任一网域来接收推送消息。为此,您需要在创建 Cloud Pub/Sub 订阅时将相应网域配置为端点。如需了解详情,请参阅配置订阅

停止和恢复传送

要暂时停止 Cloud Pub/Sub 将请求发送到推送端点,请将订阅更改为拉取。请注意,此转换可能需要几分钟才能生效。

要恢复推送传送,请再次将网址设置为有效端点。要永久停止传送,请删除订阅

配额、限制和传送速率

请注意,推送订阅受一组配额资源限制的约束。此外,系统会自动调整推送传送的速率,以最大限度地提高传送速率,同时不会使推送端点过载。可以使用慢启动算法实现这一目的:

  • 系统开始一次发送一条消息。
  • 每次成功传送后,并行发送的消息数量将加倍。
  • 系统传送并行消息的速率将继续加倍,直到传送失败或者系统达到配额或资源限制为止。
  • 每当出现传送失败时,向端点发送的并发请求数量将减半,直到达到一次最少一个请求为止。

此算法假定队列中有足够的已发布消息来维持此吞吐量。最终,推送传送速率会受限于发布消息的速率。

此页内容是否有用?请给出您的反馈和评价:

发送以下问题的反馈:

此网页
Cloud Pub/Sub 文档