Como usar assinaturas de push

O Pub/Sub é compatível com entrega de mensagens por push e pull. Para ter uma visão geral e comparar assinaturas de pull e push, consulte esta página. Neste documento, você verá uma descrição da entrega por push. Acesse o guia do assinante de pull para saber mais sobre a entrega por pull.

Se uma assinatura usar a entrega por push, o serviço do Pub/Sub entregará mensagens a um endpoint de push. O endpoint de push precisa ser um endereço HTTPS de acesso público. O servidor do endpoint de push precisa ter um certificado SSL válido assinado por uma autoridade de certificação.

O serviço do Pub/Sub entrega mensagens para enviar endpoints da mesma região do Google Cloud em que o serviço do Pub/Sub armazena as mensagens. O serviço Pub/Sub entrega mensagens da mesma região do Google Cloud com base no melhor esforço.

Além disso, as assinaturas de push podem ser configuradas para fornecer um cabeçalho de autorização para permitir que os endpoint autentiquem as solicitações. Os mecanismos de autenticação e autorização automáticos estão disponíveis para os endpoint padrão do App Engine e do Cloud Functions hospedados no mesmo projeto que a assinatura.

Receber mensagens

Quando o Pub/Sub entrega uma mensagem para um endpoint de push, o Pub/Sub a envia no corpo de uma solicitação POST. O corpo da solicitação é um objeto JSON e os dados da mensagem estão no campo message.data. Os dados da mensagem são codificados em base64.

O exemplo a seguir é o corpo de uma solicitação POST para um endpoint de push:

{
    "message": {
        "attributes": {
            "key": "value"
        },
        "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
        "messageId": "2070443601311540",
        "message_id": "2070443601311540",
        "publishTime": "2021-02-26T19:13:55.749Z",
        "publish_time": "2021-02-26T19:13:55.749Z",
    },
   "subscription": "projects/myproject/subscriptions/mysubscription"
}

Para receber mensagens de assinaturas de push, use um webhook e processe as solicitações POST que o Pub/Sub envia para o endpoint de push. Para mais informações sobre como processar essas solicitações POST no App Engine, consulte Como gravar e responder a mensagens do Pub/Sub.

Depois de receber uma solicitação por push, retorne um código de status HTTP. Para confirmar a mensagem, retorne um dos seguintes códigos de status:

  • 102
  • 200
  • 201
  • 202
  • 204

Para enviar uma confirmação negativa para a mensagem, retorne qualquer outro código de status. Se você enviar uma confirmação negativa ou o prazo de confirmação expirar, o Pub/Sub reenviará a mensagem. Não é possível modificar o prazo de confirmação de mensagens individuais recebidas de assinaturas de push.

Autenticação e autorização

Se uma assinatura de push usa a autenticação, o serviço Pub/Sub assina um JSON Web Token (JWT) e o envia no cabeçalho de autorização da solicitação de push. O JWT inclui declarações e uma assinatura.

Os assinantes podem decodificar o JWT e verificar o seguinte:

  • As declarações são precisas.
  • O serviço do Pub/Sub assinou as declarações.

Se os assinantes usam um firewall, eles não podem receber solicitações push. Para receber solicitações push, desative o firewall e verifique o JWT.

Formato JWT

O JWT é um JWT do OpenIDConnect que consiste em um cabeçalho, um conjunto de declarações e uma assinatura. O serviço Pub/Sub codifica o JWT como uma string base64 com delimitadores de período.

Por exemplo, o cabeçalho de autorização a seguir inclui um JWT codificado:

"Authorization" : "Bearer
eyJhbGciOiJSUzI1NiIsImtpZCI6IjdkNjgwZDhjNzBkNDRlOTQ3MTMzY2JkNDk5ZWJjMWE2MWMzZDVh
YmMiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwczovL2V4YW1wbGUuY29tIiwiYXpwIjoiMTEzNzc0M
jY0NDYzMDM4MzIxOTY0IiwiZW1haWwiOiJnYWUtZ2NwQGFwcHNwb3QuZ3NlcnZpY2VhY2NvdW50LmNvb
SIsImVtYWlsX3ZlcmlmaWVkIjp0cnVlLCJleHAiOjE1NTAxODU5MzUsImlhdCI6MTU1MDE4MjMzNSwia
XNzIjoiaHR0cHM6Ly9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTEzNzc0MjY0NDYzMDM4MzIxO
TY0In0.QVjyqpmadTyDZmlX2u3jWd1kJ68YkdwsRZDo-QxSPbxjug4ucLBwAs2QePrcgZ6hhkvdc4UHY
4YF3fz9g7XHULNVIzX5xh02qXEH8dK6PgGndIWcZQzjSYfgO-q-R2oo2hNM5HBBsQN4ARtGK_acG-NGG
WM3CQfahbEjZPAJe_B8M7HfIu_G5jOLZCw2EUcGo8BvEwGcLWB2WqEgRM0-xt5-UPzoa3-FpSPG7DHk7
z9zRUeq6eB__ldb-2o4RciJmjVwHgnYqn3VvlX9oVKEgXpNFhKuYA-mWh5o7BCwhujSMmFoBOh6mbIXF
cyf5UiVqKjpqEbqPGo_AvKvIQ9VTQ" 

O cabeçalho e o conjunto de declarações são strings JSON. Depois de decodificadas, elas assumem o seguinte formato:

{"alg":"RS256","kid":"7d680d8c70d44e947133cbd499ebc1a61c3d5abc","typ":"JWT"}

{
   "aud":"https://example.com",
   "azp":"113774264463038321964",
   "email":"gae-gcp@appspot.gserviceaccount.com",
   "sub":"113774264463038321964",
   "email_verified":true,
   "exp":1550185935,
   "iat":1550182335,
   "iss":"https://accounts.google.com"
  }

Os tokens anexados às solicitações enviadas para os endpoints de push podem ter até uma hora.

Como configurar o Pub/Sub para autenticação por push

A configuração de autenticação para uma assinatura consiste em dois parâmetros:

  • Conta de serviço: a conta de serviço do GCP associada à assinatura de push. As solicitações push têm a identidade dessa conta de serviço. Como exemplo, uma assinatura de push configurada com uma conta de serviço que tem o papel roles/run.invoker e está vinculada a um determinado serviço do Cloud Run pode invocar esse serviço do Cloud Run.
  • Público-alvo de token (opcional): uma única string, indiferente a maiúsculas, que pode ser usada pelo webhook para validar o público-alvo desse token.

Além de configurar esses campos, também é preciso conceder ao Pub/Sub as permissões necessárias para criar tokens para sua conta de serviço. O Pub/Sub cria e mantém uma conta de serviço especial para seu projeto: service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com em que {PROJECT_NUMBER} é o projeto do GCP que contém a assinatura. Essa conta precisa do papel de criador do token da conta de serviço. Se você usa o Console do Cloud para configurar a assinatura para autenticação por push ou usa um projeto criado após 8 de abril de 2021, o papel é concedido automaticamente como parte do Pub/Sub papel de agente de serviço. Caso contrário, você precisará conceder explicitamente o papel à conta.

LINHA DE COMANDO

# Only for projects created on or before April 8, 2021:
# grant Cloud Pub/Sub the permission to create tokens
PUBSUB_SERVICE_ACCOUNT="service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
 --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}"\
 --role='roles/iam.serviceAccountTokenCreator'

# For all projects:
# configure the subscription push identity
gcloud pubsub subscriptions (create|update|modify-push-config) ${SUBSCRIPTION} \
 --topic=${TOPIC} \
 --push-endpoint=${PUSH_ENDPOINT_URI} \
 --push-auth-service-account=${SERVICE_ACCOUNT_EMAIL} \
 --push-auth-token-audience=${OPTIONAL_AUDIENCE_OVERRIDE}

Console

  1. Acesse a página Tópicos do Pub/Sub.

    Acessar a página de tópicos

  2. Clique em um tópico.

  3. Crie ou atualize uma assinatura.

  4. Digite uma identidade e (opcionalmente) um público-alvo.

Autenticação e autorização pelo endpoint de push

Declarações

O JWT pode ser usado para validar que as declarações, incluindo email e aud, sejam assinadas pelo Google. Para mais informações sobre como as APIs do OAuth 2.0 do Google podem ser usadas para autenticação e autorização, consulte OpenID Connect.

Dois mecanismos tornam essas declarações significativas. Primeiro, o Pub/Sub exige que a conta de serviço ou o usuário usado para associar uma identidade de conta de serviço a uma assinatura de push tenha um papel (roles/iam.serviceAccountUser) Usuário da conta de serviço para o projeto ou a conta de serviço.

Em segundo lugar, o acesso aos certificados utilizados para assinar os tokens é rigidamente controlado. Para criar o token, o Pub/Sub precisa chamar um serviço interno do Google usando uma identidade de conta de serviço de assinatura diferente. É necessário que a conta de serviço de assinatura seja autorizada a criar tokens para a conta de serviço declarada ou para o projeto que contém a conta. Isso é feito usando a permissão iam.serviceAccounts.getOpenIdToken ou um papel (roles/iam.serviceAccountTokenCreator) de Criador de token de conta de serviço.

Como validar tokens

A validação de tokens enviados pelo Pub/Sub para o endpoint de push envolve:

  • Como verificar a integridade do token por meio da validação de assinaturas.
  • Garantir que as declarações email e audience no token correspondam aos valores definidos na configuração da assinatura de push.

O exemplo a seguir mostra como autenticar uma solicitação por push em um aplicativo do App Engine.

protocolo

Solicitação:

GET https://oauth2.googleapis.com/tokeninfo?id_token={BEARER_TOKEN}

Saída:

200 OK
{
    "alg": "RS256",
    "aud": "example.com",
    "azp": "104176025330667568672",
    "email": "{SERVICE_ACCOUNT_NAME}@{YOUR_PROJECT_NAME}.iam.gserviceaccount.com",
    "email_verified": "true",
    "exp": "1555463097",
    "iat": "1555459497",
    "iss": "https://accounts.google.com",
    "kid": "3782d3f0bc89008d9d2c01730f765cfb19d3b70e",
    "sub": "104176025330667568672",
    "typ": "JWT"
}

Go

// receiveMessagesHandler validates authentication token and caches the Pub/Sub
// message received.
func (a *app) receiveMessagesHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != "POST" {
		http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
		return
	}

	// Verify that the request originates from the application.
	// a.pubsubVerificationToken = os.Getenv("PUBSUB_VERIFICATION_TOKEN")
	if token, ok := r.URL.Query()["token"]; !ok || len(token) != 1 || token[0] != a.pubsubVerificationToken {
		http.Error(w, "Bad token", http.StatusBadRequest)
		return
	}

	// Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
	authHeader := r.Header.Get("Authorization")
	if authHeader == "" || len(strings.Split(authHeader, " ")) != 2 {
		http.Error(w, "Missing Authorization header", http.StatusBadRequest)
		return
	}
	token := strings.Split(authHeader, " ")[1]
	// Verify and decode the JWT.
	// If you don't need to control the HTTP client used you can use the
	// convenience method idtoken.Validate instead of creating a Validator.
	v, err := idtoken.NewValidator(r.Context(), option.WithHTTPClient(a.defaultHTTPClient))
	if err != nil {
		http.Error(w, "Unable to create Validator", http.StatusBadRequest)
		return
	}
	// Please change http://example.com to match with the value you are
	// providing while creating the subscription.
	payload, err := v.Validate(r.Context(), token, "http://example.com")
	if err != nil {
		http.Error(w, fmt.Sprintf("Invalid Token: %v", err), http.StatusBadRequest)
		return
	}
	if payload.Issuer != "accounts.google.com" && payload.Issuer != "https://accounts.google.com" {
		http.Error(w, "Wrong Issuer", http.StatusBadRequest)
		return
	}

	// IMPORTANT: you should validate claim details not covered by signature
	// and audience verification above, including:
	//   - Ensure that `payload.Claims["email"]` is equal to the expected service
	//     account set up in the push subscription settings.
	//   - Ensure that `payload.Claims["email_verified"]` is set to true.
	if payload.Claims["email"] != "test-service-account-email@example.com" || payload.Claims["email_verified"] != true {
		http.Error(w, "Unexpected email identity", http.StatusBadRequest)
		return
	}

	var pr pushRequest
	if err := json.NewDecoder(r.Body).Decode(&pr); err != nil {
		http.Error(w, fmt.Sprintf("Could not decode body: %v", err), http.StatusBadRequest)
		return
	}

	a.messagesMu.Lock()
	defer a.messagesMu.Unlock()
	// Limit to ten.
	a.messages = append(a.messages, pr.Message.Data)
	if len(a.messages) > maxMessages {
		a.messages = a.messages[len(a.messages)-maxMessages:]
	}

	fmt.Fprint(w, "OK")
}

Java

@WebServlet(value = "/pubsub/authenticated-push")
public class PubSubAuthenticatedPush extends HttpServlet {
  private final String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
  private final MessageRepository messageRepository;
  private final GoogleIdTokenVerifier verifier =
      new GoogleIdTokenVerifier.Builder(new NetHttpTransport(), new JacksonFactory())
          /**
           * Please change example.com to match with value you are providing while creating
           * subscription as provided in @see <a
           * href="https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/appengine-java8/pubsub">README</a>.
           */
          .setAudience(Collections.singletonList("example.com"))
          .build();
  private final Gson gson = new Gson();
  private final JsonParser jsonParser = new JsonParser();

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws IOException, ServletException {

    // Verify that the request originates from the application.
    if (req.getParameter("token").compareTo(pubsubVerificationToken) != 0) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    // Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
    String authorizationHeader = req.getHeader("Authorization");
    if (authorizationHeader == null
        || authorizationHeader.isEmpty()
        || authorizationHeader.split(" ").length != 2) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
      return;
    }
    String authorization = authorizationHeader.split(" ")[1];

    try {
      // Verify and decode the JWT.
      // Note: For high volume push requests, it would save some network overhead
      // if you verify the tokens offline by decoding them using Google's Public
      // Cert; caching already seen tokens works best when a large volume of
      // messsages have prompted a singple push server to handle them, in which
      // case they would all share the same token for a limited time window.
      GoogleIdToken idToken = verifier.verify(authorization);

      GoogleIdToken.Payload payload = idToken.getPayload();
      // IMPORTANT: you should validate claim details not covered by signature
      // and audience verification above, including:
      //   - Ensure that `payload.getEmail()` is equal to the expected service
      //     account set up in the push subscription settings.
      //   - Ensure that `payload.getEmailVerified()` is set to true.

      messageRepository.saveToken(authorization);
      messageRepository.saveClaim(payload.toPrettyString());
      // parse message object from "message" field in the request body json
      // decode message data from base64
      Message message = getMessage(req);
      messageRepository.save(message);
      // 200, 201, 204, 102 status codes are interpreted as success by the Pub/Sub system
      resp.setStatus(102);
      super.doPost(req, resp);
    } catch (Exception e) {
      resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
    }
  }

  private Message getMessage(HttpServletRequest request) throws IOException {
    String requestBody = request.getReader().lines().collect(Collectors.joining("\n"));
    JsonElement jsonRoot = jsonParser.parse(requestBody);
    String messageStr = jsonRoot.getAsJsonObject().get("message").toString();
    Message message = gson.fromJson(messageStr, Message.class);
    // decode from base64
    String decoded = decode(message.getData());
    message.setData(decoded);
    return message;
  }

  private String decode(String data) {
    return new String(Base64.getDecoder().decode(data));
  }

  PubSubAuthenticatedPush(MessageRepository messageRepository) {
    this.messageRepository = messageRepository;
  }

  public PubSubAuthenticatedPush() {
    this(MessageRepositoryImpl.getInstance());
  }
}

Node.js

app.post('/pubsub/authenticated-push', jsonBodyParser, async (req, res) => {
  // Verify that the request originates from the application.
  if (req.query.token !== PUBSUB_VERIFICATION_TOKEN) {
    res.status(400).send('Invalid request');
    return;
  }

  // Verify that the push request originates from Cloud Pub/Sub.
  try {
    // Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
    const bearer = req.header('Authorization');
    const [, token] = bearer.match(/Bearer (.*)/);
    tokens.push(token);

    // Verify and decode the JWT.
    // Note: For high volume push requests, it would save some network
    // overhead if you verify the tokens offline by decoding them using
    // Google's Public Cert; caching already seen tokens works best when
    // a large volume of messages have prompted a single push server to
    // handle them, in which case they would all share the same token for
    // a limited time window.
    const ticket = await authClient.verifyIdToken({
      idToken: token,
      audience: 'example.com',
    });

    const claim = ticket.getPayload();

    // IMPORTANT: you should validate claim details not covered
    // by signature and audience verification above, including:
    //   - Ensure that `claim.email` is equal to the expected service
    //     account set up in the push subscription settings.
    //   - Ensure that `claim.email_verified` is set to true.

    claims.push(claim);
  } catch (e) {
    res.status(400).send('Invalid token');
    return;
  }

  // The message is a unicode string encoded in base64.
  const message = Buffer.from(req.body.message.data, 'base64').toString(
    'utf-8'
  );

  messages.push(message);

  res.status(200).send();
});

Python

@app.route('/push-handlers/receive_messages', methods=['POST'])
def receive_messages_handler():
    # Verify that the request originates from the application.
    if (request.args.get('token', '') !=
            current_app.config['PUBSUB_VERIFICATION_TOKEN']):
        return 'Invalid request', 400

    # Verify that the push request originates from Cloud Pub/Sub.
    try:
        # Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
        bearer_token = request.headers.get('Authorization')
        token = bearer_token.split(' ')[1]
        TOKENS.append(token)

        # Verify and decode the JWT. `verify_oauth2_token` verifies
        # the JWT signature, the `aud` claim, and the `exp` claim.
        # Note: For high volume push requests, it would save some network
        # overhead if you verify the tokens offline by downloading Google's
        # Public Cert and decode them using the `google.auth.jwt` module;
        # caching already seen tokens works best when a large volume of
        # messages have prompted a single push server to handle them, in which
        # case they would all share the same token for a limited time window.
        claim = id_token.verify_oauth2_token(token, requests.Request(),
                                             audience='example.com')

        # IMPORTANT: you should validate claim details not covered by signature
        # and audience verification above, including:
        #   - Ensure that `claim["email"]` is equal to the expected service
        #     account set up in the push subscription settings.
        #   - Ensure that `claim["email_verified"]` is set to true.

        CLAIMS.append(claim)
    except Exception as e:
        return 'Invalid token: {}\n'.format(e), 400

    envelope = json.loads(request.data.decode('utf-8'))
    payload = base64.b64decode(envelope['message']['data'])
    MESSAGES.append(payload)
    # Returning any 2xx status indicates successful receipt of the message.
    return 'OK', 200

Ruby

post "/pubsub/authenticated-push" do
  halt 400 if params[:token] != PUBSUB_VERIFICATION_TOKEN

  begin
    bearer = request.env["HTTP_AUTHORIZATION"]
    token = /Bearer (.*)/.match(bearer)[1]
    claim = Google::Auth::IDTokens.verify_oidc token, aud: "example.com"
    claims.push claim
  rescue Google::Auth::IDTokens::VerificationError => e
    puts "VerificationError: #{e.message}"
    halt 400, "Invalid token"
  end

  message = JSON.parse request.body.read
  payload = Base64.decode64 message["message"]["data"]

  messages.push payload
end

Consulte o guia do Login do Google para sites e veja outros exemplos sobre como validar o JWT do portador. Para mais informações sobre os tokens do OpenID, acesse o guia do OpenID Connect, que inclui uma lista das bibliotecas de cliente que ajudam a validar os JWTs.

Cloud Run e App Engine

O Cloud Run e o App Engine autenticam automaticamente as chamadas HTTP verificando tokens gerados pelo Pub/Sub. A única configuração necessária do usuário é que os papéis necessários do IAM sejam concedidos à conta do autor da chamada. É possível, por exemplo, conceder ou revogar a permissão para chamar um determinado endpoint do Cloud Run para uma conta. Para detalhes, consulte os tutoriais a seguir:

Como interromper e retomar a entrega

Para interromper temporariamente o envio de solicitações do Pub/Sub para o endpoint de push, altere a assinatura para pull. Pode levar vários minutos para que essa mudança entre em vigor.

Para retomar a entrega por push, defina o URL para um endpoint válido novamente. Caso queira interromper a entrega permanentemente, exclua a assinatura.

Cotas, limites e taxa de entrega

As assinaturas de push estão sujeitas a um conjunto de cotas e limites de recursos.

Espera de push

Se um assinante de push envia confirmações negativas, o Pub/Sub pode entregar mensagens usando uma espera de push. Quando o Pub/Sub usa uma espera de push, ele interrompe a entrega de mensagens por um período de 100 milissegundos a 60 segundos e depois começa a entregar mensagens novamente.

A espera de push é uma espera exponencial que impede que um assinante de push receba mensagens que não pode processar. A quantidade de tempo que o Pub/Sub para de entregar mensagens depende do número de confirmações negativas enviadas aos assinantes.

Por exemplo, se um assinante de push receber cinco mensagens por segundo e enviar uma confirmação negativa por segundo, o Pub/Sub entregará mensagens aproximadamente a cada 500 milissegundos. Se o assinante de push enviar cinco confirmações negativas por segundo, o Pub/Sub entregará mensagens a cada 30 a 60 segundos.

Taxa de envio

O Pub/Sub ajusta o número de solicitações push simultâneas usando um algoritmo de inicialização lenta. O número máximo permitido de solicitações push simultâneas é a janela push. A janela push aumenta em qualquer entrega bem-sucedida e diminui em caso de falha. O sistema começa com uma pequena janela: 3 vezes N, em que N é o número de regiões de publicação.

Quando um assinante reconhece as mensagens, a janela aumenta exponencialmente até 3.000 vezes N mensagens pendentes. Para assinaturas em que os assinantes confirmam mais de 99% das mensagens e conseguem, em média, menos de um segundo de latência da solicitação de push, a janela aumenta até 30.000 vezes N mensagens pendentes.

A latência da solicitação de push inclui o seguinte:

Depois de 3.000 mensagens pendentes, a janela aumenta linearmente para evitar que o endpoint de push receba muitas mensagens. Se a latência média exceder um segundo ou o assinante confirmar menos de 99% das solicitações, a janela diminuirá para o limite mínimo de 3.000 mensagens pendentes.

Para mais informações sobre as métricas que podem ser usadas para monitorar a entrega por push, consulte Como monitorar assinaturas de push.