Como usar o Cloud Pub/Sub com o .NET

Muitos aplicativos executam processos em segundo plano fora de uma solicitação da Web. Neste exemplo, o aplicativo Bookshelf envia tarefas para um worker em segundo plano separado para execução. O worker coleta as informações da API Google Books e atualiza as informações do livro no banco de dados. Veja neste exemplo como configurar serviços separados no App Engine, como executar o processamento de um worker no ambiente flexível do App Engine e como lidar com eventos de ciclo de vida.

Esta página é parte de um tutorial com várias páginas. Para ver do início e ler as instruções de configuração, consulte o artigo App Bookshelf em .NET.

Como definir configurações

  1. No diretório getting-started-dotnet\aspnet\5-pubsub, clique duas vezes em 5-pubsub para abrir o aplicativo de amostra no Visual Studio.

  2. No painel Gerenciador de soluções, clique em Bookshelf > Web.config.

  3. Em bookshelf\Web.config, conclua as etapas a seguir:

    1. Defina GoogleCloudSamples:ProjectId como código do projeto.

    2. Defina GoogleCloudSamples:BookStore com o mesmo valor usado na etapa Como usar dados estruturados deste tutorial.

    3. Se você usou o Cloud SQL ou o SQL Server na etapa de dados estruturados, localize o elemento XML <connectionStrings> e defina connectionString com o mesmo valor usado naquela etapa.

    4. Defina GoogleCloudSamples:BucketName como o nome do intervalo do Cloud Storage criado anteriormente.

  4. Salve e feche bookshelf\Web.config.

  5. No painel Gerenciador de soluções, vá para Trabalhador > Web.config.

  6. Em worker\Web.config, conclua as etapas a seguir:

    1. Defina GoogleCloudSamples:ProjectId como código do projeto.

    2. Defina o valor de GoogleCloudSamples:BookStore com o mesmo valor usado na etapa Como usar dados estruturados deste tutorial.

    3. Se você usou o Cloud SQL ou o SQL Server na etapa de dados estruturados, localize o elemento XML <connectionStrings> e defina connectionString com o mesmo valor usado naquela etapa.

    4. Defina GoogleCloudSamples:BucketName como o nome do intervalo do Cloud Storage criado anteriormente.

  7. Salve e feche worker\Web.config.

Como executar o aplicativo na máquina local

  1. No painel Gerenciador de soluções do Visual Studio, clique com o botão direito do mouse em Solução e escolha Definir projetos de inicialização.

    Definir projetos de inicialização

  2. Clique em Vários projetos de inicialização.

  3. Para as linhas Bookshelf e Trabalhador, defina a Ação como Iniciar e, em seguida, clique em OK.

    Definir bookshelf e worker como Iniciar

  4. Pressione F5 para executar os projetos.

  5. Adicione alguns livros ao Bookshelf. Se a instância do aplicativo e do worker estiverem em execução localmente, será possível observar o worker atualizando as informações do livro em segundo plano.

Como implantar o app Bookshelf no Compute Engine

  1. No painel Gerenciador de soluções do Visual Studio, clique com o botão direito do mouse em Bookshelf e depois selecione Publicar.

    Publicar app

  2. Crie um novo perfil personalizado, como você fez na parte Como usar o Cloud Datastore deste tutorial.

  3. Clique em Publicar.

Como implantar o worker no Compute Engine

  1. No painel Gerenciador de soluções do Visual Studio, clique com o botão direito do mouse em Trabalhador e selecione Publicar.

  2. Crie um novo perfil personalizado, como você fez na parte Como usar o Cloud Datastore deste tutorial.

  3. Clique em Publicar.

Como executar o app no Compute Engine

No navegador da Web, digite o endereço da primeira instância do Compute Engine.

Estrutura do app

Este diagrama mostra os componentes do app e como se encaixam.

Exemplo de estrutura de Auth

Noções básicas sobre o código

Nesta seção, mostramos como criar uma fila, adicionar tarefas à fila e usar o worker para processar tarefas.

Criar uma fila

Juntos, um tópico e uma assinatura do Cloud Pub/Sub formam uma fila. Diagrama de um tópico e uma assinatura formando uma fila

Uma QueueMessage contém o código de um livro para pesquisar na API do Google Livros.

private class QueueMessage
{
    public long BookId;
};

Um código de livro é adicionado a um tópico chamado book-process-queue. Uma assinatura denominada shared-worker-subscription faz a inscrição nesse tópico. O worker analisa essa assinatura para as tarefas a serem executadas.

Os caminhos completos do tópico e da assinatura incluem o nome do projeto.

_topicName = new TopicName(projectId, options.TopicId);
_subscriptionName = new SubscriptionName(projectId, options.SubscriptionId);

CreateTopicAndSubscription() tenta criar um tópico e uma assinatura no Cloud Pub/Sub, mas primeiro verifica se já existem.

public void CreateTopicAndSubscription()
{
    try
    {
        _pub.CreateTopic(_topicName);
        _logger.LogVerbose("Created topic " + _topicName);
    }
    catch (Grpc.Core.RpcException e)
    when (e.Status.StatusCode == Grpc.Core.StatusCode.AlreadyExists)
    {
        // The topic already exists.  Ok.
        _logger.LogError(_topicName + " already exists", e);
    }
    try
    {
        _sub.CreateSubscription(_subscriptionName, _topicName, null, 0);
        _logger.LogVerbose("Created subscription " + _subscriptionName);
    }
    catch (Grpc.Core.RpcException e)
    when (e.Status.StatusCode == Grpc.Core.StatusCode.AlreadyExists)
    {
        // The subscription already exists.  Ok.
        _logger.LogError(_subscriptionName + " already exists", e);
    }
}

Tarefas de fila

QueueMessage é codificada para JSON, e o JSON resultante é codificado para base64. Embora isso seja excessivo para a codificação de um long simples, essa é a maneira preferida de codificar mensagens para que elas sejam compatíveis com a API do Cloud Pub/Sub.

public void EnqueueBook(long bookId)
{
    var message = new QueueMessage() { BookId = bookId };
    var json = JsonConvert.SerializeObject(message);
    _pub.Publish(_topicName, new[] { new PubsubMessage()
    {
        Data = Google.Protobuf.ByteString.CopyFromUtf8(json)
    } });
}

O worker

O worker é um aplicativo separado que detecta eventos do Cloud Pub/Sub. Isso divide o aplicativo em dois processos independentes que se comunicam usando o Cloud Pub/Sub, em vez de diretamente um com o outro.

Processar livros

Para processar um livro, a tarefa faz a busca dele pelo código, localiza mais informações e salva as informações atualizadas de volta no banco de dados.

public void ProcessBook(IBookStore bookStore, long bookId)
{
    var book = bookStore.Read(bookId);
    _logger.LogVerbose($"Found {book.Title}.  Updating.");
    var query = "https://www.googleapis.com/books/v1/volumes?q="
        + Uri.EscapeDataString(book.Title);
    var response = WebRequest.Create(query).GetResponse();
    var reader = new StreamReader(response.GetResponseStream());
    var json = reader.ReadToEnd();
    UpdateBookFromJson(json, book);
    bookStore.Update(book);
}

A função PullOnce lê mensagens da assinatura e chama ProcessBook para cada mensagem.

        private void PullOnce(Action<long> callback, CancellationToken cancellationToken)
        {
            _logger.LogVerbose($"Pulling messages from {_subscriptionName}...");
            // Pull some messages from the subscription.

            var response = _sub.Pull(_subscriptionName, false, 3,
                CallSettings.FromCallTiming(
                    CallTiming.FromExpiration(
                        Expiration.FromTimeout(
                            TimeSpan.FromSeconds(90)))));
            if (response.ReceivedMessages == null)
            {
                // HTTP Request expired because the queue was empty.  Ok.
                _logger.LogVerbose("Pulled no messages.");
                return;
            }
            _logger.LogVerbose($"Pulled {response.ReceivedMessages.Count} messages.");
            if (response.ReceivedMessages.Count == 0)
            {
                return;
            }
            foreach (var message in response.ReceivedMessages)
            {
                try
                {
                    // Unpack the message.
                    byte[] json = message.Message.Data.ToByteArray();
                    var qmessage = JsonConvert.DeserializeObject<QueueMessage>(
                        Encoding.UTF8.GetString(json));
                    // Invoke ProcessBook().
                    callback(qmessage.BookId);
                }
                catch (Exception e)
                {
                    _logger.LogError("Error processing book.", e);
                }
            }
            // Acknowledge the message so we don't see it again.
            var ackIds = new string[response.ReceivedMessages.Count];
            for (int i = 0; i < response.ReceivedMessages.Count; ++i)
                ackIds[i] = response.ReceivedMessages[i].AckId;
            _sub.Acknowledge(_subscriptionName, ackIds);
        }
Esta página foi útil? Conte sua opinião sobre:

Enviar comentários sobre…