Uso de Cloud Pub/Sub con Node.js

Muchas aplicaciones necesitan realizar procesamientos en segundo plano fuera del contexto de una solicitud web. En este caso, la aplicación de muestra de Bookshelf le envía tareas a un servicio de trabajador separado para que las ejecute. El trabajador recibe información desde la API de Google Libros y actualiza la información de los libros en la base de datos. En esta muestra se indica cómo configurar servicios independientes en Google App Engine, cómo usar Cloud Pub/Sub para distribuir mensajes y cómo solucionar eventos del ciclo de vida.

Esta página forma parte de un instructivo de varias páginas. Ve a la aplicación Bookshelf en Node.js para comenzar desde el principio y leer las instrucciones de configuración.

Configuraciones

Primero, crea un tema nuevo en Cloud Pub/Sub:

  1. Ve a la página de Temas de Pub/Sub de Cloud Console.
  2. Haz clic en Crear tema.
  3. Escribe un nombre para tu tema de Pub/Sub, como sample-topic.

Luego, crea una suscripción nueva al tema que acabas de crear:

  1. Haz clic en el nombre que acabas de crear en la página de Temas de Pub/Sub de Cloud Console.
  2. Haz clic en Crear suscripción.
  3. Escribe un nombre para tu suscripción a Pub/Sub, como sample-subscription.
  4. Elige Enviar a una URL de extremo como tipo de envío. Usa

    https://worker-dot-[YOUR-PROJECT-ID].appspot.com/endpoint
    

    para la URL de extremo. Reemplaza [YOUR-PROJECT-ID] con tu valor.

Finalmente, copia tu archivo config.json desde la sección Autenticar usuarios de este instructivo hasta el directorio nodejs-getting-started/6-pubsub. Agrega las líneas siguientes al archivo copiado:

"TOPIC_NAME": "[YOUR_TOPIC_NAME]",

Reemplaza [YOUR_TOPIC_NAME] con el nombre del tema que acabas de crear.

Instalar dependencias

Usa npm para instalar las dependencias en el directorio nodejs-getting-started/6-pubsub:

    npm install

Ejecutar la aplicación en la máquina local

  1. Usa npm para iniciar un servidor web local:

        npm start
    
  1. Configura la variable de entorno PORT y usa npm para iniciar el trabajador:

        SCRIPT=worker.js PORT=8081 npm start
    
  1. En el navegador web, ingresa la siguiente dirección:

    http://localhost:8080

Puedes alcanzar la instancia del worker en http://localhost:8081. La página web del worker muestra en su estado la cantidad de libros que ha procesado. Ten en cuenta que el servicio de trabajador solo comienza a procesar los mensajes después de la implementación.

Implementar la aplicación en el entorno estándar de App Engine

  1. Implementa el trabajador desde el directorio nodejs-getting-started/6-pubsub:

    gcloud app deploy worker.yaml
    
  2. Implementa la aplicación de muestra desde el directorio nodejs-getting-started/6-pubsub:

    gcloud app deploy
    
  3. En el navegador web, ingresa la siguiente dirección. Reemplaza [YOUR_PROJECT_ID] por el ID del proyecto:

    https://[YOUR_PROJECT_ID].appspot.com
    

Si actualizas tu app, podrás implementar la versión actualizada mediante el mismo comando que usaste para implementar la app por primera vez. La implementación nueva crea una versión nueva de la app y la convierte a la versión predeterminada. Se conservan las versiones anteriores de la aplicación. De forma predeterminada, el entorno estándar de App Engine escala a 0 instancias cuando no hay tráfico entrante a una versión, por lo que las versiones sin usar no deben tener costo. Sin embargo, todas estas versiones de la aplicación son recursos facturables.

Consulta la sección Limpiar en el paso final de este instructivo para obtener más información sobre la limpieza de recursos facturables, incluidas las versiones de la aplicación no predeterminadas.

Estructura de la aplicación

El siguiente diagrama muestra los componentes de la aplicación y la manera en que se conectan entre sí.

Estructura de la muestra de Cloud Pub/Sub

La aplicación publica eventos en Cloud Pub/Sub cada vez que se actualiza un libro en la base de datos. El worker, que se ejecuta de manera independiente, escuchará estos eventos. Cuando se recibe el evento, el worker le solicita a la API de Google Libros la información del libro y actualiza el registro del libro en la base de datos. Una vez que se actualice el registro, deberías poder actualizar la página de información del libro para ver la información nueva.

Comprensión del código

En esta sección se explica el código de la aplicación y su funcionamiento.

Publicación de eventos en Cloud Pub/Sub

Los eventos se publican en temas en Cloud Pub/Sub.

function getTopic (cb) {
  pubsub.createTopic(topicName, (err, topic) => {
    // topic already exists.
    if (err && err.code === 6) {
      cb(null, pubsub.topic(topicName));
      return;
    }
    cb(err, topic);
  });
}

La aplicación enviará un evento al tema que contiene el ID del libro que se actualizó. Esto le informará al worker qué libro debe procesar.

function queueBook (bookId) {
  getTopic((err, topic) => {
    if (err) {
      logging.error('Error occurred while getting pubsub topic', err);
      return;
    }

    const data = {
      action: 'processBook',
      bookId: bookId
    };

    const publisher = topic.publisher();
    publisher.publish(Buffer.from(JSON.stringify(data)), (err) => {
      if (err) {
        logging.error('Error occurred while queuing background task', err);
      } else {
        logging.info(`Book ${bookId} queued for background processing`);
      }
    });
  });
}

Cada vez que un libro se crea o actualiza, se llama a la función queueBook desde el modelo. La implementación depende del backend de base de datos que elijas:

Datastore

function update (id, data, queueBook, cb) {
  let key;
  if (id) {
    key = ds.key([kind, parseInt(id, 10)]);
  } else {
    key = ds.key(kind);
  }

  const entity = {
    key: key,
    data: toDatastore(data, ['description'])
  };

  ds.save(
    entity,
    (err) => {
      if (err) {
        cb(err);
        return;
      }
      data.id = entity.key.id;
      if (queueBook) {
        background.queueBook(data.id);
      }
      cb(null, data);
    }
  );
}

Cloud SQL

function create (data, queueBook, cb) {
  connection.query('INSERT INTO `books` SET ?', data, (err, res) => {
    if (err) {
      cb(err);
      return;
    }
    if (queueBook) {
      background.queueBook(res.insertId);
    }
    read(res.insertId, cb);
  });
}
function update (id, data, queueBook, cb) {
  connection.query(
    'UPDATE `books` SET ? WHERE `id` = ?', [data, id], (err) => {
      if (err) {
        cb(err);
        return;
      }
      if (queueBook) {
        background.queueBook(id);
      }
      read(id, cb);
    });
}

La aplicación worker

La aplicación worker es independiente y escucha a los eventos de Pub/Sub en vez de atender a una aplicación web orientada al usuario. Esto divide a la aplicación en dos servicios independientes que se comunican a través de Pub/Sub en lugar de directamente entre sí. Separar los servicios te permite configurar y escalar la cantidad de instancias de trabajador y de frontend de manera independiente.

Escuchar eventos

El servicio de trabajador acepta mensajes de Cloud Pub/Sub en /endpoint:

app.post('/endpoint', jsonParser, (req, res) => {
  if (!req.body || !req.body.message || !req.body.message.data) {
    logging.warn('Bad request');
    return res.sendStatus(400);
  }

  const dataUtf8encoded = Buffer.from(req.body.message.data, 'base64')
    .toString('utf8');
  var content;
  try {
    content = JSON.parse(dataUtf8encoded);
  } catch (ex) {
    logging.warn('Bad request');
    return res.sendStatus(400);
  }

  if (content.action && content.action === 'processBook' && content.bookId) {
    logging.info(`Received request to process book ${content.bookId}`);
    processBook(content.bookId);
  } else {
    logging.warn('Bad request', content);
    return res.sendStatus(400);
  }
});

App Engine distribuye solicitudes (mensajes de Pub/Sub) entre todas las instancias disponibles en el servicio de trabajador con base en su carga de trabajo actual y escala el servicio de forma predeterminada si se produce un aumento en la cantidad de mensajes.

Procesar libros

Para procesar el libro, el worker encuentra el libro según su ID, busca información adicional y luego guarda la información actualizada en la base de datos:

function processBook (bookId, callback) {
  waterfall([
    // Load the current data
    (cb) => {
      model.read(bookId, cb);
    },
    // Find the information from Google
    findBookInfo,
    // Save the updated data
    (updated, cb) => {
      model.update(updated.id, updated, false, cb);
    }
  ], (err) => {
    if (err) {
      logging.error('Error occurred', err);
      if (callback) {
        callback(err);
      }
      return;
    }
    logging.info(`Updated book ${bookId}`);
    bookCount += 1;
    if (callback) {
      callback();
    }
  });
}

La función findBookInfo controla el procesamiento de la nueva información del libro:

function findBookInfo (book, cb) {
  queryBooksApi(book.title, (err, r) => {
    if (!err && !r.items) {
      err = 'Not found';
    }
    if (err) {
      cb(err);
      return;
    }
    const top = r.items[0];

    book.title = top.volumeInfo.title;
    book.author = (top.volumeInfo.authors || []).join(', ');
    book.publishedDate = top.volumeInfo.publishedDate;
    book.description = book.description || top.volumeInfo.description;

    // If there is already an image for the book or if there's no
    // thumbnails, go ahead and return.
    if (book.imageUrl || !top.volumeInfo.imageLinks) {
      cb(null, book);
      return;
    }

    // Otherwise, try to fetch them and upload to cloud storage.
    const imageUrl =
      top.volumeInfo.imageLinks.thumbnail ||
      top.volumeInfo.imageLinks.smallThumbnail;
    const imageName = `${book.id}.jpg`;

    images.downloadAndUploadImage(
      imageUrl, imageName, (err, publicUrl) => {
        if (!err) {
          book.imageUrl = publicUrl;
        }
        cb(null, book);
      });
  });
}

La función findBookInfo llama a queryBooksApi para que obtenga más información desde Google Libros:

function queryBooksApi (query, cb) {
  request(
    `https://www.googleapis.com/books/v1/volumes?country=US&q=${encodeURIComponent(query)}`,
    (err, resp, body) => {
      if (err || resp.statusCode !== 200) {
        console.log(`Error: ${err}`);
        console.log(`Response from Google Books: ${resp}`);
        console.log(`Response body from Google Books: ${body}`);
        cb(err || `Response returned ${resp.statusCode}`);
        return;
      }
      cb(null, JSON.parse(body));
    }
  );
}

La función findBookInfo también intenta descargar cualquier foto de portada disponible y luego la sube a Cloud Storage:

function downloadAndUploadImage (sourceUrl, destFileName, cb) {
  const file = bucket.file(destFileName);

  request
    .get(sourceUrl)
    .on('error', (err) => {
      logging.warn(`Could not fetch image ${sourceUrl}`, err);
      cb(err);
    })
    .pipe(file.createWriteStream({
      resumable: false
    }))
    .on('finish', () => {
      logging.info(`Uploaded image ${destFileName}`);
      file.makePublic(() => {
        cb(null, getPublicUrl(destFileName));
      });
    })
    .on('error', (err) => {
      logging.error('Could not upload image', err);
      cb(err);
    });
}

Ejecución en Google Cloud Platform

Si bien el worker no atiende ninguna solicitud web de usuarios, se recomienda proporcionar verificaciones de estado para todas las aplicaciones que se ejecutan en Google App Engine. Las verificaciones de estado envían una solicitud HTTP GET periódica a /_ah/health para asegurarse de que la instancia de tu aplicación funcione correctamente. Según la configuración predeterminada, cualquier respuesta de estado diferente de HTTP 500, incluida HTTP 404, indica una instancia en buen estado. No fue necesario que definieras explícitamente una verificación de estado en la aplicación de frontend porque respondió a /_ah/health con un código de estado 404.

Para controlar las verificaciones de estado del worker de backend, puedes crear un servidor web simple:

const app = express();

const jsonParser = bodyParser.json();

app.use(logging.requestLogger);

app.get('/_ah/health', (req, res) => {
  res.status(200).send('ok');
});

// Keep count of how many books this worker has processed
let bookCount = 0;

app.get('/', (req, res) => {
  res.status(200).send(`This worker has processed ${bookCount} books.`);
});

app.post('/endpoint', jsonParser, (req, res) => {
  if (!req.body || !req.body.message || !req.body.message.data) {
    logging.warn('Bad request');
    return res.sendStatus(400);
  }

  const dataUtf8encoded = Buffer.from(req.body.message.data, 'base64')
    .toString('utf8');
  var content;
  try {
    content = JSON.parse(dataUtf8encoded);
  } catch (ex) {
    logging.warn('Bad request');
    return res.sendStatus(400);
  }

  if (content.action && content.action === 'processBook' && content.bookId) {
    logging.info(`Received request to process book ${content.bookId}`);
    processBook(content.bookId);
  } else {
    logging.warn('Bad request', content);
    return res.sendStatus(400);
  }
});

app.use(logging.errorLogger);

if (module === require.main) {
  const server = app.listen(config.get('PORT'), () => {
    const port = server.address().port;
    console.log(`App listening on port ${port}`);
  });
}

Ahora, el worker escuchará tanto solicitudes de verificación de estado como eventos de Cloud Pub/Sub.

Además, el worker necesita su propia configuración de módulos.

service: worker
runtime: nodejs8

env_variables:
  SCRIPT: worker.js

Esta configuración es muy similar al archivo app.yaml que se usa para el frontend, pero la diferencia clave es la cláusula service: worker. Las aplicaciones en App Engine pueden tener varios servicios independientes. Significa que puedes implementar, configurar, escalar y actualizar fácilmente partes de tu aplicación de manera independiente.

Limpieza

Sigue estos pasos para evitar que se apliquen cargos a tu cuenta de Google Cloud Platform por los recursos que usaste en este instructivo:

Borra el proyecto

La manera más fácil de eliminar la facturación es borrar el proyecto que creaste para el instructivo.

Para borrar el proyecto, haz lo siguiente:

  1. En la GCP Console, dirígete a la página Proyectos.

    Ir a la página Proyectos

  2. En la lista de proyectos, selecciona el proyecto que deseas borrar y haz clic en Borrar.
  3. En el cuadro de diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrar el proyecto.

Borra las versiones no predeterminadas de tu app

Si no quieres borrar tu proyecto, puedes borrar las versiones no predeterminadas de tu app para reducir los costos.

Para borrar una versión de una app, haz lo siguiente:

  1. En GCP Console, dirígete a la página Versiones de App Engine.

    Ir a la página de Versiones

  2. Haz clic en la casilla de verificación junto a la versión de app no predeterminada que deseas borrar.
  3. Haz clic en el botón Borrar en la parte superior de la página para borrar la versión de la app.

Borra tu instancia de Cloud SQL

Para borrar una instancia de Cloud SQL, haz lo siguiente:

  1. En GCP Console, ve a la página SQL Instances.

    Ir a la página SQL Instances.

  2. Selecciona el nombre de la instancia de SQL que quieres borrar.
  3. Haz clic en el botón Borrar en la parte superior de la página para borrar la instancia.

Borra tu depósito de Cloud Storage

Para borrar un depósito de Cloud Storage, haz lo siguiente:

  1. En la GCP Console, dirígete al navegador de Cloud Storage.

    Ir al navegador de Cloud Storage

  2. Haz clic en la casilla de verificación junto al depósito que deseas borrar.
  3. Haz clic en el botón Borrar en la parte superior del depósito.

¿Qué sigue?

Aprende a ejecutar la app de muestra Bookshelf en Node.js en Compute Engine.

Prueba otras características de Google Cloud Platform tú mismo. Revisa nuestros instructivos.

¿Te sirvió esta página? Envíanos tu opinión:

Enviar comentarios sobre…