Esta página se ha traducido con Cloud Translation API.
Switch to English

Usa suscripciones de envío

Pub/Sub admite la entrega de mensajes de envío y extracción. Para obtener una descripción general y una comparación de suscripciones de extracción y de envío, consulta la descripción general de los suscriptores. En este documento se describe la entrega de envío. Para ver un análisis sobre la entrega de extracción, consulta la guía del suscriptor de extracción.

Si una suscripción usa la entrega de envío, el servicio de Pub/Sub entrega mensajes a un extremo de envío. El extremo de envío debe ser una dirección HTTPS de acceso público. El servidor para el extremo de envío debe tener un certificado SSL válido firmado por una autoridad certificada.

El servicio Pub/Sub entrega mensajes a los extremos de envío desde la misma región de Google Cloud en la que el servicio de Pub/Sub almacena los mensajes. El servicio Pub/Sub entrega mensajes de la misma región de Google Cloud según el criterio del mejor esfuerzo.

Además, las suscripciones de envío se pueden configurar a fin de proporcionar un encabezado de autorización para permitir que los extremos autentiquen las solicitudes. Los mecanismos de autenticación y autorización automáticos están disponibles para los extremos del entorno estándar de App Engine y Cloud Functions alojados en el mismo proyecto que la suscripción.

Recibe mensajes

Cuando Pub/Sub envía un mensaje a un extremo de envío, Pub/Sub envía el mensaje en el cuerpo de una solicitud POST. El cuerpo de la solicitud es un objeto JSON y los datos del mensaje están en el campo message.data. Los datos del mensaje están codificados en base64.

El siguiente ejemplo es el cuerpo de una solicitud POST a un extremo de envío:

{
    "message": {
        "attributes": {
            "key": "value"
        },
        "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
        "messageId": "2070443601311540",
        "message_id": "2070443601311540",
        "publishTime": "2021-02-26T19:13:55.749Z",
        "publish_time": "2021-02-26T19:13:55.749Z",
    },
   "subscription": "projects/myproject/subscriptions/mysubscription"
}

Para recibir mensajes de suscripciones de envío, usa un webhook y procesa las solicitudes POST que Pub/Sub envía al extremo de envío. Para obtener más información sobre cómo procesar estas solicitudes POST en App Engine, consulta Escribe y responde mensajes de Pub/Sub.

Después de recibir una solicitud de envío, muestra un código de estado HTTP. Para confirmar el mensaje, muestra uno de los siguientes códigos de estado:

  • 102
  • 200
  • 201
  • 202
  • 204

Para enviar una confirmación negativa del mensaje, muestra cualquier otro código de estado. Si envías una confirmación negativa o el plazo de confirmación vence, Pub/Sub reenvía el mensaje. No puedes modificar el plazo de confirmación para los mensajes individuales que recibes de las suscripciones de envío.

Autenticación y autorización

Si una suscripción de envío usa autenticación, el servicio de Pub/Sub firma un token web JSON (JWT) y envía el JWT en el encabezado de autorización de la solicitud de envío. El JWT incluye reclamaciones y una firma.

Los suscriptores pueden decodificar el JWT y verificar lo siguiente:

  • Los reclamos son precisos.
  • El servicio de Pub/Sub firmó las reclamaciones.

Si los suscriptores usan un firewall, no pueden recibir solicitudes de envío. Para recibir solicitudes de envío, debes desactivar el firewall y verificar el JWT.

Formato JWT

JWT es un JWT de OpenIDConnect que consiste en un encabezado, un conjunto de reclamos y una firma. El servicio Pub/Sub codifica el JWT como una string base64 con delimitadores de punto.

Por ejemplo, el siguiente encabezado de autorización incluye un JWT codificado:

"Authorization" : "Bearer
eyJhbGciOiJSUzI1NiIsImtpZCI6IjdkNjgwZDhjNzBkNDRlOTQ3MTMzY2JkNDk5ZWJjMWE2MWMzZDVh
YmMiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwczovL2V4YW1wbGUuY29tIiwiYXpwIjoiMTEzNzc0M
jY0NDYzMDM4MzIxOTY0IiwiZW1haWwiOiJnYWUtZ2NwQGFwcHNwb3QuZ3NlcnZpY2VhY2NvdW50LmNvb
SIsImVtYWlsX3ZlcmlmaWVkIjp0cnVlLCJleHAiOjE1NTAxODU5MzUsImlhdCI6MTU1MDE4MjMzNSwia
XNzIjoiaHR0cHM6Ly9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTEzNzc0MjY0NDYzMDM4MzIxO
TY0In0.QVjyqpmadTyDZmlX2u3jWd1kJ68YkdwsRZDo-QxSPbxjug4ucLBwAs2QePrcgZ6hhkvdc4UHY
4YF3fz9g7XHULNVIzX5xh02qXEH8dK6PgGndIWcZQzjSYfgO-q-R2oo2hNM5HBBsQN4ARtGK_acG-NGG
WM3CQfahbEjZPAJe_B8M7HfIu_G5jOLZCw2EUcGo8BvEwGcLWB2WqEgRM0-xt5-UPzoa3-FpSPG7DHk7
z9zRUeq6eB__ldb-2o4RciJmjVwHgnYqn3VvlX9oVKEgXpNFhKuYA-mWh5o7BCwhujSMmFoBOh6mbIXF
cyf5UiVqKjpqEbqPGo_AvKvIQ9VTQ" 

El encabezado y el conjunto de reclamos son strings de JSON. Una vez decodificados, toman el siguiente formato:

{"alg":"RS256","kid":"7d680d8c70d44e947133cbd499ebc1a61c3d5abc","typ":"JWT"}

{
   "aud":"https://example.com",
   "azp":"113774264463038321964",
   "email":"gae-gcp@appspot.gserviceaccount.com",
   "sub":"113774264463038321964",
   "email_verified":true,
   "exp":1550185935,
   "iat":1550182335,
   "iss":"https://accounts.google.com"
  }

Los tokens adjuntos a las solicitudes enviadas a los extremos de envío pueden tener hasta una hora de antigüedad.

Configura Pub/Sub para que use la autenticación de extracción

La configuración de autenticación para una suscripción consta de los dos parámetros siguientes:

  • Cuenta de servicio: La cuenta de servicio de GCP asociada con la suscripción de envío. Las solicitudes de envío llevan la identidad de esta cuenta de servicio. Como ejemplo, una suscripción de envío configurada con una cuenta de servicio que tiene la función roles/run.invoker y está vinculada a un servicio particular de Cloud Run (completamente administrado) puede invocar a Cloud Run (completamente administrado).
  • Público del token: Es una string única, que no distingue entre mayúsculas y minúsculas y que puede usar el webhook para validar el público objetivo de este token en particular (opcional).

Además de configurar estos campos, también debes otorgar a Pub/Sub los permisos necesarios a fin de crear tokens para tu cuenta de servicio. Pub/Sub crea y mantiene una cuenta de servicio especial para tu proyecto: service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com.. Esta cuenta de servicio necesita la función Creador de tokens de cuenta de servicio. Si usas Cloud Console con el fin de configurar la suscripción para la autenticación de envío, la función se otorga de forma automática. De lo contrario, debes asignar de forma explícita la función a la cuenta.

LÍNEA DE COMANDOS

# grant Cloud Pub/Sub the permission to create tokens
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
 --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}"\
 --role='roles/iam.serviceAccountTokenCreator'

# configure the subscription push identity
gcloud pubsub subscriptions (create|update|modify-push-config) ${SUBSCRIPTION} \
 --topic=${TOPIC} \
 --push-endpoint=${PUSH_ENDPOINT_URI} \
 --push-auth-service-account=${SERVICE_ACCOUNT_EMAIL} \
 --push-auth-token-audience=${OPTIONAL_AUDIENCE_OVERRIDE}

CONSOLE

  1. Ve a la página Temas de Pub/Sub.

    Ir a la página Temas

  2. Haz clic en el nombre de un tema.

  3. Crea o actualiza una suscripción.

  4. Ingresa una identidad y un público (opcional).

Autenticación y autorización que realiza el extremo de envío

Reclamos

El JWT se puede usar para validar que los reclamos, incluidos los reclamos email y aud, estén firmados por Google. Consulta OpenID Connect si deseas obtener más información sobre cómo se pueden usar las API de OAuth 2.0 de Google para la autenticación y la autorización.

Hay dos mecanismos que hacen que estas afirmaciones tengan sentido. Primero, Pub/Sub requiere que la cuenta de usuario o servicio usada para asociar una identidad de cuenta de servicio con una suscripción de envío tenga la función Usuario de cuenta de servicio (roles/iam.serviceAccountUser) para el proyecto o la cuenta de servicio.

En segundo lugar, el acceso a los certificados usados para firmar los tokens está controlado de forma estricta. Para crear el token, Pub/Sub debe llamar a un servicio interno de Google mediante el uso de una identidad de cuenta de servicio de firma distinta. La cuenta de servicio de firma debe estar autorizada a fin de crear tokens para la cuenta de servicio reclamada o para el proyecto que la contiene. Esto se hace con el permiso iam.serviceAccounts.getOpenIdToken o la función Creador de tokens de cuenta de servicio (roles/iam.serviceAccountTokenCreator).

Esta función o permiso se puede otorgar a cualquier cuenta. Sin embargo, puedes usar el servicio de IAM para asegurarte de que la cuenta de firma de Pub/Sub sea la que tenga este permiso. Pub/Sub usa una cuenta de servicio como esta:

service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com
  • {project_number}: El proyecto de GCP que contiene la suscripción.
  • gcp-sa-pubsub: El proyecto que es propiedad de Google que contiene la cuenta de servicio de firma.

Valida tokens

En el siguiente ejemplo, se muestra cómo autenticar una solicitud de inserción en una aplicación de App Engine.

protocol

Solicitud:

GET https://oauth2.googleapis.com/tokeninfo?id_token={BEARER_TOKEN}

Respuesta:

200 OK
{
    "alg": "RS256",
    "aud": "example.com",
    "azp": "104176025330667568672",
    "email": "{SERVICE_ACCOUNT_NAME}@{YOUR_PROJECT_NAME}.iam.gserviceaccount.com",
    "email_verified": "true",
    "exp": "1555463097",
    "iat": "1555459497",
    "iss": "https://accounts.google.com",
    "kid": "3782d3f0bc89008d9d2c01730f765cfb19d3b70e",
    "sub": "104176025330667568672",
    "typ": "JWT"
}

Comienza a usarlo

// receiveMessagesHandler validates authentication token and caches the Pub/Sub
// message received.
func (a *app) receiveMessagesHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != "POST" {
		http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
		return
	}

	// Verify that the request originates from the application.
	// a.pubsubVerificationToken = os.Getenv("PUBSUB_VERIFICATION_TOKEN")
	if token, ok := r.URL.Query()["token"]; !ok || len(token) != 1 || token[0] != a.pubsubVerificationToken {
		http.Error(w, "Bad token", http.StatusBadRequest)
		return
	}

	// Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
	authHeader := r.Header.Get("Authorization")
	if authHeader == "" || len(strings.Split(authHeader, " ")) != 2 {
		http.Error(w, "Missing Authorization header", http.StatusBadRequest)
		return
	}
	token := strings.Split(authHeader, " ")[1]
	// Verify and decode the JWT.
	// If you don't need to control the HTTP client used you can use the
	// convenience method idtoken.Validate instead of creating a Validator.
	v, err := idtoken.NewValidator(r.Context(), option.WithHTTPClient(a.defaultHTTPClient))
	if err != nil {
		http.Error(w, "Unable to create Validator", http.StatusBadRequest)
		return
	}
	// Please change http://example.com to match with the value you are
	// providing while creating the subscription.
	payload, err := v.Validate(r.Context(), token, "http://example.com")
	if err != nil {
		http.Error(w, fmt.Sprintf("Invalid Token: %v", err), http.StatusBadRequest)
		return
	}
	if payload.Issuer != "accounts.google.com" && payload.Issuer != "https://accounts.google.com" {
		http.Error(w, "Wrong Issuer", http.StatusBadRequest)
		return
	}

	var pr pushRequest
	if err := json.NewDecoder(r.Body).Decode(&pr); err != nil {
		http.Error(w, fmt.Sprintf("Could not decode body: %v", err), http.StatusBadRequest)
		return
	}

	a.messagesMu.Lock()
	defer a.messagesMu.Unlock()
	// Limit to ten.
	a.messages = append(a.messages, pr.Message.Data)
	if len(a.messages) > maxMessages {
		a.messages = a.messages[len(a.messages)-maxMessages:]
	}

	fmt.Fprint(w, "OK")
}

Java

@WebServlet(value = "/pubsub/authenticated-push")
public class PubSubAuthenticatedPush extends HttpServlet {
  private final String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
  private final MessageRepository messageRepository;
  private final GoogleIdTokenVerifier verifier =
      new GoogleIdTokenVerifier.Builder(new NetHttpTransport(), new JacksonFactory())
          /**
           * Please change example.com to match with value you are providing while creating
           * subscription as provided in @see <a
           * href="https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/appengine-java8/pubsub">README</a>.
           */
          .setAudience(Collections.singletonList("example.com"))
          .build();
  private final Gson gson = new Gson();
  private final JsonParser jsonParser = new JsonParser();

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {

    // Verify that the request originates from the application.
    if (req.getParameter("token").compareTo(pubsubVerificationToken) != 0) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    // Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
    String authorizationHeader = req.getHeader("Authorization");
    if (authorizationHeader == null
        || authorizationHeader.isEmpty()
        || authorizationHeader.split(" ").length != 2) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    String authorization = authorizationHeader.split(" ")[1];

    try {
      // Verify and decode the JWT.
      // Note: For high volume push requests, it would save some network overhead
      // if you verify the tokens offline by decoding them using Google's Public
      // Cert; caching already seen tokens works best when a large volume of
      // messsages have prompted a singple push server to handle them, in which
      // case they would all share the same token for a limited time window.
      GoogleIdToken idToken = verifier.verify(authorization);
      messageRepository.saveToken(authorization);
      messageRepository.saveClaim(idToken.getPayload().toPrettyString());
      // parse message object from "message" field in the request body json
      // decode message data from base64
      Message message = getMessage(req);
      messageRepository.save(message);
      // 200, 201, 204, 102 status codes are interpreted as success by the Pub/Sub system
      resp.setStatus(102);
      super.doPost(req, resp);
    } catch (Exception e) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
    }
  }

  private Message getMessage(HttpServletRequest request) throws IOException {
    String requestBody = request.getReader().lines().collect(Collectors.joining("\n"));
    JsonElement jsonRoot = jsonParser.parse(requestBody);
    String messageStr = jsonRoot.getAsJsonObject().get("message").toString();
    Message message = gson.fromJson(messageStr, Message.class);
    // decode from base64
    String decoded = decode(message.getData());
    message.setData(decoded);
    return message;
  }

  private String decode(String data) {
    return new String(Base64.getDecoder().decode(data));
  }

  PubSubAuthenticatedPush(MessageRepository messageRepository) {
    this.messageRepository = messageRepository;
  }

  public PubSubAuthenticatedPush() {
    this(MessageRepositoryImpl.getInstance());
  }
}

Node.js

app.post('/pubsub/authenticated-push', jsonBodyParser, async (req, res) => {
  // Verify that the request originates from the application.
  if (req.query.token !== PUBSUB_VERIFICATION_TOKEN) {
    res.status(400).send('Invalid request');
    return;
  }

  // Verify that the push request originates from Cloud Pub/Sub.
  try {
    // Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
    const bearer = req.header('Authorization');
    const [, token] = bearer.match(/Bearer (.*)/);
    tokens.push(token);

    // Verify and decode the JWT.
    // Note: For high volume push requests, it would save some network
    // overhead if you verify the tokens offline by decoding them using
    // Google's Public Cert; caching already seen tokens works best when
    // a large volume of messages have prompted a single push server to
    // handle them, in which case they would all share the same token for
    // a limited time window.
    const ticket = await authClient.verifyIdToken({
      idToken: token,
      audience: 'example.com',
    });

    const claim = ticket.getPayload();
    claims.push(claim);
  } catch (e) {
    res.status(400).send('Invalid token');
    return;
  }

  // The message is a unicode string encoded in base64.
  const message = Buffer.from(req.body.message.data, 'base64').toString(
    'utf-8'
  );

  messages.push(message);

  res.status(200).send();
});

Python

@app.route('/push-handlers/receive_messages', methods=['POST'])
def receive_messages_handler():
    # Verify that the request originates from the application.
    if (request.args.get('token', '') !=
            current_app.config['PUBSUB_VERIFICATION_TOKEN']):
        return 'Invalid request', 400

    # Verify that the push request originates from Cloud Pub/Sub.
    try:
        # Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
        bearer_token = request.headers.get('Authorization')
        token = bearer_token.split(' ')[1]
        TOKENS.append(token)

        # Verify and decode the JWT. `verify_oauth2_token` verifies
        # the JWT signature, the `aud` claim, and the `exp` claim.
        # Note: For high volume push requests, it would save some network
        # overhead if you verify the tokens offline by downloading Google's
        # Public Cert and decode them using the `google.auth.jwt` module;
        # caching already seen tokens works best when a large volume of
        # messages have prompted a single push server to handle them, in which
        # case they would all share the same token for a limited time window.
        claim = id_token.verify_oauth2_token(token, requests.Request(),
                                             audience='example.com')
        CLAIMS.append(claim)
    except Exception as e:
        return 'Invalid token: {}\n'.format(e), 400

    envelope = json.loads(request.data.decode('utf-8'))
    payload = base64.b64decode(envelope['message']['data'])
    MESSAGES.append(payload)
    # Returning any 2xx status indicates successful receipt of the message.
    return 'OK', 200

Ruby

post "/pubsub/authenticated-push" do
  halt 400 if params[:token] != PUBSUB_VERIFICATION_TOKEN

  begin
    bearer = request.env["HTTP_AUTHORIZATION"]
    token = /Bearer (.*)/.match(bearer)[1]
    claim = Google::Auth::IDTokens.verify_oidc token, aud: "example.com"
    claims.push claim
  rescue Google::Auth::IDTokens::VerificationError => e
    puts "VerificationError: #{e.message}"
    halt 400, "Invalid token"
  end

  message = JSON.parse request.body.read
  payload = Base64.decode64 message["message"]["data"]

  messages.push payload
end

Encontrarás ejemplos adicionales de cómo validar el portador JWT en esta guía para el acceso con Google a sitios web. Puedes encontrar una descripción general más amplia de los tokens de OpenID en la guía de OpenID Connect, incluida una lista de bibliotecas cliente que ayudan a validar JWT.

Cloud Run y App Engine

Cloud Run y App Engine autentican automáticamente las llamadas HTTP mediante la verificación de los tokens que genera Pub/Sub. La única configuración requerida por el usuario es que las funciones de IAM necesarias se otorguen a la cuenta del emisor. Por ejemplo, puedes autorizar o revocar el permiso a fin de llamar a un extremo de Cloud Run en particular para una cuenta. Para más detalles, consulta los siguientes instructivos:

Detén y restablece la entrega

Para evitar por un tiempo que Pub/Sub envíe solicitudes al extremo de envío, cambia la suscripción a extracción. Ten en cuenta que este cambio puede tomar varios minutos hasta que se efectúe.

Para restablecer la entrega de envío, vuelve a configurar la URL en un extremo válido. Para detener la entrega de forma permanente, borra la suscripción.

Cuotas, límites y frecuencia de entrega

Ten en cuenta que las suscripciones de envío están sujetas a un conjunto de cuotas y límites de recursos.

Retirada de envío

Si un suscriptor de envío envía confirmaciones negativas, Pub/Sub puede entregar mensajes con una retirada de envío. Cuando Pub/Sub usa una retirada de envío, deja de entregar mensajes durante 100 milisegundos a 60 segundos y, luego, comienza a entregar mensajes de nuevo.

La retirada de envío es una retirada exponencial que evita que un suscriptor de envío reciba mensajes que no puede procesar. La cantidad de tiempo que Pub/Sub deja de enviar mensajes depende de la cantidad de confirmaciones negativas que envían los suscriptores de envío.

Por ejemplo, si un suscriptor de envío recibe cinco mensajes por segundo y envía una confirmación negativa por segundo, Pub/Sub entrega mensajes aproximadamente cada 500 milisegundos. Si el suscriptor de envío envía cinco confirmaciones negativas por segundo, Pub/Sub entrega mensajes cada 30 a 60 segundos.

Frecuencia de envío

Pub/Sub ajusta la cantidad de solicitudes de envío simultáneas mediante un algoritmo de inicio lento. La cantidad máxima de solicitudes de envío simultáneas es la ventana de envío. La ventana de envío aumenta en cualquier entrega exitosa y disminuye las fallas. El sistema comienza con una ventana pequeña: 3 veces N, en la que N es la cantidad de regiones de publicación.

Cuando un suscriptor confirma mensajes, la ventana aumenta exponencialmente hasta 3,000 veces N mensajes pendientes. Para las suscripciones en las que los suscriptores reconocen más del 99% de los mensajes y promedio de menos de un segundo de la latencia de la solicitud de envío, la ventana de envío aumenta hasta 30,000 veces N mensajes pendientes.

La latencia de la solicitud de envío incluye lo siguiente:

Después de 3,000 mensajes pendientes, la ventana aumenta de forma lineal para evitar que el extremo de envío reciba demasiados mensajes. Si la latencia promedio supera un segundo o el suscriptor reconoce menos del 99% de las solicitudes, la ventana disminuye al límite inferior de 3,000 mensajes pendientes.

Si quieres obtener más información sobre las métricas que puedes usar para supervisar la entrega de envío, consulta Supervisa las suscripciones de envío.