Como usar assinaturas de push

O Pub/Sub é compatível com entrega de mensagens por push e pull. Para ter uma visão geral e comparar assinaturas de pull e push, consulte esta página. Neste documento, você verá uma descrição da entrega por push. Acesse o guia do assinante de pull para saber mais sobre a entrega por pull.

Se uma assinatura usar a entrega por push, o serviço do Pub/Sub entregará mensagens a um endpoint de push. O endpoint de push precisa ser um endereço HTTPS de acesso público. O servidor do endpoint de push precisa ter um certificado SSL válido assinado por uma autoridade de certificação.

O serviço do Pub/Sub entrega mensagens para enviar endpoints da mesma região do Google Cloud em que o serviço do Pub/Sub armazena as mensagens. O serviço Pub/Sub entrega mensagens da mesma região do Google Cloud com base no melhor esforço.

Além disso, as assinaturas de push podem ser configuradas para fornecer um cabeçalho de autorização para permitir que os endpoint autentiquem as solicitações. Os mecanismos de autenticação e autorização automáticos estão disponíveis para os endpoint padrão do App Engine e do Cloud Functions hospedados no mesmo projeto que a assinatura.

Receber mensagens

Quando o Pub/Sub entrega uma mensagem para um endpoint de push, o Pub/Sub a envia no corpo de uma solicitação POST. O corpo da solicitação é um objeto JSON e os dados da mensagem estão no campo message.data. Os dados da mensagem são codificados em base64.

O exemplo a seguir é o corpo de uma solicitação POST para um endpoint de push:

{
    "message": {
        "attributes": {
            "key": "value"
        },
        "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
        "messageId": "136969346945"
    },
   "subscription": "projects/myproject/subscriptions/mysubscription"
}

Para receber mensagens de assinaturas de push, use um webhook e processe as solicitações POST que o Pub/Sub envia para o endpoint de push. Por exemplo, as Funções do Cloud a seguir são webhooks que decodificam e imprimem dados de mensagens:

Go


// Package helloworld provides a set of Cloud Functions samples.
package helloworld

import (
	"context"
	"log"
)

// PubSubMessage is the payload of a Pub/Sub event.
type PubSubMessage struct {
	Data []byte `json:"data"`
}

// HelloPubSub consumes a Pub/Sub message.
func HelloPubSub(ctx context.Context, m PubSubMessage) error {
	name := string(m.Data) // Automatically decoded from base64.
	if name == "" {
		name = "World"
	}
	log.Printf("Hello, %s!", name)
	return nil
}

Java

Ver no GitHub (em inglês) Feedback

import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import functions.eventpojos.PubSubMessage;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.logging.Level;
import java.util.logging.Logger;

public class HelloPubSub implements BackgroundFunction<PubSubMessage> {
  private static final Logger logger = Logger.getLogger(HelloPubSub.class.getName());

  @Override
  public void accept(PubSubMessage message, Context context) {
    String name = "world";
    if (message != null && message.getData() != null) {
      name = new String(
          Base64.getDecoder().decode(message.getData().getBytes(StandardCharsets.UTF_8)),
          StandardCharsets.UTF_8);
    }
    logger.info(String.format("Hello %s!", name));
    return;
  }
}

Node.js

/**
 * Background Cloud Function to be triggered by Pub/Sub.
 * This function is exported by index.js, and executed when
 * the trigger topic receives a message.
 *
 * @param {object} message The Pub/Sub message.
 * @param {object} context The event metadata.
 */
exports.helloPubSub = (message, context) => {
  const name = message.data
    ? Buffer.from(message.data, 'base64').toString()
    : 'World';

  console.log(`Hello, ${name}!`);
};

Python

Ver no GitHub (em inglês) Feedback
def hello_pubsub(event, context):
    """Background Cloud Function to be triggered by Pub/Sub.
    Args:
         event (dict):  The dictionary with data specific to this type of
         event. The `data` field contains the PubsubMessage message. The
         `attributes` field will contain custom attributes if there are any.
         context (google.cloud.functions.Context): The Cloud Functions event
         metadata. The `event_id` field contains the Pub/Sub message ID. The
         `timestamp` field contains the publish time.
    """
    import base64

    print("""This Function was triggered by messageId {} published at {}
    """.format(context.event_id, context.timestamp))

    if 'data' in event:
        name = base64.b64decode(event['data']).decode('utf-8')
    else:
        name = 'World'
    print('Hello {}!'.format(name))

Depois de receber uma solicitação por push, retorne um código de status HTTP. Para confirmar a mensagem, retorne um dos seguintes códigos de status:

  • 102
  • 200
  • 201
  • 202
  • 204

Para enviar uma confirmação negativa para a mensagem, retorne qualquer outro código de status. Se você enviar uma confirmação negativa ou o prazo de confirmação expirar, o Pub/Sub reenviará a mensagem. Não é possível modificar o prazo de confirmação de mensagens individuais recebidas de assinaturas de push.

Autenticação e autorização

Se uma assinatura de push usa a autenticação, o serviço Pub/Sub assina um JSON Web Token (JWT) e o envia no cabeçalho de autorização da solicitação de push. O JWT inclui declarações e uma assinatura.

Os assinantes podem decodificar o JWT e verificar o seguinte:

  • As declarações são precisas.
  • O serviço do Pub/Sub assinou as declarações.

Se os assinantes usam um firewall, eles não podem receber solicitações push. Para receber solicitações push, desative o firewall e verifique o JWT.

Formato JWT

O JWT é um JWT do OpenIDConnect que consiste em um cabeçalho, um conjunto de declarações e uma assinatura. O serviço Pub/Sub codifica o JWT como uma string base64 com delimitadores de período.

Por exemplo, o cabeçalho de autorização a seguir inclui um JWT codificado:

"Authorization" : "Bearer
eyJhbGciOiJSUzI1NiIsImtpZCI6IjdkNjgwZDhjNzBkNDRlOTQ3MTMzY2JkNDk5ZWJjMWE2MWMzZDVh
YmMiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwczovL2V4YW1wbGUuY29tIiwiYXpwIjoiMTEzNzc0M
jY0NDYzMDM4MzIxOTY0IiwiZW1haWwiOiJnYWUtZ2NwQGFwcHNwb3QuZ3NlcnZpY2VhY2NvdW50LmNvb
SIsImVtYWlsX3ZlcmlmaWVkIjp0cnVlLCJleHAiOjE1NTAxODU5MzUsImlhdCI6MTU1MDE4MjMzNSwia
XNzIjoiaHR0cHM6Ly9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTEzNzc0MjY0NDYzMDM4MzIxO
TY0In0.QVjyqpmadTyDZmlX2u3jWd1kJ68YkdwsRZDo-QxSPbxjug4ucLBwAs2QePrcgZ6hhkvdc4UHY
4YF3fz9g7XHULNVIzX5xh02qXEH8dK6PgGndIWcZQzjSYfgO-q-R2oo2hNM5HBBsQN4ARtGK_acG-NGG
WM3CQfahbEjZPAJe_B8M7HfIu_G5jOLZCw2EUcGo8BvEwGcLWB2WqEgRM0-xt5-UPzoa3-FpSPG7DHk7
z9zRUeq6eB__ldb-2o4RciJmjVwHgnYqn3VvlX9oVKEgXpNFhKuYA-mWh5o7BCwhujSMmFoBOh6mbIXF
cyf5UiVqKjpqEbqPGo_AvKvIQ9VTQ" 

O cabeçalho e o conjunto de declarações são strings JSON. Depois de decodificadas, elas assumem o seguinte formato:

{"alg":"RS256","kid":"7d680d8c70d44e947133cbd499ebc1a61c3d5abc","typ":"JWT"}

{
   "aud":"https://example.com",
   "azp":"113774264463038321964",
   "email":"gae-gcp@appspot.gserviceaccount.com",
   "sub":"113774264463038321964",
   "email_verified":true,
   "exp":1550185935,
   "iat":1550182335,
   "iss":"https://accounts.google.com"
  }

Os tokens anexados às solicitações enviadas para os endpoints de push podem ter até uma hora.

Como configurar o Pub/Sub para autenticação por push

A configuração de autenticação para uma assinatura consiste em dois parâmetros:

  • Conta de serviço: a conta de serviço do GCP associada à assinatura de push. As solicitações push têm a identidade dessa conta de serviço. Como exemplo, uma assinatura de push configurada com uma conta de serviço que tem o papel roles/run.invoker e está vinculada a um determinado serviço do Cloud Run (totalmente gerenciado) que pode invocar esse serviço do Cloud Run.
  • Público-alvo de token (opcional): uma única string, indiferente a maiúsculas, que pode ser usada pelo webhook para validar o público-alvo desse token.

Além de configurar esses campos, também é preciso conceder ao Pub/Sub as permissões necessárias para criar tokens para sua conta de serviço. O Pub/Sub cria e mantém uma conta de serviço especial para seu projeto: service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com.. Essa conta de serviço precisa do papel Criador de token de conta de serviço. Se você usar o Console do Cloud para configurar a assinatura para autenticação por push, o papel será concedido automaticamente. Caso contrário, será preciso conceder explicitamente o papel à conta.

LINHA DE COMANDO

# grant Cloud Pub/Sub the permission to create tokens
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
 --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}"\
 --role='roles/iam.serviceAccountTokenCreator'

# configure the subscription push identity
gcloud pubsub subscriptions (create|update|modify-push-config) ${SUBSCRIPTION} \
 --topic=${TOPIC} \
 --push-endpoint=${PUSH_ENDPOINT_URI} \
 --push-auth-service-account=${SERVICE_ACCOUNT_EMAIL} \
 --push-auth-token-audience=${OPTIONAL_AUDIENCE_OVERRIDE}

Console

  1. Acesse a página Tópicos do Pub/Sub.

    Acessar a página de tópicos

  2. Clique em um tópico.

  3. Crie ou atualize uma assinatura.

  4. Digite uma identidade e (opcionalmente) um público-alvo.

Autenticação e autorização pelo endpoint de push

Declarações

O JWT pode ser usado para validar que as declarações, incluindo email e aud, sejam assinadas pelo Google. Para mais informações sobre como as APIs do OAuth 2.0 do Google podem ser usadas para autenticação e autorização, consulte OpenID Connect.

Dois mecanismos tornam essas declarações significativas. Primeiro, o Pub/Sub exige que a conta de serviço ou o usuário usado para associar uma identidade de conta de serviço a uma assinatura de push tenha um papel (roles/iam.serviceAccountUser) Usuário da conta de serviço para o projeto ou a conta de serviço.

Em segundo lugar, o acesso aos certificados utilizados para assinar os tokens é rigidamente controlado. Para criar o token, o Pub/Sub precisa chamar um serviço interno do Google usando uma identidade de conta de serviço de assinatura diferente. É necessário que a conta de serviço de assinatura seja autorizada a criar tokens para a conta de serviço declarada ou para o projeto que contém a conta. Isso é feito usando a permissão iam.serviceAccounts.getOpenIdToken ou um papel (roles/iam.serviceAccountTokenCreator) de Criador de token de conta de serviço.

Esse papel ou permissão pode ser concedido a qualquer conta. No entanto, é possível usar o serviço de IAM para garantir que a conta de assinatura do Pub/Sub seja a única com essa permissão. Especificamente, o Pub/Sub usa uma conta de serviço como esta:

service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com
  • {project_number}: o projeto do GCP que contém a assinatura
  • gcp-sa-pubsub: o projeto do Google que contém a conta de serviço de assinatura

Como validar tokens

O exemplo a seguir mostra como autenticar uma solicitação por push em um aplicativo do App Engine.

protocolo

Solicitação:

GET https://oauth2.googleapis.com/tokeninfo?id_token={BEARER_TOKEN}

Resposta:

200 OK
{
    "alg": "RS256",
    "aud": "example.com",
    "azp": "104176025330667568672",
    "email": "{SERVICE_ACCOUNT_NAME}@{YOUR_PROJECT_NAME}.iam.gserviceaccount.com",
    "email_verified": "true",
    "exp": "1555463097",
    "iat": "1555459497",
    "iss": "https://accounts.google.com",
    "kid": "3782d3f0bc89008d9d2c01730f765cfb19d3b70e",
    "sub": "104176025330667568672",
    "typ": "JWT"
}

Java

Ver no GitHub (em inglês) Feedback
@WebServlet(value = "/pubsub/authenticated-push")
public class PubSubAuthenticatedPush extends HttpServlet {
  private final String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
  private final MessageRepository messageRepository;
  private final GoogleIdTokenVerifier verifier =
      new GoogleIdTokenVerifier.Builder(new NetHttpTransport(), new JacksonFactory())
          /**
           * Please change example.com to match with value you are providing while creating
           * subscription as provided in @see <a
           * href="https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/appengine-java8/pubsub">README</a>.
           */
          .setAudience(Collections.singletonList("example.com"))
          .build();
  private final Gson gson = new Gson();
  private final JsonParser jsonParser = new JsonParser();

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {

    // Verify that the request originates from the application.
    if (req.getParameter("token").compareTo(pubsubVerificationToken) != 0) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    // Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
    String authorizationHeader = req.getHeader("Authorization");
    if (authorizationHeader == null
        || authorizationHeader.isEmpty()
        || authorizationHeader.split(" ").length != 2) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    String authorization = authorizationHeader.split(" ")[1];

    try {
      // Verify and decode the JWT.
      // Note: For high volume push requests, it would save some network overhead
      // if you verify the tokens offline by decoding them using Google's Public
      // Cert; caching already seen tokens works best when a large volume of
      // messsages have prompted a singple push server to handle them, in which
      // case they would all share the same token for a limited time window.
      GoogleIdToken idToken = verifier.verify(authorization);
      messageRepository.saveToken(authorization);
      messageRepository.saveClaim(idToken.getPayload().toPrettyString());
      // parse message object from "message" field in the request body json
      // decode message data from base64
      Message message = getMessage(req);
      messageRepository.save(message);
      // 200, 201, 204, 102 status codes are interpreted as success by the Pub/Sub system
      resp.setStatus(102);
      super.doPost(req, resp);
    } catch (Exception e) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
    }
  }

  private Message getMessage(HttpServletRequest request) throws IOException {
    String requestBody = request.getReader().lines().collect(Collectors.joining("\n"));
    JsonElement jsonRoot = jsonParser.parse(requestBody);
    String messageStr = jsonRoot.getAsJsonObject().get("message").toString();
    Message message = gson.fromJson(messageStr, Message.class);
    // decode from base64
    String decoded = decode(message.getData());
    message.setData(decoded);
    return message;
  }

  private String decode(String data) {
    return new String(Base64.getDecoder().decode(data));
  }

  PubSubAuthenticatedPush(MessageRepository messageRepository) {
    this.messageRepository = messageRepository;
  }

  public PubSubAuthenticatedPush() {
    this(MessageRepositoryImpl.getInstance());
  }
}

Node.js

app.post('/pubsub/authenticated-push', jsonBodyParser, async (req, res) => {
  // Verify that the request originates from the application.
  if (req.query.token !== PUBSUB_VERIFICATION_TOKEN) {
    res.status(400).send('Invalid request');
    return;
  }

  // Verify that the push request originates from Cloud Pub/Sub.
  try {
    // Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
    const bearer = req.header('Authorization');
    const [, token] = bearer.match(/Bearer (.*)/);
    tokens.push(token);

    // Verify and decode the JWT.
    // Note: For high volume push requests, it would save some network
    // overhead if you verify the tokens offline by decoding them using
    // Google's Public Cert; caching already seen tokens works best when
    // a large volume of messages have prompted a single push server to
    // handle them, in which case they would all share the same token for
    // a limited time window.
    const ticket = await authClient.verifyIdToken({
      idToken: token,
      audience: 'example.com',
    });

    const claim = ticket.getPayload();
    claims.push(claim);
  } catch (e) {
    res.status(400).send('Invalid token');
    return;
  }

  // The message is a unicode string encoded in base64.
  const message = Buffer.from(req.body.message.data, 'base64').toString(
    'utf-8'
  );

  messages.push(message);

  res.status(200).send();
});

Python

@app.route('/push-handlers/receive_messages', methods=['POST'])
def receive_messages_handler():
    # Verify that the request originates from the application.
    if (request.args.get('token', '') !=
            current_app.config['PUBSUB_VERIFICATION_TOKEN']):
        return 'Invalid request', 400

    # Verify that the push request originates from Cloud Pub/Sub.
    try:
        # Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
        bearer_token = request.headers.get('Authorization')
        token = bearer_token.split(' ')[1]
        TOKENS.append(token)

        # Verify and decode the JWT. `verify_oauth2_token` verifies
        # the JWT signature, the `aud` claim, and the `exp` claim.
        # Note: For high volume push requests, it would save some network
        # overhead if you verify the tokens offline by downloading Google's
        # Public Cert and decode them using the `google.auth.jwt` module;
        # caching already seen tokens works best when a large volume of
        # messages have prompted a single push server to handle them, in which
        # case they would all share the same token for a limited time window.
        claim = id_token.verify_oauth2_token(token, requests.Request(),
                                             audience='example.com')
        CLAIMS.append(claim)
    except Exception as e:
        return 'Invalid token: {}\n'.format(e), 400

    envelope = json.loads(request.data.decode('utf-8'))
    payload = base64.b64decode(envelope['message']['data'])
    MESSAGES.append(payload)
    # Returning any 2xx status indicates successful receipt of the message.
    return 'OK', 200

Go

// receiveMessagesHandler validates authentication token and caches the Pub/Sub
// message received.
func (a *app) receiveMessagesHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != "POST" {
		http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
		return
	}

	// Verify that the request originates from the application.
	// a.pubsubVerificationToken = os.Getenv("PUBSUB_VERIFICATION_TOKEN")
	if token, ok := r.URL.Query()["token"]; !ok || len(token) != 1 || token[0] != a.pubsubVerificationToken {
		http.Error(w, "Bad token", http.StatusBadRequest)
		return
	}

	// Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
	authHeader := r.Header.Get("Authorization")
	if authHeader == "" || len(strings.Split(authHeader, " ")) != 2 {
		http.Error(w, "Missing Authorization header", http.StatusBadRequest)
		return
	}
	token := strings.Split(authHeader, " ")[1]
	// Verify and decode the JWT.
	// If you don't need to control the HTTP client used you can use the
	// convenience method idtoken.Validate instead of creating a Validator.
	v, err := idtoken.NewValidator(r.Context(), option.WithHTTPClient(a.defaultHTTPClient))
	if err != nil {
		http.Error(w, "Unable to create Validator", http.StatusBadRequest)
		return
	}
	// Please change http://example.com to match with the value you are
	// providing while creating the subscription.
	payload, err := v.Validate(r.Context(), token, "http://example.com")
	if err != nil {
		http.Error(w, fmt.Sprintf("Invalid Token: %v", err), http.StatusBadRequest)
		return
	}
	if payload.Issuer != "accounts.google.com" && payload.Issuer != "https://accounts.google.com" {
		http.Error(w, "Wrong Issuer", http.StatusBadRequest)
	}

	var pr pushRequest
	if err := json.NewDecoder(r.Body).Decode(&pr); err != nil {
		http.Error(w, fmt.Sprintf("Could not decode body: %v", err), http.StatusBadRequest)
		return
	}

	a.messagesMu.Lock()
	defer a.messagesMu.Unlock()
	// Limit to ten.
	a.messages = append(a.messages, pr.Message.Data)
	if len(a.messages) > maxMessages {
		a.messages = a.messages[len(a.messages)-maxMessages:]
	}

	fmt.Fprint(w, "OK")
}

Consulte o guia do Login do Google para sites e veja outros exemplos sobre como validar o JWT do portador. Para mais informações sobre os tokens do OpenID, acesse o guia do OpenID Connect.

Cloud Run e App Engine

O Cloud Run e o App Engine autenticam automaticamente as chamadas HTTP verificando tokens gerados pelo Pub/Sub. A única configuração necessária do usuário é que os papéis necessários do IAM sejam concedidos à conta do autor da chamada. É possível, por exemplo, conceder ou revogar a permissão para chamar um determinado endpoint do Cloud Run para uma conta. Para detalhes, consulte os tutoriais a seguir:

Como interromper e retomar a entrega

Para interromper temporariamente o envio de solicitações do Pub/Sub para o endpoint de push, altere a assinatura para pull. Pode levar vários minutos para que essa mudança entre em vigor.

Para retomar a entrega por push, defina o URL para um endpoint válido novamente. Caso queira interromper a entrega permanentemente, exclua a assinatura.

Cotas, limites e taxa de envio

As assinaturas de push estão sujeitas a um conjunto de cotas e limites de recursos.

Se o Pub/Sub não receber uma resposta success, ele aplicará espera exponencial usando no mínimo 100 milissegundos e no máximo 60 segundos.

O Pub/Sub ajusta o número de solicitações push simultâneas usando um algoritmo de inicialização lenta. O número máximo permitido de solicitações push simultâneas é a janela push. A janela push aumenta em qualquer entrega bem-sucedida e diminui em caso de falha. O sistema começa com uma pequena janela: 3 vezes N, em que N é o número de regiões de publicação.

Quando um assinante reconhece as mensagens, a janela aumenta exponencialmente até 3.000 vezes N mensagens pendentes. Para assinaturas em que os assinantes confirmam mais de 99% das mensagens e conseguem, em média, menos de um segundo de latência da solicitação de push, a janela aumenta até 30.000 vezes N mensagens pendentes.

A latência da solicitação de push inclui o seguinte:

Depois de 3.000 mensagens pendentes, a janela aumenta linearmente para evitar que o endpoint de push receba muitas mensagens. Se a latência média exceder um segundo ou o assinante confirmar menos de 99% das solicitações, a janela diminuirá para o limite mínimo de 3.000 mensagens pendentes.

Para mais informações sobre as métricas que podem ser usadas para monitorar a entrega por push, consulte Como monitorar assinaturas de push.