태스크가 pull 큐에 있으면 작업자가 태스크를 임대할 수 있습니다. 작업이 처리되면 작업자는 작업을 삭제해야 합니다.
시작하기 전에
중요 컨텍스트
- 이 방법은 표준 환경의 서비스 내에서 실행 중인 작업자에만 적용됩니다.
- pull 큐를 사용하는 경우 처리량에 따라 작업자의 규모를 직접 조정해야 합니다.
태스크 임대
태스크가 큐에 있으면 작업자는 lease_tasks()
메서드를 사용하여 큐에서 태스크를 한 개 이상 임대할 수 있습니다. add()
를 사용하여 최근에 추가된 태스크가 lease_tasks()
를 통해 제공되기까지 약간의 지연이 있을 수 있습니다.
임대 요청 시 임대할 작업 수(최대 1,000개 작업)와 임대 기간(초 단위, 최대 1주)을 지정합니다. 임대 기간이 만료되기 전에 가장 느린 태스크를 마칠 수 있을 만큼 임대 기간이 길어야 합니다. modify_task_lease()
를 사용하여 태스크 임대를 수정할 수 있습니다.
태스크를 임대하면 다른 작업자가 이 태스크를 처리할 수 없으며 임대가 만료될 때까지 계속 사용할 수 없습니다.
lease_tasks()
메서드는 큐에서 임대된 태스크 목록이 포함된 Task 객체를 반환합니다.
pull-queue
큐의 태스크 100개를 1시간 동안 임대합니다.
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
q.lease_tasks(3600, 100)
태스크 태그 일괄 처리
여러 태스크를 서로 구분하기 위해 코드에서 태스크에 '태그'를 지정한 후 태그를 사용하여 임대할 태스크를 선택할 수 있습니다. 태그는 필터의 역할을 합니다.
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
q.add(taskqueue.Task(payload='parse1', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='parse2', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='render1', method='PULL', tag='render'))
q.add(taskqueue.Task(payload='render2', method='PULL', tag='render'))
q.lease_tasks_by_tag(3600, 100, 'render') # leases render tasks, but not parse
q.lease_tasks_by_tag(3600, 100) # Leases up to 100 tasks that have same tag.
폴링 속도 조절
임대할 태스크의 큐를 폴링하는 작업자는 큐가 공급할 수 있는 것보다 더 빠르게 태스크를 임대하려 하는지 감지해야 합니다. 이러한 오류가 발생하면 lease_tasks()
에서 다음과 같은 예외가 발생할 수 있습니다.
google.appengine.api.taskqueue.TransientError
google.appengine.runtime.apiproxy_errors.DeadlineExceededError
코드는 이러한 예외를 포착하여 lease_tasks()
호출을 보류한 후 나중에 다시 시도해야 합니다. 이 문제를 방지하려면 lease_tasks()를 호출할 때 더 높은 RPC 기한을 설정하는 것이 좋습니다. 또한 임대 요청이 빈 태스크 목록을 반환하는 경우에도 보류해야 합니다.
초당 LeaseTasks 요청을 10개 넘게 생성하면 처음 10개의 요청만 결과를 반환합니다. 요청이 이 한도를 초과하면 결과 없이 OK
가 반환됩니다.
Google Cloud 콘솔에서 태스크 모니터링
애플리케이션의 모든 태스크와 큐에 대한 정보를 확인하는 방법은 다음과 같습니다.
Google Cloud 콘솔에서 Cloud Tasks 페이지를 열고 유형 열에서 가져오기 값을 찾습니다.
원하는 큐 이름을 클릭하여 큐 세부정보 페이지를 엽니다. 선택한 큐의 모든 태스크가 표시됩니다.
태스크 삭제
작업자는 태스크를 완료한 후 큐에서 해당 태스크를 삭제해야 합니다. 작업자가 작업을 처리한 후에도 대기열에 작업이 남아 있다면 작업자가 실패했을 가능성이 높습니다. 이러한 경우 다른 작업자가 해당 작업을 처리합니다.
lease_task()
에서 반환된 것과 같은 태스크 목록을 delete_tasks()
에 전달하여 삭제할 수 있습니다.
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
tasks = q.lease_tasks(3600, 100)
# Perform some work with the tasks here
q.delete_tasks(tasks)
pull 큐의 엔드 투 엔드 예시
Python에서 pull 큐를 사용하는 간단하지만 완전한 엔드 투 엔드 예시는 appengine-pullqueue-counter를 참조하세요.