Utiliser des abonnements en mode push

Pub/Sub est compatible avec la distribution de messages en mode push et pull. Pour obtenir une présentation et une comparaison des abonnements en mode pull et push, consultez la page Présentation des abonnements. Ce document décrit la distribution push. Pour en savoir plus sur la distribution pull, consultez le Guide pour les abonnés – Mode pull.

Si un abonnement utilise la distribution push, le service Pub/Sub distribue les messages à un point de terminaison push. Le point de terminaison push doit être une adresse HTTPS accessible au public. Le serveur associé au point de terminaison push doit disposer d'un certificat SSL valide signé par une autorité de certification.

De plus, les abonnements push peuvent être configurés pour fournir un en-tête d'autorisation permettant aux points de terminaison d'authentifier les requêtes. Les mécanismes d'authentification et d'autorisation automatiques sont disponibles pour les points de terminaison de l'environnement standard App Engine et de Cloud Functions hébergés dans le même projet que l'abonnement.

Recevoir des messages

Lorsque Pub/Sub envoie un message à un point de terminaison push, Pub/Sub envoie le message dans le corps d'une requête POST. Le corps de la requête est un objet JSON et les données du message se trouvent dans le champ message.data. Les données du message sont encodées en base64.

L'exemple suivant est le corps d'une requête POST envoyée à un point de terminaison push :

{
    "message": {
        "attributes": {
            "key": "value"
        },
        "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
        "messageId": "136969346945"
    },
   "subscription": "projects/myproject/subscriptions/mysubscription"
}

Pour recevoir des messages d'abonnements push, utilisez un webhook et traitez les requêtes POST que Pub/Sub envoie au point de terminaison push. Par exemple, les fonctions Cloud Functions suivantes sont des webhooks qui décodent et affichent les données de message :

Go


// Package helloworld provides a set of Cloud Functions samples.
package helloworld

import (
	"context"
	"log"
)

// PubSubMessage is the payload of a Pub/Sub event.
type PubSubMessage struct {
	Data []byte `json:"data"`
}

// HelloPubSub consumes a Pub/Sub message.
func HelloPubSub(ctx context.Context, m PubSubMessage) error {
	name := string(m.Data) // Automatically decoded from base64.
	if name == "" {
		name = "World"
	}
	log.Printf("Hello, %s!", name)
	return nil
}

Java


import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import functions.eventpojos.PubSubMessage;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.logging.Level;
import java.util.logging.Logger;

public class HelloPubSub implements BackgroundFunction<PubSubMessage> {
  private static final Logger logger = Logger.getLogger(HelloPubSub.class.getName());

  @Override
  public void accept(PubSubMessage message, Context context) {
    String name = "world";
    if (message != null && message.getData() != null) {
      name = new String(
          Base64.getDecoder().decode(message.getData().getBytes(StandardCharsets.UTF_8)),
          StandardCharsets.UTF_8);
    }
    logger.info(String.format("Hello %s!", name));
    return;
  }
}

Node.js

/**
 * Background Cloud Function to be triggered by Pub/Sub.
 * This function is exported by index.js, and executed when
 * the trigger topic receives a message.
 *
 * @param {object} message The Pub/Sub message.
 * @param {object} context The event metadata.
 */
exports.helloPubSub = (message, context) => {
  const name = message.data
    ? Buffer.from(message.data, 'base64').toString()
    : 'World';

  console.log(`Hello, ${name}!`);
};

Python

def hello_pubsub(event, context):
    """Background Cloud Function to be triggered by Pub/Sub.
    Args:
         event (dict):  The dictionary with data specific to this type of
         event. The `data` field contains the PubsubMessage message. The
         `attributes` field will contain custom attributes if there are any.
         context (google.cloud.functions.Context): The Cloud Functions event
         metadata. The `event_id` field contains the Pub/Sub message ID. The
         `timestamp` field contains the publish time.
    """
    import base64

    print("""This Function was triggered by messageId {} published at {}
    """.format(context.event_id, context.timestamp))

    if 'data' in event:
        name = base64.b64decode(event['data']).decode('utf-8')
    else:
        name = 'World'
    print('Hello {}!'.format(name))

Après avoir reçu une requête push, renvoyez un code d'état HTTP. Pour accuser réception du message, renvoyez l'un des codes d'état suivants :

  • 102
  • 200
  • 201
  • 202
  • 204

Pour envoyer un accusé de réception négatif pour le message, renvoyez n'importe quel autre code d'état. Si vous envoyez un accusé de réception négatif ou que le délai de confirmation expire, Pub/Sub le renvoie. Vous ne pouvez pas modifier le délai de confirmation des messages individuels que vous recevez des abonnements push.

Authentification et autorisation

Si un abonnement push utilise l'authentification, le service Pub/Sub signe un jeton Web JSON (JWT) et envoie le jeton JWT dans l'en-tête d'autorisation de la requête push. Le jeton JWT inclut des revendications et une signature.

Les abonnés peuvent décoder le jeton JWT et vérifier les points suivants :

  • Les revendications sont exactes.
  • Le service Pub/Sub a signé les revendications.

Si les abonnés utilisent un pare-feu, ils ne peuvent pas recevoir de requêtes push. Pour recevoir des requêtes push, vous devez désactiver le pare-feu et vérifier le jeton JWT.

Format JWT

Le jeton JWT est un jeton JWT OpenIDConnect composé d'un en-tête, d'un ensemble de revendications et d'une signature. Le service Pub/Sub encode le jeton JWT en tant que chaîne en base64 avec des délimiteurs de points.

Par exemple, l'en-tête d'autorisation suivant inclut un jeton JWT encodé :

"Authorization" : "Bearer
eyJhbGciOiJSUzI1NiIsImtpZCI6IjdkNjgwZDhjNzBkNDRlOTQ3MTMzY2JkNDk5ZWJjMWE2MWMzZDVh
YmMiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwczovL2V4YW1wbGUuY29tIiwiYXpwIjoiMTEzNzc0M
jY0NDYzMDM4MzIxOTY0IiwiZW1haWwiOiJnYWUtZ2NwQGFwcHNwb3QuZ3NlcnZpY2VhY2NvdW50LmNvb
SIsImVtYWlsX3ZlcmlmaWVkIjp0cnVlLCJleHAiOjE1NTAxODU5MzUsImlhdCI6MTU1MDE4MjMzNSwia
XNzIjoiaHR0cHM6Ly9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTEzNzc0MjY0NDYzMDM4MzIxO
TY0In0.QVjyqpmadTyDZmlX2u3jWd1kJ68YkdwsRZDo-QxSPbxjug4ucLBwAs2QePrcgZ6hhkvdc4UHY
4YF3fz9g7XHULNVIzX5xh02qXEH8dK6PgGndIWcZQzjSYfgO-q-R2oo2hNM5HBBsQN4ARtGK_acG-NGG
WM3CQfahbEjZPAJe_B8M7HfIu_G5jOLZCw2EUcGo8BvEwGcLWB2WqEgRM0-xt5-UPzoa3-FpSPG7DHk7
z9zRUeq6eB__ldb-2o4RciJmjVwHgnYqn3VvlX9oVKEgXpNFhKuYA-mWh5o7BCwhujSMmFoBOh6mbIXF
cyf5UiVqKjpqEbqPGo_AvKvIQ9VTQ" 

L'en-tête et l'ensemble de revendications sont des chaînes JSON. Une fois décodés, ils prennent la forme suivante :

{"alg":"RS256","kid":"7d680d8c70d44e947133cbd499ebc1a61c3d5abc","typ":"JWT"}

{
   "aud":"https://example.com",
   "azp":"113774264463038321964",
   "email":"gae-gcp@appspot.gserviceaccount.com",
   "sub":"113774264463038321964",
   "email_verified":true,
   "exp":1550185935,
   "iat":1550182335,
   "iss":"https://accounts.google.com"
  }

Les jetons associés aux requêtes envoyées aux points de terminaison push peuvent remonter jusqu'à une heure.

Configurer Pub/Sub pour l’authentification push

La configuration de l'authentification pour un abonnement comprend deux paramètres :

  • Compte de service : Le compte de service GCP associé à l'abonnement push. Les requêtes push conservent l'identité de ce compte de service. Par exemple, un abonnement push configuré avec un compte de service doté du rôle roles/run.invoker et lié à un service Cloud Run (entièrement géré) peut appeler ce service (entièrement géré).
  • Audience de jeton (facultatif) : Une chaîne unique, ne respectant pas la casse, qui peut être utilisée par le webhook pour valider l'audience visée par un jeton spécifique.

Outre la configuration de ces champs, vous devez également accorder à Pub/Sub les autorisations nécessaires pour créer des jetons pour votre compte de service. Cloud Pub/Sub crée et gère un compte de service spécial pour votre projet : service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com.. Ce compte de service nécessite le rôle Créateur de jeton de compte de service. Si vous utilisez Cloud Console pour configurer l'abonnement pour l'authentification push, le rôle est attribué automatiquement. Sinon, vous devez explicitement attribuer le rôle au compte.

LIGNE DE COMMANDE

# grant Cloud Pub/Sub the permission to create tokens
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
 --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}"\
 --role='roles/iam.serviceAccountTokenCreator'

# configure the subscription push identity
gcloud pubsub subscriptions (create|update|modify-push-config) ${SUBSCRIPTION} \
 --topic=${TOPIC} \
 --push-endpoint=${PUSH_ENDPOINT_URI} \
 --push-auth-service-account=${SERVICE_ACCOUNT_EMAIL} \
 --push-auth-token-audience=${OPTIONAL_AUDIENCE_OVERRIDE}

console

  1. Accédez à la page Sujets Pub/Sub.

    Accéder à la page Sujets

  2. Cliquez sur un nom de sujet.

  3. Créez ou mettez à jour un abonnement.

  4. Entrez une identité et (éventuellement) une audience.

Authentification et autorisation par le point de terminaison push

Revendications

Un jeton JWT peut être utilisé pour valider que les revendications (y compris les revendications par email et aud) sont signées par Google. Pour plus d'informations sur la manière dont les API OAuth 2.0 de Google peuvent être utilisées à la fois pour l'authentification et l'autorisation, consultez la documentation sur OpenID Connect.

Deux mécanismes rendent ces revendications significatives. Tout d'abord, Pub/Sub requiert que l'utilisateur ou le compte de service utilisé pour associer une identité de compte de service à un abonnement push possède le rôle d'utilisateur du compte de service (roles/iam.serviceAccountUser) pour le projet ou le compte de service.

Deuxièmement, l'accès aux certificats utilisés pour signer les jetons est étroitement contrôlé. Pour créer le jeton, Pub/Sub doit appeler un service Google interne en utilisant une identité de compte de service de signature distincte. Le compte de service de signature doit être autorisé à créer des jetons pour le compte de service revendiqué ou le projet contenant le compte. Cette opération est effectuée à l'aide de l'autorisation iam.serviceAccounts.getOpenIdToken ou d'un rôle de créateur de jetons de compte de service (roles/iam.serviceAccountTokenCreator).

Ce rôle ou cette autorisation peut être accordé à n’importe quel compte. Cependant, vous pouvez utiliser le service Cloud IAM pour vous assurer que le compte de signature Pub/Sub est celui qui dispose de cette autorisation. Plus précisément, Pub/Sub utilise un compte de service comme celui-ci :

service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com
  • {project_number} : Le projet GCP contenant l'abonnement.
  • gcp-sa-pubsub : Le projet appartenant à Google qui contient le compte de service de signature.

Valider des jetons

L'exemple suivant montre comment authentifier une requête push auprès d'une application App Engine.

Protocole

Requête :

GET https://oauth2.googleapis.com/tokeninfo?id_token={BEARER_TOKEN}

Réponse :

200 OK
{
    "alg": "RS256",
    "aud": "example.com",
    "azp": "104176025330667568672",
    "email": "{SERVICE_ACCOUNT_NAME}@{YOUR_PROJECT_NAME}.iam.gserviceaccount.com",
    "email_verified": "true",
    "exp": "1555463097",
    "iat": "1555459497",
    "iss": "https://accounts.google.com",
    "kid": "3782d3f0bc89008d9d2c01730f765cfb19d3b70e",
    "sub": "104176025330667568672",
    "typ": "JWT"
}

Java

@WebServlet(value = "/pubsub/authenticated-push")
public class PubSubAuthenticatedPush extends HttpServlet {
  private final String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
  private final MessageRepository messageRepository;
  private final GoogleIdTokenVerifier verifier =
      new GoogleIdTokenVerifier.Builder(new NetHttpTransport(), new JacksonFactory())
          /**
           * Please change example.com to match with value you are providing while creating
           * subscription as provided in @see <a
           * href="https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/appengine-java8/pubsub">README</a>.
           */
          .setAudience(Collections.singletonList("example.com"))
          .build();
  private final Gson gson = new Gson();
  private final JsonParser jsonParser = new JsonParser();

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {

    // Verify that the request originates from the application.
    if (req.getParameter("token").compareTo(pubsubVerificationToken) != 0) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    // Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
    String authorizationHeader = req.getHeader("Authorization");
    if (authorizationHeader == null
        || authorizationHeader.isEmpty()
        || authorizationHeader.split(" ").length != 2) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    String authorization = authorizationHeader.split(" ")[1];

    try {
      // Verify and decode the JWT.
      // Note: For high volume push requests, it would save some network overhead
      // if you verify the tokens offline by decoding them using Google's Public
      // Cert; caching already seen tokens works best when a large volume of
      // messsages have prompted a singple push server to handle them, in which
      // case they would all share the same token for a limited time window.
      GoogleIdToken idToken = verifier.verify(authorization);
      messageRepository.saveToken(authorization);
      messageRepository.saveClaim(idToken.getPayload().toPrettyString());
      // parse message object from "message" field in the request body json
      // decode message data from base64
      Message message = getMessage(req);
      messageRepository.save(message);
      // 200, 201, 204, 102 status codes are interpreted as success by the Pub/Sub system
      resp.setStatus(102);
      super.doPost(req, resp);
    } catch (Exception e) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
    }
  }

  private Message getMessage(HttpServletRequest request) throws IOException {
    String requestBody = request.getReader().lines().collect(Collectors.joining("\n"));
    JsonElement jsonRoot = jsonParser.parse(requestBody);
    String messageStr = jsonRoot.getAsJsonObject().get("message").toString();
    Message message = gson.fromJson(messageStr, Message.class);
    // decode from base64
    String decoded = decode(message.getData());
    message.setData(decoded);
    return message;
  }

  private String decode(String data) {
    return new String(Base64.getDecoder().decode(data));
  }

  PubSubAuthenticatedPush(MessageRepository messageRepository) {
    this.messageRepository = messageRepository;
  }

  public PubSubAuthenticatedPush() {
    this(MessageRepositoryImpl.getInstance());
  }
}

Node.js

app.post('/pubsub/authenticated-push', jsonBodyParser, async (req, res) => {
  // Verify that the request originates from the application.
  if (req.query.token !== PUBSUB_VERIFICATION_TOKEN) {
    res.status(400).send('Invalid request');
    return;
  }

  // Verify that the push request originates from Cloud Pub/Sub.
  try {
    // Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
    const bearer = req.header('Authorization');
    const [, token] = bearer.match(/Bearer (.*)/);
    tokens.push(token);

    // Verify and decode the JWT.
    // Note: For high volume push requests, it would save some network
    // overhead if you verify the tokens offline by decoding them using
    // Google's Public Cert; caching already seen tokens works best when
    // a large volume of messages have prompted a single push server to
    // handle them, in which case they would all share the same token for
    // a limited time window.
    const ticket = await authClient.verifyIdToken({
      idToken: token,
      audience: 'example.com',
    });

    const claim = ticket.getPayload();
    claims.push(claim);
  } catch (e) {
    res.status(400).send('Invalid token');
    return;
  }

  // The message is a unicode string encoded in base64.
  const message = Buffer.from(req.body.message.data, 'base64').toString(
    'utf-8'
  );

  messages.push(message);

  res.status(200).send();
});

Python

@app.route('/push-handlers/receive_messages', methods=['POST'])
def receive_messages_handler():
    # Verify that the request originates from the application.
    if (request.args.get('token', '') !=
            current_app.config['PUBSUB_VERIFICATION_TOKEN']):
        return 'Invalid request', 400

    # Verify that the push request originates from Cloud Pub/Sub.
    try:
        # Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
        bearer_token = request.headers.get('Authorization')
        token = bearer_token.split(' ')[1]
        TOKENS.append(token)

        # Verify and decode the JWT. `verify_oauth2_token` verifies
        # the JWT signature, the `aud` claim, and the `exp` claim.
        # Note: For high volume push requests, it would save some network
        # overhead if you verify the tokens offline by downloading Google's
        # Public Cert and decode them using the `google.auth.jwt` module;
        # caching already seen tokens works best when a large volume of
        # messages have prompted a single push server to handle them, in which
        # case they would all share the same token for a limited time window.
        claim = id_token.verify_oauth2_token(token, requests.Request(),
                                             audience='example.com')
        CLAIMS.append(claim)
    except Exception as e:
        return 'Invalid token: {}\n'.format(e), 400

    envelope = json.loads(request.data.decode('utf-8'))
    payload = base64.b64decode(envelope['message']['data'])
    MESSAGES.append(payload)
    # Returning any 2xx status indicates successful receipt of the message.
    return 'OK', 200

Go

// receiveMessagesHandler validates authentication token and caches the Pub/Sub
// message received.
func (a *app) receiveMessagesHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != "POST" {
		http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
		return
	}

	// Verify that the request originates from the application.
	// a.pubsubVerificationToken = os.Getenv("PUBSUB_VERIFICATION_TOKEN")
	if token, ok := r.URL.Query()["token"]; !ok || len(token) != 1 || token[0] != a.pubsubVerificationToken {
		http.Error(w, "Bad token", http.StatusBadRequest)
		return
	}

	// Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
	authHeader := r.Header.Get("Authorization")
	if authHeader == "" || len(strings.Split(authHeader, " ")) != 2 {
		http.Error(w, "Missing Authorization header", http.StatusBadRequest)
		return
	}
	token := strings.Split(authHeader, " ")[1]
	// Verify and decode the JWT.
	// If you don't need to control the HTTP client used you can use the
	// convenience method idtoken.Validate instead of creating a Validator.
	v, err := idtoken.NewValidator(r.Context(), option.WithHTTPClient(a.defaultHTTPClient))
	if err != nil {
		http.Error(w, "Unable to create Validator", http.StatusBadRequest)
		return
	}
	// Please change http://example.com to match with the value you are
	// providing while creating the subscription.
	payload, err := v.Validate(r.Context(), token, "http://example.com")
	if err != nil {
		http.Error(w, fmt.Sprintf("Invalid Token: %v", err), http.StatusBadRequest)
		return
	}
	if payload.Issuer != "accounts.google.com" && payload.Issuer != "https://accounts.google.com" {
		http.Error(w, "Wrong Issuer", http.StatusBadRequest)
	}

	var pr pushRequest
	if err := json.NewDecoder(r.Body).Decode(&pr); err != nil {
		http.Error(w, fmt.Sprintf("Could not decode body: %v", err), http.StatusBadRequest)
		return
	}

	a.messagesMu.Lock()
	defer a.messagesMu.Unlock()
	// Limit to ten.
	a.messages = append(a.messages, pr.Message.Data)
	if len(a.messages) > maxMessages {
		a.messages = a.messages[len(a.messages)-maxMessages:]
	}

	fmt.Fprint(w, "OK")
}

Vous trouverez des exemples supplémentaires sur la validation du support JWT dans le guide de connexion à Google pour les sites Web. Une présentation plus générale des jetons OpenID est disponible dans le guide OpenID Connect.

Cloud Run et App Engine

Cloud Run et App Engine authentifient automatiquement les appels HTTP en vérifiant les jetons générés par Pub/Sub. La seule configuration requise de l'utilisateur est qu'il dispose des rôles Cloud IAM nécessaires attribués au compte de l'appelant. Par exemple, vous pouvez autoriser ou révoquer l'autorisation d'appeler un point de terminaison Cloud Run spécifique pour un compte. Pour en savoir plus, consultez les tutoriels suivants :

Arrêter et reprendre la distribution

Pour empêcher temporairement l'envoi de requêtes par Pub/Sub au point de terminaison push, passez l'abonnement en mode pull. Notez que plusieurs minutes peuvent être nécessaires pour que cette modification prenne effet.

Pour reprendre la distribution push, définissez à nouveau l'URL sur un point de terminaison valide. Pour arrêter définitivement la distribution, supprimez l'abonnement.

Quotas, limites et taux de distribution

Notez que les abonnements push sont soumis à un ensemble de quotas et de limites de ressources.

Si Pub/Sub ne reçoit pas de réponse success, il applique un intervalle exponentiel entre les tentatives de 100 millisecondes minimum et 60 secondes maximum.

Pub/Sub ajuste le nombre de requêtes push simultanées à l'aide d'un algorithme de démarrage progressif (Slow Start). Le nombre maximal autorisé de requêtes push simultanées correspond à la fenêtre push. La fenêtre push augmente pour toute distribution réussie et diminue pour tout échec. Le système commence par une petite fenêtre : 3 * NN correspond au nombre de régions de publication. La taille maximale de la fenêtre est de 3,000 * N. Le nombre réel de requêtes push simultanées ou en attente peut être surveillé à l'aide de la métrique Cloud Monitoring pubsub.googleapis.com/subscription/num_outstanding_messages.