Using push subscriptions

Pub/Sub supports both push and pull message delivery. For an overview and comparison of pull and push subscriptions, see the Subscriber Overview. This document describes push delivery. For a discussion of pull delivery, see the Pull Subscriber Guide.

A Pub/Sub subscription can be configured to send all messages as HTTP POST requests to a webhook (push endpoint) URL. In general, the push endpoint must be a publicly accessible HTTPS server that is routable by DNS and presents a valid SSL certificate signed by a certificate authority. You must also validate your ownership of, or equivalent level of access to, the push endpoint domain or URL path.

In addition, push subscriptions can be configured to provide an authentication header to allow the endpoints to authorize the requests. Alternative and possibly simpler authentication and authorization mechanisms are available for App Engine Standard and Cloud Functions endpoints hosted in the same project as the subscription.

Receiving push messages

A Pub/Sub push request looks like the following example. Note that the message.data field is base64-encoded.

   POST https://www.example.com/my-push-endpoint

   {
     "message": {
       "attributes": {
         "key": "value"
       },
       "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
       "messageId": "136969346945"
     },
     "subscription": "projects/myproject/subscriptions/mysubscription"
   }

Your push endpoint needs to handle incoming messages and return an HTTP status code to indicate success or failure. A success response is equivalent to acknowledging a message. Pub/Sub interprets the following status codes as message acknowledgements: 102, 200, 201, 202, and 204. A success response might look like this:

204 No Content
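
As a rough illustration of the handling described above, the following Python sketch parses a push request body, decodes message.data, and picks a status code to return. The names parse_push_body and handle_push, the process callback, and the choice of 204 for success and 500 for failure are assumptions made for this example; they are not part of the Pub/Sub API.

```python
import base64
import json

# Status codes the text above lists as acknowledgements.
ACK_STATUS_CODES = {102, 200, 201, 202, 204}


def parse_push_body(body: bytes) -> dict:
    """Parse a push request body and decode the base64 `message.data` field."""
    envelope = json.loads(body.decode("utf-8"))
    message = envelope["message"]
    return {
        "subscription": envelope.get("subscription"),
        "message_id": message.get("messageId"),
        "attributes": message.get("attributes", {}),
        # Assumption: treat a missing `data` field as an empty payload.
        "data": base64.b64decode(message.get("data", "")),
    }


def handle_push(body: bytes, process) -> int:
    """Return the HTTP status code the endpoint should send back.

    `process` is a hypothetical callback holding the application logic.
    """
    try:
        process(parse_push_body(body))
    except Exception:
        # Any status outside ACK_STATUS_CODES leaves the message
        # unacknowledged, so Pub/Sub retries delivery.
        return 500
    return 204
```

Returning a non-success code on any exception is the simplest policy; an endpoint that wants to drop malformed requests instead of having them retried would acknowledge them explicitly.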

For push subscriptions, Pub/Sub does not send a negative acknowledgment (sometimes known as a nack). If your webhook does not return a success code, Pub/Sub retries delivery until the message expires after the subscription's message retention period. You can configure a default acknowledgment deadline for push subscriptions. However, unlike for pull subscriptions, the deadline cannot be extended for individual messages. The deadline is effectively the amount of time the endpoint has to respond to the push request.

Authentication and authorization

Using JSON Web Tokens (JWTs)

Push subscriptions can be configured to associate a service account identity with the push requests, enabling the push endpoint to authenticate them. When authentication is enabled on a push subscription, push requests from that subscription include a signed OpenID Connect JWT in the Authorization header. The push endpoint can use the token to validate that the request is issued on behalf of the service account associated with the subscription and make an authorization decision.

The OpenID Connect JWT is a set of three period-delimited base64url-encoded strings: header, claim set, and signature. Example Authorization header:

"Authorization" : "Bearer
eyJhbGciOiJSUzI1NiIsImtpZCI6IjdkNjgwZDhjNzBkNDRlOTQ3MTMzY2JkNDk5ZWJjMWE2MWMzZDVh
YmMiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwczovL2V4YW1wbGUuY29tIiwiYXpwIjoiMTEzNzc0M
jY0NDYzMDM4MzIxOTY0IiwiZW1haWwiOiJnYWUtZ2NwQGFwcHNwb3QuZ3NlcnZpY2VhY2NvdW50LmNvb
SIsImVtYWlsX3ZlcmlmaWVkIjp0cnVlLCJleHAiOjE1NTAxODU5MzUsImlhdCI6MTU1MDE4MjMzNSwia
XNzIjoiaHR0cHM6Ly9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTEzNzc0MjY0NDYzMDM4MzIxO
TY0In0.QVjyqpmadTyDZmlX2u3jWd1kJ68YkdwsRZDo-QxSPbxjug4ucLBwAs2QePrcgZ6hhkvdc4UHY
4YF3fz9g7XHULNVIzX5xh02qXEH8dK6PgGndIWcZQzjSYfgO-q-R2oo2hNM5HBBsQN4ARtGK_acG-NGG
WM3CQfahbEjZPAJe_B8M7HfIu_G5jOLZCw2EUcGo8BvEwGcLWB2WqEgRM0-xt5-UPzoa3-FpSPG7DHk7
z9zRUeq6eB__ldb-2o4RciJmjVwHgnYqn3VvlX9oVKEgXpNFhKuYA-mWh5o7BCwhujSMmFoBOh6mbIXF
cyf5UiVqKjpqEbqPGo_AvKvIQ9VTQ" 

The header and claim set are JSON strings. Once decoded, they take the following form:

{"alg":"RS256","kid":"7d680d8c70d44e947133cbd499ebc1a61c3d5abc","typ":"JWT"}

{
   "aud":"https://example.com",
   "azp":"113774264463038321964",
   "email":"gae-gcp@appspot.gserviceaccount.com",
   "sub":"113774264463038321964",
   "email_verified":true,
   "exp":1550185935,
   "iat":1550182335,
   "iss":"https://accounts.google.com"
}

The tokens have a lifetime of one hour.

Authenticating App Engine Standard and Cloud Functions URLs

An alternative, simpler mechanism for protecting push endpoints is available for webhooks that are App Engine Standard apps or Cloud Functions in the same project as the subscription.

For App Engine apps, you can grant Pub/Sub administrator login privileges on push requests to your endpoint by using a push endpoint URL with a path of the form: /_ah/push-handlers/.*.

To require admin login to your endpoint, add the login: admin option in your app.yaml as in this Python 2 example, or a <security-constraint> as in this Java example. Note that login: admin will not work for Python 3 applications; instead, you will need to implement authentication logic in your application code. See Understanding Access Control for details.

The mechanism is similar for Cloud Functions, but does not require any particular path.

Setting up Pub/Sub for push authentication

Authentication configuration for a subscription consists of two parameters:

  • Service account: The GCP service account associated with the push subscription. For example, it can be a service account with the role roles/run.invoker and bound to a particular Cloud Run (fully managed) service; a push subscription configured with this service account can invoke the Cloud Run (fully managed) service.
  • Token audience (optional): A single, case-insensitive string that can be used by the webhook to validate the intended audience of this particular token.
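
For orientation, these two parameters correspond to the oidcToken field inside a subscription's pushConfig in the Pub/Sub REST API. The Python sketch below only builds such a request body; the helper name build_push_subscription_body and all sample values are hypothetical, and sending the request with credentials is left out.

```python
import json
from typing import Optional


def build_push_subscription_body(
    topic: str,
    push_endpoint: str,
    service_account_email: str,
    audience: Optional[str] = None,
) -> dict:
    """Build a subscription resource body with push authentication enabled."""
    oidc_token = {"serviceAccountEmail": service_account_email}
    if audience is not None:
        # The audience is optional, so it is only included when provided.
        oidc_token["audience"] = audience
    return {
        "topic": topic,
        "pushConfig": {
            "pushEndpoint": push_endpoint,
            "oidcToken": oidc_token,
        },
    }


if __name__ == "__main__":
    # Hypothetical values, for illustration only.
    body = build_push_subscription_body(
        topic="projects/myproject/topics/mytopic",
        push_endpoint="https://www.example.com/my-push-endpoint",
        service_account_email="push-identity@myproject.iam.gserviceaccount.com",
        audience="example.com",
    )
    print(json.dumps(body, indent=2))
```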

In addition to configuring these fields, you must also grant Pub/Sub the permissions needed to create tokens for your service account. Pub/Sub creates and maintains a special service account for your project: service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com. This service account needs the Service Account Token Creator role. If you use the Cloud Console to set up the subscription for push authentication, the role is granted automatically. Otherwise, you must explicitly grant the role to the account.

Note that the feature will be rolled out gradually to older projects, so the Pub/Sub service account may not exist for your project right after the feature launches. It will be immediately available for all newly created projects. Contact cloud-pubsub@google.com if you have an urgent need to enable the feature on a specific project.

Command-line

# Grant Cloud Pub/Sub the permission to create tokens.
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
 --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}" \
 --role='roles/iam.serviceAccountTokenCreator'

# Configure the subscription push identity. Pick one of the subcommands
# create, update, or modify-push-config; --topic applies only to create.
gcloud pubsub subscriptions (create|update|modify-push-config) ${SUBSCRIPTION} \
 --topic=${TOPIC} \
 --push-endpoint=${PUSH_ENDPOINT_URI} \
 --push-auth-service-account=${SERVICE_ACCOUNT_EMAIL} \
 --push-auth-token-audience=${OPTIONAL_AUDIENCE_OVERRIDE}

Console

  1. Go to the Pub/Sub Topics page.

  2. Click a topic name.

  3. Create or update a subscription.

  4. Enter an identity and (optionally) an audience.

Authentication and authorization by the push endpoint

Claims

The JWT can be used to validate that the claims (including the email and aud claims) are signed by Google. For more information about how Google's OAuth 2.0 APIs can be used for both authentication and authorization, see OpenID Connect.

There are two mechanisms that make these claims meaningful. First, Pub/Sub requires that the user or service account used to associate a service account identity with a push subscription have the Service Account User role for the project or the service account.

Second, access to the certificates used to sign the tokens is tightly controlled. To create the token, Pub/Sub must call an internal Google service using a separate signing service account identity. The signing service account must be authorized to create tokens for the claimed service account or the project containing the account. This is done using the iam.serviceAccounts.getOpenIdToken permission or a Service Account Token Creator role.

This role or permission can be granted to any account. However, you can use the Cloud IAM service to ensure the Pub/Sub signing account is the one with this permission. Specifically, Pub/Sub uses a service account like this one:

service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com
  • {project_number}: the GCP project that contains the subscription.
  • gcp-sa-pubsub: the Google-owned project which contains the signing service account.
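
Once a token's signature has been verified, the endpoint still has to decide whether the claims are acceptable. The following Python sketch shows one way such a check might look. The function name, the exact string comparison of aud and email, and the set of trusted issuers are assumptions for this example, and the sketch does not replace signature verification.

```python
import time
from typing import Optional

# Issuer values assumed for this sketch.
TRUSTED_ISSUERS = {"accounts.google.com", "https://accounts.google.com"}


def claims_are_acceptable(
    claims: dict,
    expected_email: str,
    expected_audience: str,
    now: Optional[float] = None,
) -> bool:
    """Apply endpoint-side authorization checks to already-verified claims."""
    now = time.time() if now is None else now
    # Some sources return values as strings ("true", "1555463097"),
    # while a decoded JWT payload uses native JSON types; accept both.
    email_verified = str(claims.get("email_verified", "")).lower() == "true"
    try:
        expires_at = int(claims.get("exp", 0))
    except (TypeError, ValueError):
        return False
    return (
        claims.get("iss") in TRUSTED_ISSUERS
        and claims.get("aud") == expected_audience
        and claims.get("email") == expected_email
        and email_verified
        and expires_at > now
    )
```

Passing now explicitly makes the expiry check easy to test; in a handler it would normally be left at its default.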

Validating tokens

The following example illustrates how to authenticate a push request to an App Engine application.

Protocol

Request:

GET https://oauth2.googleapis.com/tokeninfo?id_token={BEARER_TOKEN}

Response:

200 OK
{
    "alg": "RS256",
    "aud": "example.com",
    "azp": "104176025330667568672",
    "email": "{SERVICE_ACCOUNT_NAME}@{YOUR_PROJECT_NAME}.iam.gserviceaccount.com",
    "email_verified": "true",
    "exp": "1555463097",
    "iat": "1555459497",
    "iss": "https://accounts.google.com",
    "kid": "3782d3f0bc89008d9d2c01730f765cfb19d3b70e",
    "sub": "104176025330667568672",
    "typ": "JWT"
}
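
The protocol exchange above could be issued from Python roughly as follows, using only the standard library. This is a sketch: the function names and the timeout value are assumptions, and calling a remote endpoint for every push request adds latency that local verification avoids.

```python
import json
import urllib.parse
import urllib.request

TOKENINFO_ENDPOINT = "https://oauth2.googleapis.com/tokeninfo"


def tokeninfo_url(bearer_token: str) -> str:
    """Build the tokeninfo request URL for the given ID token."""
    query = urllib.parse.urlencode({"id_token": bearer_token})
    return TOKENINFO_ENDPOINT + "?" + query


def fetch_tokeninfo(bearer_token: str, timeout: float = 5.0) -> dict:
    """Fetch the claims for a token from the tokeninfo endpoint.

    urllib raises urllib.error.HTTPError if the endpoint responds
    with an error status.
    """
    url = tokeninfo_url(bearer_token)
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.load(response)
```

The returned dictionary has the shape of the response shown above; the endpoint still needs to compare aud and email against the values it expects.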

Java

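import com.google.api.client.googleapis.auth.oauth2.GoogleIdToken;
import com.google.api.client.googleapis.auth.oauth2.GoogleIdTokenVerifier;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import java.io.IOException;
import java.util.Base64;
import java.util.Collections;
import java.util.stream.Collectors;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

// Message, MessageRepository, and MessageRepositoryImpl are helper classes
// from the sample project and are not shown here.
@WebServlet(value = "/pubsub/authenticated-push")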
public class PubSubAuthenticatedPush extends HttpServlet {
  private final String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
  private final MessageRepository messageRepository;
  private final GoogleIdTokenVerifier verifier =
      new GoogleIdTokenVerifier.Builder(new NetHttpTransport(), new JacksonFactory())
          /**
           * Please change example.com to match with value you are providing while creating
           * subscription as provided in @see <a
           * href="https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/appengine-java8/pubsub">README</a>.
           */
          .setAudience(Collections.singletonList("example.com"))
          .build();
  private final Gson gson = new Gson();
  private final JsonParser jsonParser = new JsonParser();

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {

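    // Verify that the request originates from the application.
    // A missing token parameter is rejected instead of causing a NullPointerException.
    String token = req.getParameter("token");
    if (token == null || !token.equals(pubsubVerificationToken)) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }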
    // Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
    String authorizationHeader = req.getHeader("Authorization");
    if (authorizationHeader == null
        || authorizationHeader.isEmpty()
        || authorizationHeader.split(" ").length != 2) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    String authorization = authorizationHeader.split(" ")[1];

    try {
      // Verify and decode the JWT.
      // Note: For high volume push requests, it would save some network overhead
      // if you verify the tokens offline by decoding them using Google's Public
      // Cert; caching already seen tokens works best when a large volume of
      // messages have prompted a single push server to handle them, in which
      // case they would all share the same token for a limited time window.
      GoogleIdToken idToken = verifier.verify(authorization);
      messageRepository.saveToken(authorization);
      messageRepository.saveClaim(idToken.getPayload().toPrettyString());
      // parse message object from "message" field in the request body json
      // decode message data from base64
      Message message = getMessage(req);
      messageRepository.save(message);
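      // 102, 200, 201, 202, and 204 status codes are interpreted as success by the Pub/Sub system.
      // Do not call super.doPost() here: the default HttpServlet implementation
      // responds with an error status, which would cause redelivery.
      resp.setStatus(HttpServletResponse.SC_NO_CONTENT);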
    } catch (Exception e) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
    }
  }

  private Message getMessage(HttpServletRequest request) throws IOException {
    String requestBody = request.getReader().lines().collect(Collectors.joining("\n"));
    JsonElement jsonRoot = jsonParser.parse(requestBody);
    String messageStr = jsonRoot.getAsJsonObject().get("message").toString();
    Message message = gson.fromJson(messageStr, Message.class);
    // decode from base64
    String decoded = decode(message.getData());
    message.setData(decoded);
    return message;
  }

  private String decode(String data) {
    return new String(Base64.getDecoder().decode(data));
  }

  PubSubAuthenticatedPush(MessageRepository messageRepository) {
    this.messageRepository = messageRepository;
  }

  public PubSubAuthenticatedPush() {
    this(MessageRepositoryImpl.getInstance());
  }
}

Node.js

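const express = require('express');
const {OAuth2Client} = require('google-auth-library');

const app = express();
const jsonBodyParser = express.json();
const authClient = new OAuth2Client();

// Shared secret appended to the push endpoint URL as the `token` query parameter.
const {PUBSUB_VERIFICATION_TOKEN} = process.env;

// In-memory stores used by this sample only.
const messages = [];
const claims = [];
const tokens = [];

app.post('/pubsub/authenticated-push', jsonBodyParser, async (req, res) => {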
  // Verify that the request originates from the application.
  if (req.query.token !== PUBSUB_VERIFICATION_TOKEN) {
    res.status(400).send('Invalid request');
    return;
  }

  // Verify that the push request originates from Cloud Pub/Sub.
  try {
    // Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
    const bearer = req.header('Authorization');
    const [, token] = bearer.match(/Bearer (.*)/);
    tokens.push(token);

    // Verify and decode the JWT.
    // Note: For high volume push requests, it would save some network
    // overhead if you verify the tokens offline by decoding them using
    // Google's Public Cert; caching already seen tokens works best when
    // a large volume of messages have prompted a single push server to
    // handle them, in which case they would all share the same token for
    // a limited time window.
    const ticket = await authClient.verifyIdToken({
      idToken: token,
      audience: 'example.com',
    });

    const claim = ticket.getPayload();
    claims.push(claim);
  } catch (e) {
    res.status(400).send('Invalid token');
    return;
  }

  // The message is a unicode string encoded in base64.
  const message = Buffer.from(req.body.message.data, 'base64').toString(
    'utf-8'
  );

  messages.push(message);

  res.status(200).send();
});

Python

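import base64
import json
import os

from flask import current_app, Flask, request
from google.auth.transport import requests
from google.oauth2 import id_token

app = Flask(__name__)
# Shared secret appended to the push endpoint URL as the `token` query parameter.
app.config['PUBSUB_VERIFICATION_TOKEN'] = os.environ['PUBSUB_VERIFICATION_TOKEN']

# In-memory stores used by this sample only.
MESSAGES = []
TOKENS = []
CLAIMS = []


@app.route('/_ah/push-handlers/receive_messages', methods=['POST'])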
def receive_messages_handler():
    # Verify that the request originates from the application.
    if (request.args.get('token', '') !=
            current_app.config['PUBSUB_VERIFICATION_TOKEN']):
        return 'Invalid request', 400

    # Verify that the push request originates from Cloud Pub/Sub.
    try:
        # Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
        bearer_token = request.headers.get('Authorization')
        token = bearer_token.split(' ')[1]
        TOKENS.append(token)

        # Verify and decode the JWT. `verify_oauth2_token` verifies
        # the JWT signature, the `aud` claim, and the `exp` claim.
        # Note: For high volume push requests, it would save some network
        # overhead if you verify the tokens offline by downloading Google's
        # Public Cert and decode them using the `google.auth.jwt` module;
        # caching already seen tokens works best when a large volume of
        # messages have prompted a single push server to handle them, in which
        # case they would all share the same token for a limited time window.
        claim = id_token.verify_oauth2_token(token, requests.Request(),
                                             audience='example.com')
        # Must also verify the `iss` claim.
        if claim['iss'] not in [
            'accounts.google.com',
            'https://accounts.google.com'
        ]:
            raise ValueError('Wrong issuer.')
        CLAIMS.append(claim)
    except Exception as e:
        return 'Invalid token: {}\n'.format(e), 400

    envelope = json.loads(request.data.decode('utf-8'))
    payload = base64.b64decode(envelope['message']['data'])
    MESSAGES.append(payload)
    # Returning any 2xx status indicates successful receipt of the message.
    return 'OK', 200

You will find additional examples of how to validate the bearer JWT in this Guide for Google Sign-in for Websites. A broader overview of OpenID tokens is available in the OpenID Connect Guide.

Cloud Run

The Cloud Run service automatically authenticates HTTP calls by verifying Pub/Sub-generated tokens. The only configuration required of the user is that the necessary Cloud IAM roles be granted to the caller account. For example, you can authorize or revoke permission to call a particular Cloud Run endpoint for an account. For details, see the Cloud Run tutorials.

Domain ownership validation

To prevent unwelcome traffic, Pub/Sub requires you to verify your ownership of the push endpoint. The following steps are required for all endpoints except these, when they are in the same project as the subscription:

  • Cloud Functions, when using the Pub/Sub trigger
  • Cloud Run services
  • App Engine Standard apps

A. Verify you have administrative access to the domain:

Complete the site verification process using Search Console. Be sure to register the https:// version of your site URL. For more details, see the site ownership documentation.

B. Grant access to the project that contains the subscription:

To enable the Google Cloud project to generate traffic to the domain or endpoint URL, you must also grant domain access to the Google Cloud project that contains the subscription:

  1. Go to the APIs & Services Domain verification console page.
  2. Select your project, if necessary.
  3. Select Add domain.
  4. Enter the domain and select Add domain.

The Google Cloud console checks your domain against the ones you've verified in the Search Console. Assuming that you've properly verified the domain, the page updates to show your new list of allowed domains.

You now can use any of these domains to receive push messages. To do so, you need to configure them as endpoints when you create a Pub/Sub subscription. See Configuring Subscriptions for details.

Stopping and resuming delivery

To temporarily stop Pub/Sub from sending requests to the push endpoint, change the subscription to pull. Note that it can take several minutes for this changeover to take effect.

To resume push delivery, set the URL to a valid endpoint again. To permanently stop delivery, delete the subscription.

Quotas, limits and delivery rate

Note that push subscriptions are subject to a set of quotas and resource limits.

In addition, the rate of push delivery is automatically adjusted to maximize delivery rate while not overwhelming the push endpoint. This is accomplished using a slow-start algorithm:

  • The system starts by sending a single message at a time.
  • With each successful delivery, the number of messages sent concurrently is doubled.
  • The rate at which the system delivers concurrent messages continues to double until there is a delivery failure or the system reaches a quota or resource limit.
  • For each delivery failure, the number of concurrent requests to the endpoint halves, until a minimum of one request at a time is reached.

This algorithm assumes that there are enough published messages in the queue to sustain this throughput. Ultimately, the push delivery rate is constrained by the rate at which messages are published.
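
The slow-start behaviour described above can be sketched in a few lines of Python. This is a simplified model for building intuition, not Pub/Sub's actual implementation: it adjusts the concurrency level once per delivery outcome, and max_concurrency is a stand-in for whatever quota or resource limit applies.

```python
from typing import Iterable, List


def simulate_slow_start(outcomes: Iterable[bool], max_concurrency: int) -> List[int]:
    """Return the concurrency level in effect after each delivery outcome.

    `outcomes` yields True for a successful delivery and False for a failure.
    """
    concurrency = 1  # the system starts with a single message at a time
    history = []
    for success in outcomes:
        if success:
            # Double on success, capped by the (assumed) limit.
            concurrency = min(concurrency * 2, max_concurrency)
        else:
            # Halve on failure, never dropping below one request at a time.
            concurrency = max(concurrency // 2, 1)
        history.append(concurrency)
    return history


if __name__ == "__main__":
    # Three successes, one failure, one success.
    print(simulate_slow_start([True, True, True, False, True], max_concurrency=100))
    # → [2, 4, 8, 4, 8]
```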
