Usar Cloud Pub/Sub con Python

Muchas aplicaciones necesitan realizar el procesamiento en segundo plano fuera del contexto de una petición web. En esta muestra, la aplicación Bookshelf envía tareas a un trabajador de segundo plano independiente para su ejecución. El trabajador recopila información de la API de Google Books y actualiza la información del libro en la base de datos. En esta muestra, se explica cómo configurar servicios separados en Google App Engine, cómo ejecutar el proceso de un trabajador en el entorno flexible de App Engine y cómo administrar los eventos del ciclo de vida.

Esta página forma parte de un tutorial de varias páginas. Para comenzar desde lo básico y consultar las instrucciones de configuración, ve a la aplicación Bookshelf de Python.

Establecer configuración

En esta sección, se utilizan códigos en el directorio 6-pubsub. Edita los archivos y ejecuta los siguientes comandos en este directorio.

  1. Abre config.py para editarlo.
  2. Define el valor de PROJECT_ID en el ID del proyecto, que puede verse en la consola de GCP.
  3. Usa el mismo valor que utilizaste en el tutorial sobre el uso de datos estructurados para establecer el valor de DATA_BACKEND.
  4. Si utilizas Cloud SQL o MongoDB, usa los mismos valores que empleaste en el paso sobre el uso de datos estructurados para establecer los valores en la sección de Cloud SQL o Mongo.
  5. Utiliza el nombre de segmento de Cloud Storage para establecer el valor CLOUD_STORAGE_BUCKET.

  6. En la sección de OAuth2 configuration, utiliza el ID y secreto de cliente que utilizaste anteriormente para establecer los valores de GOOGLE_OAUTH2_CLIENT_ID y GOOGLE_OAUTH2_CLIENT_SECRET.

  7. Guarda y cierra config.py.

Si usas Cloud SQL:

  1. Abre app.yaml para editarlo.
  2. Usa el mismo valor que usaste para CLOUDSQL_CONNECTION_NAME en config.py para establecer el valor de cloud_sql_instances. Debe tener el formato project:region:cloudsql-instance. Elimina los comentarios de toda esta línea.
  3. Guarda y cierra app.yaml.

Instalar dependencias

Introduce los comandos a continuación para crear un entorno virtual e instalar dependencias:

Linux/Mac OS X

virtualenv -p python3 env
source env/bin/activate
pip install -r requirements.txt

Windows

virtualenv -p python3 env
env\scripts\activate
pip install -r requirements.txt

Ejecutar la aplicación en la máquina local:

  1. Ejecuta el siguiente trabajador:

    psqworker main.books_queue
    
  2. Inicia un servidor web local:

    python main.py
    
  3. Introduce la siguiente dirección en el navegador web:

    http://localhost:8080

Ahora agrega algunos libros conocidos a la estantería. Si tienes la aplicación y la instancia de trabajador en ejecución localmente, puedes ver cómo el trabajador actualiza la información del libro en segundo plano.

Es importante que ejecutes el trabajador al menos una vez antes de agregar libros. El trabajador establecerá una suscripción de Cloud Pub/Sub para escuchar eventos. Sin la suscripción, los eventos publicados en el tema simplemente se perderán y no verás ningún cambio en los datos de la estantería. Una vez que la suscripción existe, los eventos se pondrán en cola, incluso si no hay ningún trabajador escuchando eventos en ese momento. Cuando un trabajador se conecta, Cloud Pub/Sub entregará cualquier evento en cola.

Presiona Control + C para salir del servidor web local.

Desplegar la aplicación en el entorno flexible de App Engine

  1. Despliega el siguiente trabajador:

    gcloud app deploy worker.yaml
    
  2. Despliega la aplicación de muestra:

    gcloud app deploy
    
  3. Introduce la siguiente dirección en el navegador web. Sustituye [YOUR_PROJECT_ID] por el ID del proyecto:

    https://[YOUR_PROJECT_ID].appspot.com
    

Si actualizas la aplicación, puedes desplegar la versión actualizada al introducir el mismo comando que utilizaste para desplegar la aplicación por primera vez. El nuevo despliegue crea una nueva versión de la aplicación y la promociona a la versión predeterminada. Las versiones anteriores de la aplicación se mantienen, al igual que las instancias de VM asociadas. Ten en cuenta que todas estas versiones de aplicaciones e instancias de VM son recursos facturables.

Si eliminas las versiones no predeterminadas de la aplicación, puedes reducir los costes.

Para eliminar una versión de la aplicación, sigue las instrucciones a continuación:

  1. En GCP Console, dirígete a la página Versiones de App Engine.

    Ir a la página de Versiones

  2. Haz clic en la casilla de verificación junto a la versión de app no predeterminada que deseas borrar.
  3. Haz clic en el botón Borrar en la parte superior de la página para borrar la versión de la app.

Si quieres obtener información completa sobre cómo limpiar los recursos facturables, consulta la sección sobre limpiar los recursos en el último paso de este tutorial.

Estructura de la aplicación

En el siguiente diagrama, se muestran los componentes de la aplicación y cómo se conectan entre ellos.

Estructura de muestra de Cloud Pub/Sub

La aplicación publica eventos en Cloud Pub/Sub cada vez que se actualiza un libro en la base de datos. Esta publicación se realiza colocando las tareas en una cola en concreto. El trabajador, que se ejecuta por separado, escucha esos eventos. Cuando se recibe un evento, el trabajador realiza una petición a la API de Google Books para obtener información sobre el libro y actualiza el registro del libro en la base de datos. Tras haber actualizado el registro, puedes actualizar la página de información del libro y ver la información nueva.

Información sobre el código

En esta sección te mostramos cómo crear una cola, agregar tareas a la cola y usar el trabajador para procesar tareas. Por último, demostramos cómo ejecutar el proceso del trabajador en el entorno flexible de App Engine.

En este ejemplo se utiliza psq, un paquete de Python que implementa una cola simple de tareas distribuidas con Google Cloud Pub/Sub. psq es intencionalmente similar al paquete de Pythonrq.

Crear una cola

Las tareas se agregan a una cola en concreto. El trabajador observa qué tareas ejecutar en esta cola. Para crear una cola con psq, sigue estos pasos:

publisher_client = pubsub.PublisherClient()
subscriber_client = pubsub.SubscriberClient()

def get_books_queue():
    project = current_app.config['PROJECT_ID']

    # Create a queue specifically for processing books and pass in the
    # Flask application context. This ensures that tasks will have access
    # to any extensions / configuration specified to the app, such as
    # models.
    return psq.Queue(
        publisher_client, subscriber_client, project,
        'books', extra_context=current_app.app_context)

Si utilizas celery o rq, la configuración es similar.

Colocar las tareas en cola

Una tarea es simplemente una función de Python que el trabajador ejecuta en lugar de la aplicación de interfaz. La aplicación Bookshelf coloca las tareas en cola para procesar un libro cada vez que se crea o actualiza un libro:

q = tasks.get_books_queue()
q.enqueue(tasks.process_book, book['id'])

Otras colas de tareas, como rq y celery, tienen mecanismos de cola similares. Más adelante, en la sección de Procesamiento de libros, analizamos la implementación de process_book.

El trabajador

El trabajador es una aplicación independiente que escucha eventos de Pub/Sub. Esto divide la aplicación en dos servicios independientes que se comunican mediante el uso de Pub/Sub, en lugar de directamente entre ellas. La separación de los servicios te permite configurar y escalar el número de instancias de interfaz y trabajador por separado.

Ejecutar al trabajador

El paquete psq incluye el trabajador independiente psqworker. Este trabajador es un proceso de Python que se ejecuta en segundo plano para realizar tareas excesivamente largas o de bloqueo que no quieras realizar en el proceso principal de ejecución. Esto es similar a rqworker de rq.

psqworker main.books_queue

Si psqworker se inicia de esta manera, se crea automáticamente una suscripción a la cola a la que apunta main.books_queue. Cuando un evento se publica en Pub/Sub al ponerse en cola como una tarea, psqworker automáticamente lo retira de la cola y ejecuta la tarea.

Debido a que psqworker necesita poder importar la cola, la cola debe estar disponible como una importación a nivel de módulo.

# Make the queue available at the top-level, this allows you to run
# `psqworker main.books_queue`. We have to use the app's context because
# it contains all the configuration for plugins.
# If you were using another task queue, such as celery or rq, you can use this
# section to configure your queues to work with Flask.
with app.app_context():
    books_queue = bookshelf.tasks.get_books_queue()

Procesar libros

Para procesar un libro, la tarea utiliza el ID del libro para recuperarlo, busca más información y luego guarda la información actualizada en la base de datos:

def process_book(book_id):
    """
    Handles an individual Bookshelf message by looking it up in the
    model, querying the Google Books API, and updating the book in the model
    with the info found in the Books API.
    """

    model = get_model()

    book = model.read(book_id)

    if not book:
        logging.warn("Could not find book with id {}".format(book_id))
        return

    if 'title' not in book:
        logging.warn("Can't process book id {} without a title."
                     .format(book_id))
        return

    logging.info("Looking up book with title {}".format(book[
                                                        'title']))

    new_book_data = query_books_api(book['title'])

    if not new_book_data:
        return

    book['title'] = new_book_data.get('title')
    book['author'] = ', '.join(new_book_data.get('authors', []))
    book['publishedDate'] = new_book_data.get('publishedDate')
    book['description'] = new_book_data.get('description')

    # If the new book data has thumbnail images and there isn't currently a
    # thumbnail for the book, then copy the image to cloud storage and update
    # the book data.
    if not book.get('imageUrl') and 'imageLinks' in new_book_data:
        new_img_src = new_book_data['imageLinks']['smallThumbnail']
        book['imageUrl'] = download_and_upload_image(
            new_img_src,
            "{}.jpg".format(book['title']))

    model.update(book, book_id)

Ten en cuenta que la tarea solo necesita el ID del libro. Es mejor proporcionar el ID del libro en lugar de la instancia del libro, ya que los datos podrían estar obsoletos para cuando el trabajador procese la tarea.

Además, ten en cuenta que el trabajador puede llamar a bookshelf.get_model() a pesar de que no se ejecuta en una aplicación Flask. Cuando la cola está configurada en get_books_queue, el contexto de la aplicación queda disponible para todas las tareas.

La función query_books_api se encarga de llamar a la API de Google Books para obtener más información sobre el libro.

def query_books_api(title):
    """
    Queries the Google Books API to find detailed information about the book
    with the given title.
    """
    r = requests.get('https://www.googleapis.com/books/v1/volumes', params={
        'q': title
    })

    try:
        data = r.json()['items'][0]['volumeInfo']
        return data

    except KeyError:
        logging.info("No book found for title {}".format(title))
        return None

    except ValueError:
        logging.info("Unexpected response from books API: {}".format(r))
        return None

def download_and_upload_image(src, dst_filename):
    """
    Downloads an image file and then uploads it to Google Cloud Storage,
    essentially re-hosting the image in GCS. Returns the public URL of the
    image in GCS
    """
    r = requests.get(src)

    if not r.status_code == 200:
        return

    return storage.upload_file(
        r.content,
        dst_filename,
        r.headers.get('content-type', 'image/jpeg'))

Ejecutar en Google Cloud Platform

El trabajador se despliega como un servicio independiente en la misma aplicación. Las aplicaciones de App Engine pueden tener varios servicios independientes. Esto significa que puedes desplegar, configurar, escalar y actualizar de manera fácil e independiente las partes de su aplicación. La interfaz se despliega en el servicio predeterminado y el trabajador se despliega en el servicio del trabajador.

Aunque el trabajador no publica ninguna petición web a los usuarios, y ni siquiera ejecuta una aplicación web, es muy recomendable que proporciones una comprobación de estado HTTP cuando se ejecute en un entorno flexible para garantizar la ejecución y respuesta del servicio. Sin embargo, se puede inhabilitar la comprobación de estado.

Para proporcionar una comprobación de estado, el trabajador inicia dos procesos en lugar de uno. El primer proceso es psqworker y el otro es una aplicación simple de Flask que garantiza la ejecución y respuesta del proceso psqworker a las comprobaciones de estado.

La aplicación usa Honcho, un puerto de Python del administrador de procesos de Foreman, para administrar varios procesos. Los procesos se configuran con un procfile:

bookshelf: gunicorn -b 0.0.0.0:$PORT main:app
worker: psqworker --pid /tmp/psq.pid main.books_queue
monitor: python monitor.py /tmp/psq.pid

El entrypoint en app.yaml ahora usa honcho en lugar de ejecutar procesos directamente:

# Instead of using gunicorn directly, we'll use Honcho. Honcho is a python port
# of the Foreman process manager. For the default service, only the
# frontend process is needed.
entrypoint: honcho start -f /app/procfile bookshelf

Ten en cuenta que procfile también contiene una entrada para la aplicación bookshelf de interfaz. Debido a que los servicios predeterminado (interfaz) y de trabajador comparten exactamente la misma base de código, los argumentos para honcho en entrypoint controlan qué procesos se inician. En el siguiente diagrama, se contrasta el despliegue del servicio único a la izquierda con el despliegue de varios servicios a la derecha:

Despliegue de Cloud Pub/Sub

Podrías crear un servicio que ejecute los procesos bookshelf y worker, pero perderías la capacidad de administrarlos y escalarlos de forma independiente.

El trabajador es un servicio separado, por lo que necesita su propio archivo de configuración de YAML individual y completo.

service: worker
runtime: python
env: flex

# Instead of using gunicorn directly, we'll use Honcho. Honcho is a python port
# of the Foreman process manager. For the worker service, both the queue worker
# and the monitor process are needed.
entrypoint: honcho start -f /app/procfile worker monitor

runtime_config:
  python_version: 3

beta_settings:
    # If using Cloud SQL, uncomment and set this value to the Cloud SQL
    # connection name, e.g.
    #   "project:region:cloudsql-instance"
    # You must also update the values in config.py.
    #
    # cloud_sql_instances: "your-cloudsql-connection-name"

Esta configuración es similar al archivo app.yaml que se utiliza para la interfaz. Las diferencias principales son service: worker y los argumentos que se transfirieron a honcho en entrypoint.

La aplicación de supervisión

La aplicación de supervisión observa el proceso psqworker y proporciona comprobaciones de estado. Esta aplicación de supervisión puede ser reutilizarse fácilmente para ejecutar prácticamente cualquier servidor independiente en el entorno flexible, tal como un rqworker, un trabajador celery o incluso un redis secundario.

import os
import sys

from flask import Flask

# The app checks this file for the PID of the process to monitor.
PID_FILE = None

# Create app to handle health checks and monitor the queue worker. This will
# run alongside the worker, see procfile.
monitor_app = Flask(__name__)

# The health check reads the PID file created by psqworker and checks the proc
# filesystem to see if the worker is running. This same pattern can be used for
# rq and celery.
@monitor_app.route('/_ah/health')
def health():
    if not os.path.exists(PID_FILE):
        return 'Worker pid not found', 503

    with open(PID_FILE, 'r') as pidfile:
        pid = pidfile.read()

    if not os.path.exists('/proc/{}'.format(pid)):
        return 'Worker not running', 503

    return 'healthy', 200

@monitor_app.route('/')
def index():
    return health()

if __name__ == '__main__':
    PID_FILE = sys.argv[1]
    monitor_app.run('0.0.0.0', 8080)

Limpiar los recursos

Para evitar que los recursos utilizados en este tutorial se cobren en tu cuenta de Google Cloud Platform, sigue estas instrucciones:

Eliminar el proyecto

La forma más fácil de eliminar la facturación es eliminar el proyecto que has creado para el tutorial.

Para eliminar el proyecto, sigue las instrucciones a continuación:

  1. En la GCP Console, dirígete a la página Proyectos.

    Ir a la página Proyectos

  2. En la lista de proyectos, selecciona el proyecto que deseas borrar y haz clic en Borrar.
  3. En el cuadro de diálogo, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrar el proyecto.

Eliminar versiones no predeterminadas de la aplicación

Si no quieres eliminar el proyecto, puedes reducir los costes mediante la eliminación de versiones no predeterminadas de la aplicación.

Para eliminar una versión de la aplicación, sigue las instrucciones a continuación:

  1. En GCP Console, dirígete a la página Versiones de App Engine.

    Ir a la página de Versiones

  2. Haz clic en la casilla de verificación junto a la versión de app no predeterminada que deseas borrar.
  3. Haz clic en el botón Borrar en la parte superior de la página para borrar la versión de la app.

Eliminar la instancia de Cloud SQL

Para eliminar una instancia de Cloud SQL, sigue las instrucciones a continuación:

  1. En GCP Console, ve a la página SQL Instances.

    Ir a la página SQL Instances.

  2. Selecciona el nombre de la instancia de SQL que quieres borrar.
  3. Haz clic en el botón Borrar en la parte superior de la página para borrar la instancia.

Eliminar el segmento de Cloud Storage

Para eliminar un segmento de Cloud Storage, sigue las instrucciones a continuación:

  1. En la GCP Console, dirígete al navegador de Cloud Storage.

    Ir al navegador de Cloud Storage

  2. Haz clic en la casilla de verificación junto al depósito que deseas borrar.
  3. Haz clic en el botón Borrar en la parte superior del depósito.

Siguientes pasos

Obtén información sobre cómo ejecutar la muestra de Bookshelf para Python en Compute Engine.

Prueba otras funciones de Google Cloud Platform por tu cuenta. Echa un vistazo a nuestros tutoriales.

¿Te ha resultado útil esta página? Enviar comentarios:

Enviar comentarios sobre...