Usar o Cloud Pub/Sub com o Node.js

Para muitos aplicativos, é necessário fazer o processamento em segundo plano fora do contexto de uma solicitação da Web. Neste exemplo, as tarefas são enviadas pelo app Bookshelf para serem executadas por um serviço de worker separado. O worker coleta as informações da API Google Books e atualiza as informações do livro no banco de dados. Esta amostra demonstra como configurar serviços separados no Google App Engine, como usar o Cloud Pub/Sub para distribuir mensagens e como lidar com eventos de ciclo de vida.

Esta página é parte de um tutorial com várias páginas. Para começar do início e ler as instruções de configuração, acesse o app Node.js Bookshelf.

Como definir configurações

Primeiro, crie um novo tópico no Cloud Pub/Sub da seguinte forma:

  1. Acesse a página Tópicos do Pub/Sub do Console do Cloud.
  2. Clique em Criar tópico.
  3. Insira um nome para seu tópico do Pub/Sub, como sample-topic.

Em seguida, faça uma nova assinatura no tópico que você acabou de criar da seguinte forma:

  1. Na página Tópicos do Pub/Sub do Console do Cloud, clique no nome do tópico que você acabou de criar.
  2. Clique em Criar assinatura.
  3. Insira um nome para sua assinatura do Pub/Sub, como sample-subscription.
  4. Escolha Enviar para um URL de ponto de extremidade para o tipo de entrega. Use

    https://worker-dot-[YOUR-PROJECT-ID].appspot.com/endpoint
    

    para o URL de ponto de extremidade. Substitua [YOUR-PROJECT-ID] pelo seu próprio valor.

Por último, copie o arquivo config.json da seção Autenticar usuários deste tutorial para o diretório nodejs-getting-started/6-pubsub. Adicione estas linhas ao arquivo copiado:

"TOPIC_NAME": "[YOUR_TOPIC_NAME]",

Substitua [YOUR_TOPIC_NAME] pelo nome do tópico que você acabou de criar.

Como instalar dependências

Use npm para instalar as dependências no diretório nodejs-getting-started/6-pubsub:

    npm install

Execução do app na máquina local

  1. Inicie um servidor da Web local usando npm:

        npm start
    
  1. Defina a variável de ambiente PORT e inicie o worker usando npm:

        SCRIPT=worker.js PORT=8081 npm start
    
  1. No navegador da Web, digite este endereço:

    http://localhost:8080

Acesse a instância do worker em http://localhost:8081. A página da Web desse worker exibe o status sobre o número de livros processados. Observe que o serviço do worker só inicia o processamento de mensagens após a implantação.

Como implantar o app no ambiente padrão do App Engine

  1. Implante o worker a partir do diretório nodejs-getting-started/6-pubsub:

    gcloud app deploy worker.yaml
    
  2. Implante o aplicativo de amostra a partir do diretório nodejs-getting-started/6-pubsub:

    gcloud app deploy
    
  3. No navegador da Web, digite este endereço. Substitua [YOUR_PROJECT_ID] pelo seu código do projeto:

    https://[YOUR_PROJECT_ID].appspot.com
    

Para atualizar o app, implante a versão atualizada digitando o mesmo comando usado para implantá-lo pela primeira vez. A implantação cria uma nova versão do aplicativo e a define como padrão. As versões anteriores do seu app são mantidas. Por padrão, o ambiente do App Engine é escalonado para zero instâncias quando não há tráfego de entrada em uma versão. Portanto, as versões não usadas não serão cobradas. No entanto, todas essas versões de apps ainda são recursos faturáveis.

Consulte a seção Limpeza na etapa final deste tutorial para ver mais informações sobre a limpeza de recursos faturáveis, incluindo versões de apps não padrão.

Estrutura do aplicativo

Veja no diagrama a seguir os componentes do aplicativo e como eles se conectam entre si.

Estrutura de amostra do Cloud Pub/Sub

Os eventos no Cloud Pub/Sub são publicados pelo aplicativo sempre que um livro é atualizado no banco de dados. O trabalhador, em execução separada, detecta esses eventos. Quando o evento é recebido, as informações do livro são solicitadas à API Google Books e o registro do livro no banco de dados é atualizado. Depois dessa atualização, é possível atualizar a página de informações do livro e ver as novas informações.

Como entender o código

Nesta seção, vamos analisar o código do aplicativo e explicar como ele funciona.

Como publicar eventos no Cloud Pub/Sub

Os eventos são publicados em tópicos no Cloud Pub/Sub.

function getTopic (cb) {
  pubsub.createTopic(topicName, (err, topic) => {
    // topic already exists.
    if (err && err.code === 6) {
      cb(null, pubsub.topic(topicName));
      return;
    }
    cb(err, topic);
  });
}

Um evento é enviado ao tópico pelo aplicativo contendo o código do livro que foi atualizado. Com isso, o trabalhador pode saber qual livro deve ser processado.

function queueBook (bookId) {
  getTopic((err, topic) => {
    if (err) {
      logging.error('Error occurred while getting pubsub topic', err);
      return;
    }

    const data = {
      action: 'processBook',
      bookId: bookId
    };

    const publisher = topic.publisher();
    publisher.publish(Buffer.from(JSON.stringify(data)), (err) => {
      if (err) {
        logging.error('Error occurred while queuing background task', err);
      } else {
        logging.info(`Book ${bookId} queued for background processing`);
      }
    });
  });
}

A função queueBook é chamada a partir do modelo sempre que um livro é criado ou atualizado. A implementação depende do back-end de banco de dados escolhido.

Datastore

function update (id, data, queueBook, cb) {
  let key;
  if (id) {
    key = ds.key([kind, parseInt(id, 10)]);
  } else {
    key = ds.key(kind);
  }

  const entity = {
    key: key,
    data: toDatastore(data, ['description'])
  };

  ds.save(
    entity,
    (err) => {
      if (err) {
        cb(err);
        return;
      }
      data.id = entity.key.id;
      if (queueBook) {
        background.queueBook(data.id);
      }
      cb(null, data);
    }
  );
}

Cloud SQL

function create (data, queueBook, cb) {
  connection.query('INSERT INTO `books` SET ?', data, (err, res) => {
    if (err) {
      cb(err);
      return;
    }
    if (queueBook) {
      background.queueBook(res.insertId);
    }
    read(res.insertId, cb);
  });
}
function update (id, data, queueBook, cb) {
  connection.query(
    'UPDATE `books` SET ? WHERE `id` = ?', [data, id], (err) => {
      if (err) {
        cb(err);
        return;
      }
      if (queueBook) {
        background.queueBook(id);
      }
      read(id, cb);
    });
}

O aplicativo worker

O worker é um aplicativo separado que detecta eventos Pub/Sub em vez de veicular um aplicativo da Web voltado para o usuário. Com isso, o aplicativo é dividido em dois serviços independentes que se comunicam por meio do Pub/Sub, e não diretamente um com o outro. Assim, o número de instâncias do front-end e do worker pode ser configurado e escalonado separadamente.

Como detectar eventos

O serviço do worker aceita mensagens do Cloud Pub/Sub em /endpoint:

app.post('/endpoint', jsonParser, (req, res) => {
  if (!req.body || !req.body.message || !req.body.message.data) {
    logging.warn('Bad request');
    return res.sendStatus(400);
  }

  const dataUtf8encoded = Buffer.from(req.body.message.data, 'base64')
    .toString('utf8');
  var content;
  try {
    content = JSON.parse(dataUtf8encoded);
  } catch (ex) {
    logging.warn('Bad request');
    return res.sendStatus(400);
  }

  if (content.action && content.action === 'processBook' && content.bookId) {
    logging.info(`Received request to process book ${content.bookId}`);
    processBook(content.bookId);
  } else {
    logging.warn('Bad request', content);
    return res.sendStatus(400);
  }
});

O App Engine distribui as solicitações (mensagens Pub/Sub) entre todas as instâncias disponíveis no serviço do worker com base na carga de trabalho atual deles e escalona o serviço por padrão se há um aumento no número de mensagens.

Como processar livros

Para processar o livro, ele é buscado pelo código pelo worker. As informações adicionais são localizadas e as atualizadas são salvas de volta no banco de dados:

function processBook (bookId, callback) {
  waterfall([
    // Load the current data
    (cb) => {
      model.read(bookId, cb);
    },
    // Find the information from Google
    findBookInfo,
    // Save the updated data
    (updated, cb) => {
      model.update(updated.id, updated, false, cb);
    }
  ], (err) => {
    if (err) {
      logging.error('Error occurred', err);
      if (callback) {
        callback(err);
      }
      return;
    }
    logging.info(`Updated book ${bookId}`);
    bookCount += 1;
    if (callback) {
      callback();
    }
  });
}

As novas informações sobre o livro são processadas pela função findBookInfo.

function findBookInfo (book, cb) {
  queryBooksApi(book.title, (err, r) => {
    if (!err && !r.items) {
      err = 'Not found';
    }
    if (err) {
      cb(err);
      return;
    }
    const top = r.items[0];

    book.title = top.volumeInfo.title;
    book.author = (top.volumeInfo.authors || []).join(', ');
    book.publishedDate = top.volumeInfo.publishedDate;
    book.description = book.description || top.volumeInfo.description;

    // If there is already an image for the book or if there's no
    // thumbnails, go ahead and return.
    if (book.imageUrl || !top.volumeInfo.imageLinks) {
      cb(null, book);
      return;
    }

    // Otherwise, try to fetch them and upload to cloud storage.
    const imageUrl =
      top.volumeInfo.imageLinks.thumbnail ||
      top.volumeInfo.imageLinks.smallThumbnail;
    const imageName = `${book.id}.jpg`;

    images.downloadAndUploadImage(
      imageUrl, imageName, (err, publicUrl) => {
        if (!err) {
          book.imageUrl = publicUrl;
        }
        cb(null, book);
      });
  });
}

A queryBooksApi é chamada pela função findBookInfo para buscar mais informações no Google Books.

function queryBooksApi (query, cb) {
  request(
    `https://www.googleapis.com/books/v1/volumes?country=US&q=${encodeURIComponent(query)}`,
    (err, resp, body) => {
      if (err || resp.statusCode !== 200) {
        console.log(`Error: ${err}`);
        console.log(`Response from Google Books: ${resp}`);
        console.log(`Response body from Google Books: ${body}`);
        cb(err || `Response returned ${resp.statusCode}`);
        return;
      }
      cb(null, JSON.parse(body));
    }
  );
}

Com a função findBookInfo, também é possível fazer o download de qualquer foto de capa disponível e depois fazer seu upload para o Cloud Storage:

function downloadAndUploadImage (sourceUrl, destFileName, cb) {
  const file = bucket.file(destFileName);

  request
    .get(sourceUrl)
    .on('error', (err) => {
      logging.warn(`Could not fetch image ${sourceUrl}`, err);
      cb(err);
    })
    .pipe(file.createWriteStream({
      resumable: false
    }))
    .on('finish', () => {
      logging.info(`Uploaded image ${destFileName}`);
      file.makePublic(() => {
        cb(null, getPublicUrl(destFileName));
      });
    })
    .on('error', (err) => {
      logging.error('Could not upload image', err);
      cb(err);
    });
}

Como executar no Google Cloud Platform

Mesmo que não seja fornecida nenhuma solicitação da Web pelo worker a nenhum usuário, recomendamos fazer verificações de integridade para todos os aplicativos executados no Google App Engine. É enviada uma solicitação HTTP GET periódica para /_ah/health para garantir a integridade da instância do aplicativo. Por padrão, qualquer resposta de status diferente de HTTP 500 indica uma instância íntegra, incluindo HTTP 404. Você não precisa definir explicitamente uma verificação de integridade no aplicativo de front-end porque retornou um código de status 404 para /_ah/health.

Para lidar com verificações de integridade do trabalhador de back-end, crie um servidor da Web simples:

const app = express();

const jsonParser = bodyParser.json();

app.use(logging.requestLogger);

app.get('/_ah/health', (req, res) => {
  res.status(200).send('ok');
});

// Keep count of how many books this worker has processed
let bookCount = 0;

app.get('/', (req, res) => {
  res.status(200).send(`This worker has processed ${bookCount} books.`);
});

app.post('/endpoint', jsonParser, (req, res) => {
  if (!req.body || !req.body.message || !req.body.message.data) {
    logging.warn('Bad request');
    return res.sendStatus(400);
  }

  const dataUtf8encoded = Buffer.from(req.body.message.data, 'base64')
    .toString('utf8');
  var content;
  try {
    content = JSON.parse(dataUtf8encoded);
  } catch (ex) {
    logging.warn('Bad request');
    return res.sendStatus(400);
  }

  if (content.action && content.action === 'processBook' && content.bookId) {
    logging.info(`Received request to process book ${content.bookId}`);
    processBook(content.bookId);
  } else {
    logging.warn('Bad request', content);
    return res.sendStatus(400);
  }
});

app.use(logging.errorLogger);

if (module === require.main) {
  const server = app.listen(config.get('PORT'), () => {
    const port = server.address().port;
    console.log(`App listening on port ${port}`);
  });
}

Agora, são detectados tanto solicitações de verificação de integridade quanto eventos do Cloud Pub/Sub.

É necessário também uma configuração de módulo própria para o trabalhador.

service: worker
runtime: nodejs8

env_variables:
  SCRIPT: worker.js

Essa configuração é muito similar ao arquivo app.yaml usado para o front-end, sendo que a principal diferença é a cláusula service: worker. Pode haver vários serviços independentes para os aplicativos do App Engine. Isso significa que é possível implantar, configurar, dimensionar e atualizar partes do seu aplicativo com facilidade e de maneira independente.

Como fazer a limpeza

Para evitar que os recursos usados nesse tutorial sejam cobrados na conta do Google Cloud Platform, é possível fazer o seguinte:

Excluir o projeto

O jeito mais fácil de evitar cobranças é excluindo o projeto que você criou para o tutorial.

Para excluir o projeto:

  1. No Console do GCP, acesse a página "Projetos".

    Acessar a página Projetos

  2. Na lista de projetos, selecione um e clique em Excluir projeto.
  3. Na caixa de diálogo, digite o código do projeto e clique em Encerrar para excluí-lo.

Excluir versões não padrão do app

Se você não quer excluir seu projeto, pode reduzir custos excluindo versões não padrão do app.

Para excluir uma versão do aplicativo:

  1. No Console do GCP, acesse a página "Versões do App Engine".

    Acessar a página Versões

  2. Clique na caixa de seleção ao lado da versão do aplicativo não padrão que você quer excluir.
  3. Clique no botão Excluir na parte superior da página para excluir a versão do aplicativo.

Excluir a instância do Cloud SQL

Para excluir uma instância do Cloud SQL:

  1. No Console do GCP, acesse a página Instâncias de SQL.

    Acessar a página "Instâncias de SQL"

  2. Clique no nome a instância de SQL que você quer excluir.
  3. Para excluir a instância, clique em Excluir excluir no topo da página.

Excluir o intervalo do Google Cloud Storage

Para excluir um intervalo do Google Cloud Storage:

  1. No Console do GCP, acesse o navegador do Cloud Storage.

    Acessar o navegador do Cloud Storage

  2. Marque a caixa de seleção ao lado do intervalo que você quer excluir.
  3. Para excluir o intervalo, clique no botão Excluir no topo da página.

Próximas etapas

Aprenda a executar o app de amostra Bookshelf em Node.js no Compute Engine.

Conheça outros recursos do Google Cloud Platform. Veja nossos tutoriais.

Esta página foi útil? Conte sua opinião sobre:

Enviar comentários sobre…