pull 태스크 임대

태스크가 pull 큐에 있으면 작업자가 태스크를 임대할 수 있습니다. 작업이 처리되면 작업자는 작업을 삭제해야 합니다.

시작하기 전에

중요 컨텍스트

  • 이 방법은 표준 환경의 서비스 내에서 실행 중인 작업자에만 적용됩니다.
  • pull 큐를 사용하는 경우 처리량에 따라 작업자의 규모를 직접 조정해야 합니다.

태스크 임대

태스크가 큐에 있으면 작업자는 lease_tasks() 메서드를 사용하여 큐에서 태스크를 한 개 이상 임대할 수 있습니다. add()를 사용하여 최근에 추가된 태스크가 lease_tasks()를 통해 제공되기까지 약간의 지연이 있을 수 있습니다.

임대 요청 시 임대할 작업 수(최대 1,000개 작업)와 임대 기간(초 단위, 최대 1주)을 지정합니다. 임대 기간이 만료되기 전에 가장 느린 태스크를 마칠 수 있을 만큼 임대 기간이 길어야 합니다. modify_task_lease()를 사용하여 태스크 임대를 수정할 수 있습니다.

태스크를 임대하면 다른 작업자가 이 태스크를 처리할 수 없으며 임대가 만료될 때까지 계속 사용할 수 없습니다.

lease_tasks() 메서드는 큐에서 임대된 태스크 목록이 포함된 Task 객체를 반환합니다.

다음 코드 샘플에서는 pull-queue 큐의 태스크 100개를 1시간 동안 임대합니다.

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
q.lease_tasks(3600, 100)

태스크 태그 일괄 처리

여러 태스크를 서로 구분하기 위해 코드에서 태스크에 '태그'를 지정한 후 태그를 사용하여 임대할 태스크를 선택할 수 있습니다. 태그는 필터의 역할을 합니다.

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
q.add(taskqueue.Task(payload='parse1', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='parse2', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='render1', method='PULL', tag='render'))
q.add(taskqueue.Task(payload='render2', method='PULL', tag='render'))

q.lease_tasks_by_tag(3600, 100, 'render') # leases render tasks, but not parse

q.lease_tasks_by_tag(3600, 100) # Leases up to 100 tasks that have same tag.

폴링 속도 조절

임대할 태스크의 큐를 폴링하는 작업자는 큐가 공급할 수 있는 것보다 더 빠르게 태스크를 임대하려 하는지 감지해야 합니다. 이러한 오류가 발생하면 lease_tasks()에서 다음과 같은 예외가 발생할 수 있습니다.

  • google.appengine.api.taskqueue.TransientError
  • google.appengine.runtime.apiproxy_errors.DeadlineExceededError


코드는 이러한 예외를 포착하여 lease_tasks() 호출을 보류한 후 나중에 다시 시도해야 합니다. 이 문제를 방지하려면 lease_tasks()를 호출할 때 더 높은 RPC 기한을 설정하는 것이 좋습니다. 또한 임대 요청이 빈 태스크 목록을 반환하는 경우에도 보류해야 합니다.

초당 LeaseTasks 요청을 10개 넘게 생성하면 처음 10개의 요청만 결과를 반환합니다. 요청이 이 한도를 초과하면 결과 없이 OK가 반환됩니다.

Google Cloud 콘솔에서 태스크 모니터링

애플리케이션의 모든 태스크와 큐에 대한 정보를 확인하는 방법은 다음과 같습니다.

  1. Google Cloud 콘솔에서 Cloud Tasks 페이지를 열고 유형 열에서 가져오기 값을 찾습니다.

    Cloud 작업으로 이동

  2. 원하는 큐 이름을 클릭하여 큐 세부정보 페이지를 엽니다. 선택한 큐의 모든 태스크가 표시됩니다.

태스크 삭제

작업자는 태스크를 완료한 후 큐에서 해당 태스크를 삭제해야 합니다. 작업자가 작업을 처리한 후에도 대기열에 작업이 남아 있다면 작업자가 실패했을 가능성이 높습니다. 이러한 경우 다른 작업자가 해당 작업을 처리합니다.

lease_task()에서 반환된 것과 같은 태스크 목록을 delete_tasks()에 전달하여 삭제할 수 있습니다.

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
tasks = q.lease_tasks(3600, 100)
# Perform some work with the tasks here
q.delete_tasks(tasks)

pull 큐의 엔드 투 엔드 예시

Python에서 pull 큐를 사용하는 간단하지만 완전한 엔드 투 엔드 예시는 appengine-pullqueue-counter를 참조하세요.