Python으로 Cloud Pub/Sub 사용

많은 애플리케이션은 웹 요청의 컨텍스트 외부에서 백그라운드 처리를 해야 합니다. 이 샘플에서 Bookshelf 앱은 별도의 백그라운드 작업자에게 실행할 작업을 보냅니다. 작업자는 Google Books API에서 정보를 수집하고 데이터베이스에서 도서 정보를 업데이트합니다. 이 샘플은 Google App Engine에서 별도의 서비스를 설정하는 방법, App Engine 가변형 환경에서 작업자 프로세스를 실행하는 방법, 수명 주기 이벤트를 처리하는 방법을 보여줍니다.

이 페이지는 여러 페이지로 구성된 가이드의 일부입니다. 처음부터 시작하여 설정 안내를 보려면 Python Bookshelf 앱으로 이동하세요.

설정 구성

이 섹션에서는 6-pubsub 디렉토리의 코드를 사용합니다. 이 디렉토리에서 파일을 수정하고 명령어를 실행하세요.

  1. 수정하기 위해 config.py를 엽니다.
  2. PROJECT_ID 값을 GCP 콘솔에 표시되는 프로젝트 ID로 설정합니다.
  3. DATA_BACKEND구조화된 데이터 사용 가이드에서 사용한 값과 같은 값으로 설정합니다.
  4. Cloud SQL 또는 MongoDB를 사용하는 경우, Cloud SQL 또는 Mongo 섹션의 값을 구조화된 데이터 사용 단계에서 사용한 값과 같은 값으로 설정합니다.
  5. CLOUD_STORAGE_BUCKET 값을 Cloud Storage 버킷 이름으로 설정합니다.

  6. OAuth2 configuration 섹션에서 GOOGLE_OAUTH2_CLIENT_IDGOOGLE_OAUTH2_CLIENT_SECRET 값을 이전에 만든 애플리케이션 클라이언트 ID 및 비밀번호로 설정합니다.

  7. config.py를 저장하고 닫습니다.

Cloud SQL을 사용하는 경우 다음을 수행합니다.

  1. 수정하기 위해 app.yaml을 엽니다.
  2. cloud_sql_instances 값을 config.py CLOUDSQL_CONNECTION_NAME에 사용한 값과 같은 값으로 설정합니다. project:region:cloudsql-instance 형식이어야 합니다. 이 전체 줄의 주석 처리를 삭제합니다.
  3. app.yaml을 저장하고 닫습니다.

종속 항목 설치

다음 명령어를 입력하여 가상 환경을 만들고 종속 항목을 설치합니다.

Linux/macOS

virtualenv -p python3 env
source env/bin/activate
pip install -r requirements.txt

Windows

virtualenv -p python3 env
env\scripts\activate
pip install -r requirements.txt

로컬 머신에서 앱 실행:

  1. 작업자를 실행합니다.

    psqworker main.books_queue
    
  2. 같은 6-pubsub 디렉토리에서 다른 터미널을 열고 로컬 웹 서버를 시작합니다.

    source env/bin/activate
    python main.py
    
  3. 웹브라우저에서 다음 주소를 입력합니다.

    http://localhost:8080

이제 잘 알려진 도서를 Bookshelf에 추가합니다. 애플리케이션과 작업자 인스턴스가 모두 로컬에서 실행되는 경우, 작업자가 백그라운드에서 도서 정보를 업데이트하는 것을 볼 수 있습니다.

도서를 추가하기 전에 작업자를 적어도 한 번은 실행해야 합니다. 작업자는 이벤트를 수신 대기할 Cloud Pub/Sub 구독을 설정합니다. 구독을 설정하지 않으면 주제에 게시된 이벤트가 손실되고 Bookshelf 데이터에 대한 변경사항이 표시되지 않습니다. 구독이 설정되면 현재 이벤트를 수신 대기할 작업자가 없어도 이벤트가 대기열에 추가됩니다. 작업자가 온라인 상태가 되면 Cloud Pub/Sub에 의해 대기열에 추가된 이벤트가 전송됩니다.

작업자를 종료한 다음 로컬 웹 서버를 종료하려면 Control+C를 누르세요.

App Engine 가변형 환경에 앱 배포

  1. 작업자를 배포합니다.

    gcloud app deploy worker.yaml
    
  2. 샘플 앱을 배포합니다.

    gcloud app deploy
    
  3. 웹브라우저에서 다음 주소를 입력합니다. [YOUR_PROJECT_ID]를 프로젝트 ID로 바꿉니다.

    https://[YOUR_PROJECT_ID].appspot.com
    

앱을 업데이트하는 경우 앱을 처음 배포할 때 사용한 같은 명령어를 입력하여 업데이트된 버전을 배포할 수 있습니다. 새로 배포하면 앱의 새 버전을 만들고 기본 버전으로 승격합니다. 이전 버전의 앱은 연결된 VM 인스턴스와 마찬가지로 유지됩니다. 이러한 모든 앱 버전과 VM 인스턴스는 청구 가능한 리소스입니다.

기본 이외의 앱 버전을 삭제하여 비용을 줄일 수 있습니다.

앱 버전을 삭제하는 방법은 다음과 같습니다.

  1. GCP Console에서 App Engine 버전 페이지로 이동합니다.

    버전 페이지로 이동

  2. 삭제할 표준이 아닌 앱 버전 옆의 확인란을 클릭합니다.
  3. 페이지 상단의 삭제 버튼을 클릭하여 앱 버전을 삭제합니다.

청구 가능한 리소스 삭제에 대한 자세한 내용은 이 가이드의 마지막 단계에서 삭제를 참조하세요.

애플리케이션 구조

다음 다이어그램은 애플리케이션 구성요소 및 서로 연결되는 방식을 보여줍니다.

Cloud Pub/Sub 샘플 구조

애플리케이션은 도서가 데이터베이스에서 업데이트될 때마다 이벤트를 Cloud Pub/Sub에 게시합니다. 작업을 특정 대기열에 추가하여 게시하고, 별도로 실행되는 작업자가 이러한 이벤트를 수신 대기합니다. 이벤트가 수신되면 작업자가 Google Books API에 도서에 대한 정보를 요청하고 도서 기록을 데이터베이스에 업데이트합니다. 기록이 업데이트되면 도서 정보 페이지를 새로고침하여 새로운 정보를 볼 수 있습니다.

코드 이해하기

여기에서는 대기열을 만들고 작업을 대기열에 추가하며 작업자를 사용하여 작업을 처리하는 방법에 대해 설명합니다. 마지막으로 App Engine 가변형 환경에서 작업자 프로세스를 실행하는 방식을 보여줍니다.

이 샘플은 Google Cloud Pub/Sub를 사용하여 분산된 간단한 작업 대기열을 구현하는 Python 패키지인 psq를 사용합니다. psq는 의도적으로 rq Python 패키지와 유사합니다.

대기열 만들기

작업은 특정 대기열에 추가됩니다. 작업자는 실행할 작업에 이 대기열이 있는지 감시합니다. psq로 대기열을 만드는 방법은 다음과 같습니다.

publisher_client = pubsub.PublisherClient()
subscriber_client = pubsub.SubscriberClient()

def get_books_queue():
    project = current_app.config['PROJECT_ID']

    # Create a queue specifically for processing books and pass in the
    # Flask application context. This ensures that tasks will have access
    # to any extensions / configuration specified to the app, such as
    # models.
    return psq.Queue(
        publisher_client, subscriber_client, project,
        'books', extra_context=current_app.app_context)

celery 또는 rq를 사용하는 경우 구성이 유사합니다.

대기열에 작업 추가

작업은 프런트 엔드 애플리케이션 대신 작업자가 실행하는 Python 함수입니다. Bookshelf 애플리케이션은 도서가 만들어지거나 업데이트될 때마다 도서를 처리하기 위해 작업을 대기열에 추가합니다.

q = tasks.get_books_queue()
q.enqueue(tasks.process_book, book['id'])

rqcelery와 같은 다른 작업 대기열도 대기열 추가 메커니즘이 유사합니다. process_book의 구현에 대해서는 이 항목의 뒷부분에 나오는 도서 처리 섹션에서 다룹니다.

작업자

작업자는 pub/sub 이벤트를 수신 대기하는 별도의 애플리케이션입니다. 이로 인해 애플리케이션이 서로 다른 2개의 서비스로 나눠지며, 두 서비스가 서로 직접 통신하는 것이 아니라 pub/sub를 사용하여 통신하게 됩니다. 서비스를 구분하면 프런트 엔드 인스턴스의 수와 작업자 인스턴스의 수를 별도로 구성하고 확장할 수 있습니다.

작업자 실행

psq 패키지에는 독립형 작업자 psqworker가 포함됩니다. 이 작업자는 다른 작업을 차단하거나 시간이 오래 걸리는 작업을 기본 실행 프로세스에서 수행하지 않도록 백그라운드에서 실행되는 Python 프로세스입니다. 이 작업자는 rqrqworker와 유사합니다.

psqworker main.books_queue

이와 같이 psqworker를 시작하면 main.books_queue에 의해 지정된 대기열에 대한 구독이 자동으로 만들어집니다. 이벤트가 대기열에 작업으로 추가되어 pub/sub에 게시되면 psqworker가 자동으로 작업을 대기열에서 제거하고 실행합니다.

psqworker가 대기열을 가져올 수 있어야 하므로, 대기열을 모듈 수준의 가져오기로 사용할 수 있도록 만들어야 합니다.

# Make the queue available at the top-level, this allows you to run
# `psqworker main.books_queue`. We have to use the app's context because
# it contains all the configuration for plugins.
# If you were using another task queue, such as celery or rq, you can use this
# section to configure your queues to work with Flask.
with app.app_context():
    books_queue = bookshelf.tasks.get_books_queue()

도서 처리

도서를 처리하기 위해 작업은 ID를 기준으로 도서를 가져오고 추가 정보를 찾은 다음 업데이트된 정보를 다시 데이터베이스에 저장합니다.

def process_book(book_id):
    """
    Handles an individual Bookshelf message by looking it up in the
    model, querying the Google Books API, and updating the book in the model
    with the info found in the Books API.
    """

    model = get_model()

    book = model.read(book_id)

    if not book:
        logging.warn("Could not find book with id {}".format(book_id))
        return

    if 'title' not in book:
        logging.warn("Can't process book id {} without a title."
                     .format(book_id))
        return

    logging.info("Looking up book with title {}".format(book[
                                                        'title']))

    new_book_data = query_books_api(book['title'])

    if not new_book_data:
        return

    book['title'] = new_book_data.get('title')
    book['author'] = ', '.join(new_book_data.get('authors', []))
    book['publishedDate'] = new_book_data.get('publishedDate')
    book['description'] = new_book_data.get('description')

    # If the new book data has thumbnail images and there isn't currently a
    # thumbnail for the book, then copy the image to cloud storage and update
    # the book data.
    if not book.get('imageUrl') and 'imageLinks' in new_book_data:
        new_img_src = new_book_data['imageLinks']['smallThumbnail']
        book['imageUrl'] = download_and_upload_image(
            new_img_src,
            "{}.jpg".format(book['title']))

    model.update(book, book_id)

작업은 도서의 ID만 필요합니다. 데이터는 작업자가 작업을 처리하는 동안 업데이트될 수 있으므로 도서 인스턴스 대신 도서의 ID를 제공하는 것이 좋습니다.

또한 작업자는 Flask 애플리케이션 내부에서 실행되지 않아도 bookshelf.get_model()을 호출할 수 있습니다. 대기열이 get_books_queue에서 구성될 때 애플리케이션 컨텍스트가 모든 작업에 제공됩니다.

query_books_api 함수는 Google Books API 호출을 처리하여 도서에 대한 더 많은 정보를 가져옵니다.

def query_books_api(title):
    """
    Queries the Google Books API to find detailed information about the book
    with the given title.
    """
    r = requests.get('https://www.googleapis.com/books/v1/volumes', params={
        'q': title
    })

    try:
        data = r.json()['items'][0]['volumeInfo']
        return data

    except KeyError:
        logging.info("No book found for title {}".format(title))
        return None

    except ValueError:
        logging.info("Unexpected response from books API: {}".format(r))
        return None

def download_and_upload_image(src, dst_filename):
    """
    Downloads an image file and then uploads it to Google Cloud Storage,
    essentially re-hosting the image in GCS. Returns the public URL of the
    image in GCS
    """
    r = requests.get(src)

    if not r.status_code == 200:
        return

    return storage.upload_file(
        r.content,
        dst_filename,
        r.headers.get('content-type', 'image/jpeg'))

Google Cloud Platform에서 실행

작업자는 같은 애플리케이션 내에서 별도의 서비스로 배포됩니다. App Engine 애플리케이션에는 독립적인 서비스를 여러 개 포함할 수 있습니다. 즉, 애플리케이션의 부분을 쉽고 독립적으로 배포, 구성, 확장, 업데이트할 수 있습니다. 프런트 엔드를 기본 서비스에 배포하고 작업자를 작업자 서비스에 배포합니다.

작업자가 사용자에게 웹 요청을 제공하지 않거나 웹 애플리케이션을 실행하지 않더라도, 가변형 환경에서 실행될 때 HTTP 상태 확인을 제공하여 서비스가 실행 중이고 응답하는지 확인하는 것이 좋습니다. 그러나 상태 확인을 사용 중지하는 것도 가능합니다.

상태 확인을 제공하기 위해 작업자는 하나가 아니라 두 개의 프로세스를 시작합니다. 첫 번째 프로세스는 psqworker이고 다른 프로세스는 psqworker 프로세스가 실행 중이고 상태 확인에 응답하는지를 확인하는 간단한 Flask 애플리케이션입니다.

애플리케이션은 Foreman 프로세스 관리자의 Python 포트인 Honcho를 사용하여 여러 프로세스를 관리합니다. 해당 프로세스는 procfile로 구성됩니다.

bookshelf: gunicorn -b 0.0.0.0:$PORT main:app
worker: psqworker --pid /tmp/psq.pid main.books_queue
monitor: python monitor.py /tmp/psq.pid

이제 app.yamlentrypoint는 프로세스를 직접 실행하는 대신 honcho를 사용합니다.

# Instead of using gunicorn directly, we'll use Honcho. Honcho is a python port
# of the Foreman process manager. For the default service, only the
# frontend process is needed.
entrypoint: honcho start -f /app/procfile bookshelf

procfile에는 프런트 엔드 bookshelf 애플리케이션의 항목도 포함됩니다. 기본 서비스(프런트 엔드)와 작업자 서비스가 정확히 동일한 코드베이스를 공유하므로, entrypoint에서 honcho로 전달되는 인수가 시작할 프로세스를 결정합니다. 다음 다이어그램에서는 왼쪽의 단일 서비스 배포와 오른쪽의 다중 서비스 배포를 대조하여 보여줍니다.

Cloud Pub/Sub 배포

bookshelf 프로세스와 worker 프로세스를 모두 실행하는 서비스를 만들 수도 있지만 별도로 관리하고 확장할 수 없게 됩니다.

작업자는 별도의 서비스이므로, 별도의 완전한 YAML 구성 파일이 필요합니다.

service: worker
runtime: python
env: flex

# Instead of using gunicorn directly, we'll use Honcho. Honcho is a python port
# of the Foreman process manager. For the worker service, both the queue worker
# and the monitor process are needed.
entrypoint: honcho start -f /app/procfile worker monitor

runtime_config:
  python_version: 3

beta_settings:
    # If using Cloud SQL, uncomment and set this value to the Cloud SQL
    # connection name, e.g.
    #   "project:region:cloudsql-instance"
    # You must also update the values in config.py.
    #
    # cloud_sql_instances: "your-cloudsql-connection-name"

이 구성은 프런트 엔드에 사용되는 app.yaml 파일과 유사합니다. 주요 차이점은 service: workerentrypoint에서 honcho로 전달되는 인수입니다.

모니터 애플리케이션

모니터 애플리케이션은 psqworker 프로세스를 감시하고 상태 확인을 제공합니다. 이 모니터 애플리케이션은 rqworker, celery 작업자, redis 슬레이브와 같은 가변형 환경에서 대부분의 독립형 서버를 실행하는 데 손쉽게 재사용할 수 있습니다.

import os
import sys

from flask import Flask

# The app checks this file for the PID of the process to monitor.
PID_FILE = None

# Create app to handle health checks and monitor the queue worker. This will
# run alongside the worker, see procfile.
monitor_app = Flask(__name__)

# The health check reads the PID file created by psqworker and checks the proc
# filesystem to see if the worker is running. This same pattern can be used for
# rq and celery.
@monitor_app.route('/_ah/health')
def health():
    if not os.path.exists(PID_FILE):
        return 'Worker pid not found', 503

    with open(PID_FILE, 'r') as pidfile:
        pid = pidfile.read()

    if not os.path.exists('/proc/{}'.format(pid)):
        return 'Worker not running', 503

    return 'healthy', 200

@monitor_app.route('/')
def index():
    return health()

if __name__ == '__main__':
    PID_FILE = sys.argv[1]
    monitor_app.run('0.0.0.0', 8080)

삭제

이 가이드에서 사용한 리소스 비용이 Google Cloud Platform 계정에 청구되지 않도록 하는 방법은 다음과 같습니다.

프로젝트 삭제

비용이 청구되지 않도록 하는 가장 쉬운 방법은 가이드에서 만든 프로젝트를 삭제하는 것입니다.

프로젝트를 삭제하는 방법은 다음과 같습니다.

  1. GCP Console에서 프로젝트 페이지로 이동합니다.

    프로젝트 페이지로 이동

  2. 프로젝트 목록에서 삭제할 프로젝트를 선택하고 삭제를 클릭합니다.
  3. 대화상자에서 프로젝트 ID를 입력한 다음 종료를 클릭하여 프로젝트를 삭제합니다.

기본 이외의 앱 버전 삭제

프로젝트를 삭제하지 않으려면 기본 이외의 앱 버전을 삭제하여 비용을 줄일 수 있습니다.

앱 버전을 삭제하는 방법은 다음과 같습니다.

  1. GCP Console에서 App Engine 버전 페이지로 이동합니다.

    버전 페이지로 이동

  2. 삭제할 표준이 아닌 앱 버전 옆의 확인란을 클릭합니다.
  3. 페이지 상단의 삭제 버튼을 클릭하여 앱 버전을 삭제합니다.

Cloud SQL 인스턴스 삭제

Cloud SQL 인스턴스를 삭제하는 방법은 다음과 같습니다.

  1. GCP Console에서 SQL 인스턴스 페이지로 이동합니다.

    SQL 인스턴스 페이지로 이동

  2. 다음의 이름을 클릭합니다. 삭제할 SQL 인스턴스
  3. 페이지 상단의 삭제 버튼을 클릭하여 인스턴스를 삭제합니다.

Cloud Storage 버킷 삭제

Cloud Storage 버킷을 삭제하는 방법은 다음과 같습니다.

  1. GCP Console에서 Cloud Storage 브라우저로 이동합니다.

    Cloud Storage 브라우저로 이동

  2. 삭제할 버킷 옆의 체크박스를 클릭합니다.
  3. 페이지 상단의 삭제 버튼을 클릭하여 버킷을 삭제합니다.

다음 단계

Compute Engine에서 Python Bookshelf 샘플 실행 방법 알아보기

다른 Google Cloud Platform 기능 직접 사용해 보기. 가이드를 살펴보세요.

이 페이지가 도움이 되었나요? 평가를 부탁드립니다.

다음에 대한 의견 보내기...