Usa suscripciones de envío

Pub/Sub admite la entrega de mensajes de envío y extracción. Si quieres obtener una descripción general y una comparación de suscripciones de extracción y de envío, consulta la descripción general de los suscriptores. En este documento se describe la entrega de envío. Para ver un análisis sobre la entrega de extracción, consulta la guía del suscriptor de extracción.

Una suscripción a Pub/Sub se puede configurar para enviar todos los mensajes como solicitudes HTTP POST a un webhook, un extremo de envío o una URL. En general, el extremo de envío debe ser un servidor HTTPS de acceso público, que presente un certificado SSL válido firmado por una autoridad certificada y enrutable por DNS. Pub/Sub enviará mensajes a la suscripción desde cada región en la que se publiquen mensajes sobre el tema de la suscripción.

Además, las suscripciones push se pueden configurar para proporcionar un encabezado de autorización que permita a los extremos autenticar las solicitudes. Hay mecanismos de autenticación y autorización automáticos disponibles para los extremos de App Engine Standard y Cloud Functions alojados en el mismo proyecto que la suscripción.

Recibe mensajes de envío

Una solicitud de envío de Pub/Sub se parece a este ejemplo a continuación. Ten en cuenta que el campo datos de mensaje está codificado en base64.

        POST https://www.example.com/my-push-endpoint 

       {
         "message": {
           "attributes": {
             "key": "value"
           },
           "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
           "messageId": "136969346945"
         },
         "subscription": "projects/myproject/subscriptions/mysubscription"
       }
Tu extremo de envío debe controlar los mensajes entrantes y mostrar un código de estado HTTP para indicar el éxito o el fracaso. Una respuesta success es equivalente a reconocer un mensaje. Los códigos de estado que el sistema de Cloud Pub/Sub interpreta como confirmaciones de mensajes son: 200, 201, 202, 204102. El siguiente es un ejemplo de una respuesta exitosa:

    204 No Content

Para las suscripciones de envío, Pub/Sub no envía una confirmación negativa (a veces conocida como nack). Si tu webhook no muestra un código de éxito, Pub/Sub volverá a intentar la entrega hasta que el mensaje venza después del período de retención de mensajes de la suscripción. Puedes configurar un plazo límite de confirmación predeterminado para las suscripciones de envío. Sin embargo, a diferencia de las suscripciones de extracción, el plazo no se puede extender para mensajes individuales. El plazo es, en efecto, la cantidad de tiempo que el extremo tiene para responder a la solicitud de envío.

Autenticación y autorización

Uso de tokens web JSON (JWT)

Las suscripciones de extracción se pueden configurar para asociar una identidad de cuenta de servicio con las solicitudes de envío, lo que permite que el extremo de envío las autentique. Cuando la autenticación está habilitada en una suscripción de envío, las solicitudes de envío de esa suscripción incluyen un JWT de OpenIDConnect firmado en el encabezado de autorización. El extremo de envío puede usar el token para validar que la solicitud se emite en nombre de la cuenta de servicio asociada con la suscripción y tomar una decisión de autorización.

El JWT de OpenIDConnect es un conjunto de tres strings codificadas en base64 delimitadas por el punto: encabezado, conjunto de reclamos y firma. Este es un ejemplo de encabezado de autorización:

    "Authorization" : "Bearer
    eyJhbGciOiJSUzI1NiIsImtpZCI6IjdkNjgwZDhjNzBkNDRlOTQ3MTMzY2JkNDk5ZWJjMWE2MWMzZDVh
    YmMiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwczovL2V4YW1wbGUuY29tIiwiYXpwIjoiMTEzNzc0M
    jY0NDYzMDM4MzIxOTY0IiwiZW1haWwiOiJnYWUtZ2NwQGFwcHNwb3QuZ3NlcnZpY2VhY2NvdW50LmNvb
    SIsImVtYWlsX3ZlcmlmaWVkIjp0cnVlLCJleHAiOjE1NTAxODU5MzUsImlhdCI6MTU1MDE4MjMzNSwia
    XNzIjoiaHR0cHM6Ly9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTEzNzc0MjY0NDYzMDM4MzIxO
    TY0In0.QVjyqpmadTyDZmlX2u3jWd1kJ68YkdwsRZDo-QxSPbxjug4ucLBwAs2QePrcgZ6hhkvdc4UHY
    4YF3fz9g7XHULNVIzX5xh02qXEH8dK6PgGndIWcZQzjSYfgO-q-R2oo2hNM5HBBsQN4ARtGK_acG-NGG
    WM3CQfahbEjZPAJe_B8M7HfIu_G5jOLZCw2EUcGo8BvEwGcLWB2WqEgRM0-xt5-UPzoa3-FpSPG7DHk7
    z9zRUeq6eB__ldb-2o4RciJmjVwHgnYqn3VvlX9oVKEgXpNFhKuYA-mWh5o7BCwhujSMmFoBOh6mbIXF
    cyf5UiVqKjpqEbqPGo_AvKvIQ9VTQ" 

El encabezado y el conjunto de reclamos son strings de JSON. Una vez decodificados, toman el siguiente formato:

    {"alg":"RS256","kid":"7d680d8c70d44e947133cbd499ebc1a61c3d5abc","typ":"JWT"}

    {
       "aud":"https://example.com",
       "azp":"113774264463038321964",
       "email":"gae-gcp@appspot.gserviceaccount.com",
       "sub":"113774264463038321964",
       "email_verified":true,
       "exp":1550185935,
       "iat":1550182335,
       "iss":"https://accounts.google.com"
      }

Los tokens adjuntos a las solicitudes enviadas a extremos de envío pueden tener hasta una hora de antigüedad.

Configura Pub/Sub para que use la autenticación de extracción

La configuración de autenticación para una suscripción consta de los dos parámetros siguientes:

  • Cuenta de servicio: La cuenta de servicio de GCP asociada con la suscripción de envío. Las solicitudes push tienen la identidad de esta cuenta de servicio. Como ejemplo, una suscripción push configurada con una cuenta de servicio que tiene la función roles/run.invoker y está vinculada a un servicio de Cloud Run (completamente administrado) puede invocar ese servicio de Cloud Run (completamente administrado).
  • Público del token: Es una string única, que no distingue entre mayúsculas y minúsculas y que puede usar el webhook para validar el público objetivo de este token en particular (opcional).

Además de configurar estos campos, también debes otorgar a Pub/Sub los permisos necesarios a fin de crear tokens para tu cuenta de servicio. Pub/Sub crea y mantiene una cuenta de servicio especial para tu proyecto: service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com.. Esta cuenta de servicio necesita la función Creador de tokens de cuenta de servicio. Si usas Cloud Console con el fin de configurar la suscripción para la autenticación de envío, la función se otorga de forma automática. De lo contrario, debes asignar de forma explícita la función a la cuenta.

LÍNEA DE COMANDOS

    # grant Cloud Pub/Sub the permission to create tokens
    PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
     --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}"\
     --role='roles/iam.serviceAccountTokenCreator'

    # configure the subscription push identity
    gcloud pubsub subscriptions (create|update|modify-push-config) ${SUBSCRIPTION} \
     --topic=${TOPIC} \
     --push-endpoint=${PUSH_ENDPOINT_URI} \
     --push-auth-service-account=${SERVICE_ACCOUNT_EMAIL} \
     --push-auth-token-audience=${OPTIONAL_AUDIENCE_OVERRIDE}

CONSOLE

  1. Ve a la página Temas de Pub/Sub.

    Ir a la página Temas

  2. Haz clic en el nombre de un tema.

  3. Crea o actualiza una suscripción.

  4. Ingresa una identidad y un público (opcional).

Autenticación y autorización que realiza el extremo de envío

Reclamos

El JWT se puede usar para validar que los reclamos, incluidos los reclamos email y aud, estén firmados por Google. Consulta OpenID Connect si deseas obtener más información sobre cómo se pueden usar las API de OAuth 2.0 de Google para la autenticación y la autorización.

Hay dos mecanismos que hacen que estas afirmaciones tengan sentido. Primero, Pub/Sub requiere que la cuenta de usuario o servicio usada para asociar una identidad de cuenta de servicio con una suscripción de envío tenga la función Usuario de cuenta de servicio para el proyecto o la cuenta de servicio.

En segundo lugar, el acceso a los certificados usados para firmar los tokens está controlado de forma estricta. Para crear el token, Pub/Sub debe llamar a un servicio interno de Google mediante el uso de una identidad de cuenta de servicio de firma distinta. La cuenta de servicio de firma debe estar autorizada a fin de crear tokens para la cuenta de servicio reclamada o para el proyecto que la contiene. Esto se hace con el permiso iam.serviceAccounts.getOpenIdToken o la función Creador de tokens de cuenta de servicio.

Esta función o permiso se puede otorgar a cualquier cuenta. Sin embargo, puedes usar el servicio de Cloud IAM para asegurarte de que la cuenta de firma de Pub/Sub sea la que tenga este permiso. Pub/Sub usa una cuenta de servicio como esta:

    service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com
  • {project_number}: El proyecto de GCP que contiene la suscripción.
  • gcp-sa-pubsub: El proyecto que es propiedad de Google que contiene la cuenta de servicio de firma.

Valida tokens

En el siguiente ejemplo, se muestra cómo autenticar una solicitud de inserción en una aplicación de App Engine.

protocol

Solicitud:

    GET https://oauth2.googleapis.com/tokeninfo?id_token={BEARER_TOKEN}
    

Respuesta:

200 OK
    {
        "alg": "RS256",
        "aud": "example.com",
        "azp": "104176025330667568672",
        "email": "{SERVICE_ACCOUNT_NAME}@{YOUR_PROJECT_NAME}.iam.gserviceaccount.com",
        "email_verified": "true",
        "exp": "1555463097",
        "iat": "1555459497",
        "iss": "https://accounts.google.com",
        "kid": "3782d3f0bc89008d9d2c01730f765cfb19d3b70e",
        "sub": "104176025330667568672",
        "typ": "JWT"
    }
    

Java

@WebServlet(value = "/pubsub/authenticated-push")
    public class PubSubAuthenticatedPush extends HttpServlet {
      private final String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
      private final MessageRepository messageRepository;
      private final GoogleIdTokenVerifier verifier =
          new GoogleIdTokenVerifier.Builder(new NetHttpTransport(), new JacksonFactory())
              /**
               * Please change example.com to match with value you are providing while creating
               * subscription as provided in @see <a
               * href="https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/appengine-java8/pubsub">README</a>.
               */
              .setAudience(Collections.singletonList("example.com"))
              .build();
      private final Gson gson = new Gson();
      private final JsonParser jsonParser = new JsonParser();

      @Override
      public void doPost(HttpServletRequest req, HttpServletResponse resp)
          throws IOException, ServletException {

        // Verify that the request originates from the application.
        if (req.getParameter("token").compareTo(pubsubVerificationToken) != 0) {
          resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
          return;
        }
        // Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
        String authorizationHeader = req.getHeader("Authorization");
        if (authorizationHeader == null
            || authorizationHeader.isEmpty()
            || authorizationHeader.split(" ").length != 2) {
          resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
          return;
        }
        String authorization = authorizationHeader.split(" ")[1];

        try {
          // Verify and decode the JWT.
          // Note: For high volume push requests, it would save some network overhead
          // if you verify the tokens offline by decoding them using Google's Public
          // Cert; caching already seen tokens works best when a large volume of
          // messsages have prompted a singple push server to handle them, in which
          // case they would all share the same token for a limited time window.
          GoogleIdToken idToken = verifier.verify(authorization);
          messageRepository.saveToken(authorization);
          messageRepository.saveClaim(idToken.getPayload().toPrettyString());
          // parse message object from "message" field in the request body json
          // decode message data from base64
          Message message = getMessage(req);
          messageRepository.save(message);
          // 200, 201, 204, 102 status codes are interpreted as success by the Pub/Sub system
          resp.setStatus(102);
          super.doPost(req, resp);
        } catch (Exception e) {
          resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
        }
      }

      private Message getMessage(HttpServletRequest request) throws IOException {
        String requestBody = request.getReader().lines().collect(Collectors.joining("\n"));
        JsonElement jsonRoot = jsonParser.parse(requestBody);
        String messageStr = jsonRoot.getAsJsonObject().get("message").toString();
        Message message = gson.fromJson(messageStr, Message.class);
        // decode from base64
        String decoded = decode(message.getData());
        message.setData(decoded);
        return message;
      }

      private String decode(String data) {
        return new String(Base64.getDecoder().decode(data));
      }

      PubSubAuthenticatedPush(MessageRepository messageRepository) {
        this.messageRepository = messageRepository;
      }

      public PubSubAuthenticatedPush() {
        this(MessageRepositoryImpl.getInstance());
      }
    }

Node.js

app.post('/pubsub/authenticated-push', jsonBodyParser, async (req, res) => {
      // Verify that the request originates from the application.
      if (req.query.token !== PUBSUB_VERIFICATION_TOKEN) {
        res.status(400).send('Invalid request');
        return;
      }

      // Verify that the push request originates from Cloud Pub/Sub.
      try {
        // Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
        const bearer = req.header('Authorization');
        const [, token] = bearer.match(/Bearer (.*)/);
        tokens.push(token);

        // Verify and decode the JWT.
        // Note: For high volume push requests, it would save some network
        // overhead if you verify the tokens offline by decoding them using
        // Google's Public Cert; caching already seen tokens works best when
        // a large volume of messsages have prompted a singple push server to
        // handle them, in which case they would all share the same token for
        // a limited time window.
        const ticket = await authClient.verifyIdToken({
          idToken: token,
          audience: 'example.com',
        });

        const claim = ticket.getPayload();
        claims.push(claim);
      } catch (e) {
        res.status(400).send('Invalid token');
        return;
      }

      // The message is a unicode string encoded in base64.
      const message = Buffer.from(req.body.message.data, 'base64').toString(
        'utf-8'
      );

      messages.push(message);

      res.status(200).send();
    });

Python

@app.route('/push-handlers/receive_messages', methods=['POST'])
    def receive_messages_handler():
        # Verify that the request originates from the application.
        if (request.args.get('token', '') !=
                current_app.config['PUBSUB_VERIFICATION_TOKEN']):
            return 'Invalid request', 400

        # Verify that the push request originates from Cloud Pub/Sub.
        try:
            # Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
            bearer_token = request.headers.get('Authorization')
            token = bearer_token.split(' ')[1]
            TOKENS.append(token)

            # Verify and decode the JWT. `verify_oauth2_token` verifies
            # the JWT signature, the `aud` claim, and the `exp` claim.
            # Note: For high volume push requests, it would save some network
            # overhead if you verify the tokens offline by downloading Google's
            # Public Cert and decode them using the `google.auth.jwt` module;
            # caching already seen tokens works best when a large volume of
            # messages have prompted a single push server to handle them, in which
            # case they would all share the same token for a limited time window.
            claim = id_token.verify_oauth2_token(token, requests.Request(),
                                                 audience='example.com')
            # Must also verify the `iss` claim.
            if claim['iss'] not in [
                'accounts.google.com',
                'https://accounts.google.com'
            ]:
                raise ValueError('Wrong issuer.')
            CLAIMS.append(claim)
        except Exception as e:
            return 'Invalid token: {}\n'.format(e), 400

        envelope = json.loads(request.data.decode('utf-8'))
        payload = base64.b64decode(envelope['message']['data'])
        MESSAGES.append(payload)
        # Returning any 2xx status indicates successful receipt of the message.
        return 'OK', 200

Encontrarás ejemplos adicionales de cómo validar el portador JWT en esta guía para el acceso con Google a sitios web. Además, encontrarás una descripción general de los tokens OpenID en la guía de OpenID Connect.

Cloud Run y App Engine

Cloud Run y App Engine autentican automáticamente las llamadas HTTP mediante la verificación de los tokens generados por Pub/Sub. La única configuración requerida del usuario es que se otorguen las funciones de Cloud IAM necesarias a la cuenta del emisor. Por ejemplo, puedes autorizar o revocar el permiso a fin de llamar a un extremo de Cloud Run en particular para una cuenta. Para más detalles, consulta los siguientes instructivos:

Detén y restablece la entrega

Para evitar por un tiempo que Pub/Sub envíe solicitudes al extremo de envío, cambia la suscripción a extracción. Ten en cuenta que este cambio puede tomar varios minutos hasta que se efectúe.

Para restablecer la entrega de envío, vuelve a configurar la URL en un extremo válido. Para detener la entrega de forma permanente, borra la suscripción.

Cuotas, límites y frecuencia de entrega

Ten en cuenta que las suscripciones de envío están sujetas a un conjunto de cuotas y límites de recursos.

Si Pub/Sub no recibe una respuesta success, Pub/Sub aplica una retirada exponencial con un mínimo de 100 milisegundos y un máximo de 60 segundos.

Pub/Sub ajusta la cantidad de solicitudes push simultáneas mediante un algoritmo de inicio lento. La cantidad máxima permitida de solicitudes de inserción simultáneas es la ventana de aplicación. La ventana de envío aumenta con cualquier publicación exitosa y disminuye con cualquier falla. El sistema comienza con una ventana pequeña: 3 * N donde N es el número de regiones de publicación. El tamaño máximo de la ventana es 3,000 * N. La cantidad real de solicitudes push simultáneas o pendientes se puede supervisar con la métrica de Cloud Monitoring pubsub.googleapis.com/subscription/num_outstanding_messages.