Como gravar e responder mensagens do Pub/Sub

ID da região

O REGION_ID é um código abreviado que o Google atribui com base na região que você selecionou ao criar o aplicativo. O código não corresponde a um país ou estado, ainda que alguns IDs de região sejam semelhantes aos códigos de país e estado geralmente usados. Para apps criados após fevereiro de 2020, o REGION_ID.r está incluído nos URLs do App Engine. Para apps existentes criados antes dessa data, o ID da região é opcional no URL.

Saiba mais sobre IDs de região.

O Pub/Sub mensagens assíncronas confiáveis de muitos para muitos entre os aplicativos. Os aplicativos do editor podem enviar mensagens para um tópico e outros aplicativos podem assinar esse tópico para receber as mensagens.

Neste documento, descrevemos como usar a biblioteca de cliente do Cloud para enviar e receber mensagens do Pub/Sub em um aplicativo Python 3.

Pré-requisitos

  • Siga as instruções em "Hello World" para Python 3 no App Engine para configurar seu ambiente e projeto, assim como entender como os aplicativos Python 3 do App Engine estão estruturados.
  • Anote e guarde o ID do projeto. Você precisará dele para executar o aplicativo de amostra descrito neste documento.

    Como clonar o app de amostra

    Copie os aplicativos de amostra para sua máquina local e navegue até o diretório pubsub:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples
    cd python-docs-samples/appengine/standard_python3/pubsub
    

    Como criar um tópico e uma assinatura

    Crie um tópico e uma assinatura. Isso inclui especificar o endpoint que receberá as solicitações do Pub/Sub:

    gcloud pubsub topics create YOUR_TOPIC_NAME
    gcloud pubsub subscriptions create YOUR_SUBSCRIPTION_NAME \
        --topic YOUR_TOPIC_NAME \
        --push-endpoint \
        https://YOUR_PROJECT_ID.REGION_ID.r.appspot.com/pubsub/push?token=YOUR_TOKEN \
        --ack-deadline 10
    

    Substitua YOUR_TOKEN por um token aleatório secreto. Ele é usado pelo endpoint do push para verificar as solicitações.

    Como definir variáveis de ambiente

    Edite o arquivo app.yaml para definir as variáveis de ambiente para o ID do projeto, tópico e token de verificação:

    env_variables:
      PUBSUB_TOPIC: '<YOUR_TOPIC>'
      # This token is used to verify that requests originate from your
      # application. It can be any sufficiently random string.
      PUBSUB_VERIFICATION_TOKEN: '<YOUR_VERIFICATION_TOKEN>'

    Revisão de código

    O aplicativo de amostra usa as bibliotecas de cliente do Google Cloud.

    O aplicativo de amostra usa os valores definidos no arquivo app.yaml para configurar variáveis de ambiente. O gerenciador da solicitação de push usa esses valores para confirmar que a solicitação veio do Pub/Sub e foi originada de uma fonte confiável:

    app.config['PUBSUB_VERIFICATION_TOKEN'] = \
        os.environ['PUBSUB_VERIFICATION_TOKEN']
    app.config['PUBSUB_TOPIC'] = os.environ['PUBSUB_TOPIC']
    

    O aplicativo de amostra mantém uma lista global para armazenar as mensagens recebidas por instância:

    MESSAGES = []
    
    Este método receive_messages_handler() recebe mensagens enviadas e as adiciona à lista global MESSAGES:

    @app.route('/push-handlers/receive_messages', methods=['POST'])
    def receive_messages_handler():
        # Verify that the request originates from the application.
        if (request.args.get('token', '') !=
                current_app.config['PUBSUB_VERIFICATION_TOKEN']):
            return 'Invalid request', 400
    
        # Verify that the push request originates from Cloud Pub/Sub.
        try:
            # Get the Cloud Pub/Sub-generated JWT in the "Authorization" header.
            bearer_token = request.headers.get('Authorization')
            token = bearer_token.split(' ')[1]
            TOKENS.append(token)
    
            # Verify and decode the JWT. `verify_oauth2_token` verifies
            # the JWT signature, the `aud` claim, and the `exp` claim.
            # Note: For high volume push requests, it would save some network
            # overhead if you verify the tokens offline by downloading Google's
            # Public Cert and decode them using the `google.auth.jwt` module;
            # caching already seen tokens works best when a large volume of
            # messages have prompted a single push server to handle them, in which
            # case they would all share the same token for a limited time window.
            claim = id_token.verify_oauth2_token(token, requests.Request(),
                                                 audience='example.com')
    
            # IMPORTANT: you should validate claim details not covered by signature
            # and audience verification above, including:
            #   - Ensure that `claim["email"]` is equal to the expected service
            #     account set up in the push subscription settings.
            #   - Ensure that `claim["email_verified"]` is set to true.
    
            CLAIMS.append(claim)
        except Exception as e:
            return 'Invalid token: {}\n'.format(e), 400
    
        envelope = json.loads(request.data.decode('utf-8'))
        payload = base64.b64decode(envelope['message']['data'])
        MESSAGES.append(payload)
        # Returning any 2xx status indicates successful receipt of the message.
        return 'OK', 200

    O método index() interage com o aplicativo da Web do App Engine para publicar novas mensagens e exibir mensagens recebidas:

    @app.route('/', methods=['GET', 'POST'])
    def index():
        if request.method == 'GET':
            return render_template('index.html', messages=MESSAGES, tokens=TOKENS,
                                   claims=CLAIMS)
    
        data = request.form.get('payload', 'Example payload').encode('utf-8')
    
        # Consider initializing the publisher client outside this function
        # for better latency performance.
        publisher = pubsub_v1.PublisherClient()
        topic_path = publisher.topic_path(app.config['GOOGLE_CLOUD_PROJECT'],
                                          app.config['PUBSUB_TOPIC'])
        future = publisher.publish(topic_path, data)
        future.result()
        return 'OK', 200

    Executar a amostra no local

    Com a Google Cloud CLI em execução no local, é possível fornecer autenticação para usar as APIs do Google Cloud. Se tiver configurado o ambiente conforme descrito nos Pré-requisitos, você já terá executado o comando gcloud init, que fornece essa autenticação.

    Instale as dependências preferencialmente em um ambiente virtual.

    Mac OS/Linux

    1. Crie um ambiente Python isolado:
      python3 -m venv env
      source env/bin/activate
    2. Se você não estiver no diretório que contém o código de amostra, navegue até o diretório que contém o código de amostra hello_world. Em seguida, instale as dependências:
      cd YOUR_SAMPLE_CODE_DIR
      pip install -r requirements.txt

    Windows

    Use o PowerShell para executar os pacotes Python.

    1. Localize a instalação do PowerShell.
    2. Clique com o botão direito do mouse no atalho do PowerShell e inicie-o como administrador.
    3. Crie um ambiente Python isolado.
      python -m venv env
      .\env\Scripts\activate
    4. Acesse o diretório do projeto e instale as dependências: Se você não estiver no diretório que contém o código de amostra, navegue até o diretório que contém o código de amostra hello_world. Em seguida, instale as dependências:
      cd YOUR_SAMPLE_CODE_DIR
      pip install -r requirements.txt

    Defina as variáveis de ambiente antes de iniciar o aplicativo:

    export GOOGLE_CLOUD_PROJECT=[your-project-id]
    export PUBSUB_VERIFICATION_TOKEN=[your-verification-token]
    export PUBSUB_TOPIC=[your-topic]
    python main.py
    

    Como simular notificações push

    Pelo aplicativo, é possível enviar mensagens localmente, mas não é possível receber mensagens push localmente. É possível simular uma mensagem de push com uma solicitação HTTP para o notification endpoint de push local. A amostra inclui o sample_message.json do arquivo.

    É possível usar curl ou um cliente httpie para enviar uma solicitação POST HTTP:

    curl -i --data @sample_message.json "localhost:8080/push-handlers/receive_messages?token=[your-token]"
    

    Ou

    http POST ":8080/push-handlers/receive_messages?token=[your-token]" < sample_message.json
    

    Resposta:

    HTTP/1.0 200 OK
    Content-Length: 2
    Content-Type: text/html; charset=utf-8
    Date: Mon, 10 Aug 2015 17:52:03 GMT
    Server: Werkzeug/0.10.4 Python/2.7.10
    
    OK
    

    Após a conclusão da solicitação, é possível atualizar localhost:8080 e ver a mensagem na lista de mensagens recebidas.

    Como executar no App Engine

    Para implantar o aplicativo de demonstração no App Engine usando a ferramenta de linha de comando gcloud, execute o comando a seguir no diretório em que seu arquivo app.yaml está localizado:

    gcloud app deploy
    

    Agora, é possível acessar o aplicativo em https://PROJECT_ID.REGION_ID.r.appspot.com. Use o formulário para enviar mensagens, mas não há garantias de qual instância do seu aplicativo receberá a notificação. É possível enviar várias mensagens e atualizar a página para ver a mensagem recebida.