Usa suscripciones de envío

Pub/Sub admite la entrega de mensajes de envío y extracción. Para obtener una descripción general y una comparación de suscripciones de extracción y de envío, consulta la descripción general de los suscriptores. En este documento se describe la entrega de envío. Para ver un análisis sobre la entrega de extracción, consulta la guía del suscriptor de extracción.

Una suscripción a Pub/Sub se puede configurar para enviar todos los mensajes como solicitudes HTTP POST a un webhook, un extremo de envío o una URL. En general, el extremo de envío debe ser un servidor HTTPS de acceso público, que presente un certificado SSL válido firmado por una autoridad certificada y enrutable por DNS.

Además, las suscripciones de envío se pueden configurar a fin de proporcionar un encabezado de autenticación para permitir que los extremos autoricen las solicitudes. Existen mecanismos de autenticación y autorización alternativos que suelen ser más simples para los extremos del entorno estándar de App Engine y Cloud Functions alojados en el mismo proyecto que la suscripción.

Recibe mensajes de envío

Una solicitud de envío de Pub/Sub se parece a este ejemplo a continuación. Ten en cuenta que el campo datos de mensaje está codificado en base64.

    POST https://www.example.com/my-push-endpoint 

   {
     "message": {
       "attributes": {
         "key": "value"
       },
       "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
       "messageId": "136969346945"
     },
     "subscription": "projects/myproject/subscriptions/mysubscription"
   }
Tu extremo de envío debe controlar los mensajes entrantes y mostrar un código de estado HTTP para indicar el éxito o el fracaso. Una respuesta success es equivalente a la confirmación de un mensaje. Los códigos de estado que el sistema de Cloud Pub/Sub interpreta como confirmaciones de mensajes son: 200, 201, 202, 204102. El siguiente es un ejemplo de una respuesta exitosa:

204 No Content

Para las suscripciones de envío, Pub/Sub no envía una confirmación negativa (a veces conocida como nack). Si tu webhook no muestra un código de éxito, Pub/Sub volverá a intentar la entrega hasta que el mensaje venza después del período de retención de mensajes de la suscripción. Puedes configurar un plazo límite de confirmación predeterminado para las suscripciones de envío. Sin embargo, a diferencia de las suscripciones de extracción, el plazo no se puede extender para mensajes individuales. El plazo es, en efecto, la cantidad de tiempo que el extremo tiene para responder a la solicitud de envío.

Autenticación y autorización

Uso de tokens web JSON (JWT)

Las suscripciones de extracción se pueden configurar para asociar una identidad de cuenta de servicio con las solicitudes de envío, lo que permite que el extremo de envío las autentique. Cuando la autenticación está habilitada en una suscripción de envío, las solicitudes de envío de esa suscripción incluyen un JWT de OpenIDConnect firmado en el encabezado de autorización. El extremo de envío puede usar el token para validar que la solicitud se emite en nombre de la cuenta de servicio asociada con la suscripción y tomar una decisión de autorización.

El JWT de OpenIDConnect es un conjunto de tres strings codificadas en base64 delimitadas por el punto: encabezado, conjunto de reclamos y firma. Este es un ejemplo de encabezado de autorización:

"Authorization" : "Bearer
eyJhbGciOiJSUzI1NiIsImtpZCI6IjdkNjgwZDhjNzBkNDRlOTQ3MTMzY2JkNDk5ZWJjMWE2MWMzZDVh
YmMiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwczovL2V4YW1wbGUuY29tIiwiYXpwIjoiMTEzNzc0M
jY0NDYzMDM4MzIxOTY0IiwiZW1haWwiOiJnYWUtZ2NwQGFwcHNwb3QuZ3NlcnZpY2VhY2NvdW50LmNvb
SIsImVtYWlsX3ZlcmlmaWVkIjp0cnVlLCJleHAiOjE1NTAxODU5MzUsImlhdCI6MTU1MDE4MjMzNSwia
XNzIjoiaHR0cHM6Ly9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTEzNzc0MjY0NDYzMDM4MzIxO
TY0In0.QVjyqpmadTyDZmlX2u3jWd1kJ68YkdwsRZDo-QxSPbxjug4ucLBwAs2QePrcgZ6hhkvdc4UHY
4YF3fz9g7XHULNVIzX5xh02qXEH8dK6PgGndIWcZQzjSYfgO-q-R2oo2hNM5HBBsQN4ARtGK_acG-NGG
WM3CQfahbEjZPAJe_B8M7HfIu_G5jOLZCw2EUcGo8BvEwGcLWB2WqEgRM0-xt5-UPzoa3-FpSPG7DHk7
z9zRUeq6eB__ldb-2o4RciJmjVwHgnYqn3VvlX9oVKEgXpNFhKuYA-mWh5o7BCwhujSMmFoBOh6mbIXF
cyf5UiVqKjpqEbqPGo_AvKvIQ9VTQ" 

El encabezado y el conjunto de reclamos son strings de JSON. Una vez decodificados, toman el siguiente formato:

{"alg":"RS256","kid":"7d680d8c70d44e947133cbd499ebc1a61c3d5abc","typ":"JWT"}

{
   "aud":"https://example.com",
   "azp":"113774264463038321964",
   "email":"gae-gcp@appspot.gserviceaccount.com",
   "sub":"113774264463038321964",
   "email_verified":true,
   "exp":1550185935,
   "iat":1550182335,
   "iss":"https://accounts.google.com"
  }

Los tokens tienen una vida útil de una hora.

Autentica URL de entorno estándar de App Engine

Un mecanismo de autorización más simple está disponible para los extremos que son apps de entorno estándar de App Engine. Puedes configurar las apps de entorno estándar de App Engine para que requieran que los emisores se autentiquen como Usuarios admin. Para solicitar el acceso de administrador a tu extremo, agrega la opción login: admin en tu app.yaml como en este ejemplo de Python 2 o un <security-constraint> como en este ejemplo de Java.

Las solicitudes de envío de Pub/Sub a URL de extremos de App Engine con una ruta con formato /_ah/push-handlers/.* siempre están autorizadas como solicitudes de usuarios “admin”. Para asegurar el extremo, Pub/Sub requiere que las URL del extremo de App Engine estén asociadas a una App Engine en el mismo proyecto que la suscripción. De lo contrario, se rechaza la solicitud para establecer la URL del extremo.

Ten en cuenta que login: admin no funcionará para las aplicaciones con Python 3; en cambio, deberás implementar la lógica de autenticación en el código de tu aplicación. Consulta la Explicación del control de acceso para obtener más detalles.

Configura Pub/Sub para que use la autenticación de extracción

La configuración de autenticación para una suscripción consta de los dos parámetros siguientes:

  • Cuenta de servicio: La cuenta de servicio de GCP asociada con la suscripción de envío. Por ejemplo, puede ser una cuenta de servicio con la función roles/run.invoker que esté vinculada a un servicio particular de Cloud Run (completamente administrado); una suscripción de envío configurada con esta cuenta de servicio puede invocar el servicio Cloud Run (completamente administrado).
  • Público del token: Es una string única, que no distingue entre mayúsculas y minúsculas y que puede usar el webhook para validar el público objetivo de este token en particular (opcional).

Además de configurar estos campos, también debes otorgar a Pub/Sub los permisos necesarios a fin de crear tokens para tu cuenta de servicio. Pub/Sub crea y mantiene una cuenta de servicio especial para tu proyecto: service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com.. Esta cuenta de servicio necesita la función Creador de tokens de cuenta de servicio. Si usas Cloud Console con el fin de configurar la suscripción para la autenticación de envío, la función se otorga de forma automática. De lo contrario, debes asignar de forma explícita la función a la cuenta.

Ten en cuenta que la función se implementará de forma gradual en proyectos más antiguos, por lo que es posible que la cuenta de servicio de Pub/Sub no aparezca para tu proyecto de forma inmediata después de su asignación. Estará disponible de inmediato para todos los proyectos recién creados. Comunícate con cloud-pubsub@google.com si tienes la necesidad urgente de habilitar la función en un proyecto específico.

LÍNEA DE COMANDOS

# grant Cloud Pub/Sub the permission to create tokens
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
 --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}"\
 --role='roles/iam.serviceAccountTokenCreator'

# configure the subscription push identity
gcloud pubsub subscriptions (create|update|modify-push-config) ${SUBSCRIPTION} \
 --topic=${TOPIC} \
 --push-endpoint=${PUSH_ENDPOINT_URI} \
 --push-auth-service-account=${SERVICE_ACCOUNT_EMAIL} \
 --push-auth-token-audience=${OPTIONAL_AUDIENCE_OVERRIDE}

CONSOLE

  1. Ve a la página Temas de Pub/Sub.

    Ir a la página Temas

  2. Haz clic en el nombre de un tema.

  3. Crea o actualiza una suscripción.

  4. Ingresa una identidad y un público (opcional).

Autenticación y autorización que realiza el extremo de envío

Reclamos

El JWT se puede usar para validar que los reclamos, incluidos los reclamos email y aud, estén firmados por Google. Consulta OpenID Connect si deseas obtener más información sobre cómo se pueden usar las API de OAuth 2.0 de Google para la autenticación y la autorización.

Hay dos mecanismos que hacen que estas afirmaciones tengan sentido. Primero, Pub/Sub requiere que la cuenta de usuario o servicio usada para asociar una identidad de cuenta de servicio con una suscripción de envío tenga la función Usuario de cuenta de servicio para el proyecto o la cuenta de servicio.

En segundo lugar, el acceso a los certificados usados para firmar los tokens está controlado de forma estricta. Para crear el token, Pub/Sub debe llamar a un servicio interno de Google mediante el uso de una identidad de cuenta de servicio de firma distinta. La cuenta de servicio de firma debe estar autorizada a fin de crear tokens para la cuenta de servicio reclamada o para el proyecto que la contiene. Esto se hace con el permiso iam.serviceAccounts.getOpenIdToken o la función Creador de tokens de cuenta de servicio.

Esta función o permiso se puede otorgar a cualquier cuenta. Sin embargo, puedes usar el servicio de Cloud IAM para asegurarte de que la cuenta de firma de Pub/Sub sea la que tenga este permiso. Pub/Sub usa una cuenta de servicio como esta:

service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com
  • {project_number}: El proyecto de GCP que contiene la suscripción.
  • gcp-sa-pubsub: El proyecto que es propiedad de Google que contiene la cuenta de servicio de firma.

Valida tokens

En el siguiente ejemplo, se muestra cómo autenticar una solicitud de inserción en una aplicación de App Engine.

protocol

Solicitud:

GET https://oauth2.googleapis.com/tokeninfo?id_token={BEARER_TOKEN}

Respuesta:

200 OK
{
    "alg": "RS256",
    "aud": "example.com",
    "azp": "104176025330667568672",
    "email": "{SERVICE_ACCOUNT_NAME}@{YOUR_PROJECT_NAME}.iam.gserviceaccount.com",
    "email_verified": "true",
    "exp": "1555463097",
    "iat": "1555459497",
    "iss": "https://accounts.google.com",
    "kid": "3782d3f0bc89008d9d2c01730f765cfb19d3b70e",
    "sub": "104176025330667568672",
    "typ": "JWT"
}

Java

@WebServlet(value = "/pubsub/authenticated-push")
public class PubSubAuthenticatedPush extends HttpServlet {
  private final String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
  private final MessageRepository messageRepository;
  private final GoogleIdTokenVerifier verifier =
      new GoogleIdTokenVerifier.Builder(new NetHttpTransport(), new JacksonFactory())
          /**
           * Please change example.com to match with value you are providing while creating
           * subscription as provided in @see <a
           * href="https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/appengine-java8/pubsub">README</a>.
           */
          .setAudience(Collections.singletonList("example.com"))
          .build();
  private final Gson gson = new Gson();
  private final JsonParser jsonParser = new JsonParser();

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {

    // Verify that the request originates from the application.
    if (req.getParameter("token").compareTo(pubsubVerificationToken) != 0) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    // Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
    String authorizationHeader = req.getHeader("Authorization");
    if (authorizationHeader == null
        || authorizationHeader.isEmpty()
        || authorizationHeader.split(" ").length != 2) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    String authorization = authorizationHeader.split(" ")[1];

    try {
      // Verify and decode the JWT.
      // Note: For high volume push requests, it would save some network overhead
      // if you verify the tokens offline by decoding them using Google's Public
      // Cert; caching already seen tokens works best when a large volume of
      // messsages have prompted a singple push server to handle them, in which
      // case they would all share the same token for a limited time window.
      GoogleIdToken idToken = verifier.verify(authorization);
      messageRepository.saveToken(authorization);
      messageRepository.saveClaim(idToken.getPayload().toPrettyString());
      // parse message object from "message" field in the request body json
      // decode message data from base64
      Message message = getMessage(req);
      messageRepository.save(message);
      // 200, 201, 204, 102 status codes are interpreted as success by the Pub/Sub system
      resp.setStatus(102);
      super.doPost(req, resp);
    } catch (Exception e) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
    }
  }

  private Message getMessage(HttpServletRequest request) throws IOException {
    String requestBody = request.getReader().lines().collect(Collectors.joining("\n"));
    JsonElement jsonRoot = jsonParser.parse(requestBody);
    String messageStr = jsonRoot.getAsJsonObject().get("message").toString();
    Message message = gson.fromJson(messageStr, Message.class);
    // decode from base64
    String decoded = decode(message.getData());
    message.setData(decoded);
    return message;
  }

  private String decode(String data) {
    return new String(Base64.getDecoder().decode(data));
  }

  PubSubAuthenticatedPush(MessageRepository messageRepository) {
    this.messageRepository = messageRepository;
  }

  public PubSubAuthenticatedPush() {
    this(MessageRepositoryImpl.getInstance());
  }
}

Node.js

app.post('/pubsub/authenticated-push', jsonBodyParser, async (req, res) => {
  // Verify that the request originates from the application.
  if (req.query.token !== PUBSUB_VERIFICATION_TOKEN) {
    res.status(400).send('Invalid request');
    return;
  }

  // Verify that the push request originates from Cloud Pub/Sub.
  try {
    // Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
    const bearer = req.header('Authorization');
    const [, token] = bearer.match(/Bearer (.*)/);
    tokens.push(token);

    // Verify and decode the JWT.
    // Note: For high volume push requests, it would save some network
    // overhead if you verify the tokens offline by decoding them using
    // Google's Public Cert; caching already seen tokens works best when
    // a large volume of messsages have prompted a singple push server to
    // handle them, in which case they would all share the same token for
    // a limited time window.
    const ticket = await authClient.verifyIdToken({
      idToken: token,
      audience: 'example.com',
    });

    const claim = ticket.getPayload();
    claims.push(claim);
  } catch (e) {
    res.status(400).send('Invalid token');
    return;
  }

  // The message is a unicode string encoded in base64.
  const message = Buffer.from(req.body.message.data, 'base64').toString(
    'utf-8'
  );

  messages.push(message);

  res.status(200).send();
});

Python

@app.route('/_ah/push-handlers/receive_messages', methods=['POST'])
def receive_messages_handler():
    # Verify that the request originates from the application.
    if (request.args.get('token', '') !=
            current_app.config['PUBSUB_VERIFICATION_TOKEN']):
        return 'Invalid request', 400

    # Verify that the push request originates from Cloud Pub/Sub.
    try:
        # Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
        bearer_token = request.headers.get('Authorization')
        token = bearer_token.split(' ')[1]
        TOKENS.append(token)

        # Verify and decode the JWT. `verify_oauth2_token` verifies
        # the JWT signature, the `aud` claim, and the `exp` claim.
        # Note: For high volume push requests, it would save some network
        # overhead if you verify the tokens offline by downloading Google's
        # Public Cert and decode them using the `google.auth.jwt` module;
        # caching already seen tokens works best when a large volume of
        # messages have prompted a single push server to handle them, in which
        # case they would all share the same token for a limited time window.
        claim = id_token.verify_oauth2_token(token, requests.Request(),
                                             audience='example.com')
        # Must also verify the `iss` claim.
        if claim['iss'] not in [
            'accounts.google.com',
            'https://accounts.google.com'
        ]:
            raise ValueError('Wrong issuer.')
        CLAIMS.append(claim)
    except Exception as e:
        return 'Invalid token: {}\n'.format(e), 400

    envelope = json.loads(request.data.decode('utf-8'))
    payload = base64.b64decode(envelope['message']['data'])
    MESSAGES.append(payload)
    # Returning any 2xx status indicates successful receipt of the message.
    return 'OK', 200

Encontrarás ejemplos adicionales de cómo validar el portador JWT en esta guía para el acceso con Google a sitios web. Además, encontrarás una descripción general de los tokens OpenID en la guía de OpenID Connect.

Cloud Run

El servicio Cloud Run autentica de forma automática las llamadas HTTP mientras verifica los tokens generados por Pub/Sub. La única configuración requerida por el usuario es que las funciones de Cloud IAM necesarias se otorguen a la cuenta del emisor. Por ejemplo, puedes autorizar o revocar el permiso a fin de llamar a un extremo de Cloud Run en particular para una cuenta. Para más detalles, consulta los siguientes instructivos:

Detén y restablece la entrega

Para evitar por un tiempo que Pub/Sub envíe solicitudes al extremo de envío, cambia la suscripción a extracción. Ten en cuenta que este cambio puede tomar varios minutos hasta que se efectúe.

Para restablecer la entrega de envío, vuelve a configurar la URL en un extremo válido. Para detener la entrega de forma permanente, borra la suscripción.

Cuotas, límites y frecuencia de entrega

Ten en cuenta que las suscripciones de envío están sujetas a un conjunto de cuotas y límites de recursos.

Además, la frecuencia de entrega de envío se ajusta de forma automática para maximizar la frecuencia de entrega sin sobrecargar el extremo de envío. Esto se logra mediante un algoritmo de inicio lento:

  • El sistema comienza con el envío de un solo mensaje a la vez.
  • Con cada entrega exitosa, el número de mensajes enviados al mismo tiempo se duplica.
  • La velocidad a la que el sistema envía mensajes simultáneos continúa duplicándose hasta que se produce una falla en la entrega o cuando el sistema alcanza una cuota o un límite de recursos.
  • Con cada falla de entrega, la cantidad de solicitudes simultáneas del extremo se divide, hasta que se alcanza un mínimo de una solicitud a la vez.

Este algoritmo supone que hay suficientes mensajes publicados en la cola para mantener esta capacidad de procesamiento. Por último, la velocidad a la que se publican los mensajes es el límite para la frecuencia de entrega de envío.