将推送队列迁移到 Cloud Tasks

本页介绍如何将推送队列代码从任务队列迁移到 Cloud Tasks。Cloud Tasks 现在是使用 App Engine 推送队列的首选方式。

概览

使用 Cloud Tasks 时访问的服务与使用任务队列 RPC API 时相同。这意味着您无需重新创建现有的推送队列和推送任务。但是,您必须迁移用于创建推送队列或推送任务或者用于与这两者交互的代码,才能使用 Cloud Tasks API。

您可以使用 Cloud Tasks REST 和 RPC API、Cloud Tasks Python 客户端库、Google Cloud CLI 和 Google Cloud 控制台创建推送队列和推送任务并与之交互。本页提供了使用 gcloud CLI 和 Cloud Tasks Python 客户端库的示例。

在 Cloud Tasks 中,所有队列均作为推送队列运行。在本指南的其余部分和 Cloud Tasks 文档中,“队列”和“推送队列”这两个术语含义相同。类似地,“任务”“推送任务”这两个术语含义也相同。

Cloud Tasks 目前未提供的功能

Cloud Tasks 目前不提供以下功能:

  • 对 Datastore 事务中的任务进行排队
  • 使用延迟任务库而非工作器服务
  • 在多租户应用中处理任务
  • 使用本地开发服务器进行模拟
  • 异步添加任务

价格和配额

将推送队列迁移到 Cloud Tasks 可能会影响应用的价格和配额。

价格

Cloud Tasks 有自己的价格。与任务队列一样,向 App Engine 应用发送带有任务的请求会导致应用产生费用。

配额

Cloud Tasks 配额不同于任务队列的配额。与任务队列一样,从 Cloud Tasks 向 App Engine 应用发送请求可能会影响 App Engine 请求配额

迁移之前

本部分介绍在将推送队列迁移到 Cloud Tasks 之前需要执行的操作。

迁移拉取队列

在使用本指南迁移推送队列之前,请根据迁移拉取队列指南迁移拉取队列。不建议在迁移推送队列后迁移拉取队列,因为必须使用 queue.yaml 文件这一要求可能会导致 Cloud Tasks 出现意外行为。

保护队列配置

一旦开始迁移到 Cloud Tasks,就不建议修改 queue.yaml 文件,因为这样做会导致意外行为。按照以下步骤保护您的队列配置,以免被 queue.yaml 文件修改。

  1. 配置 gcloud CLI,使之在将来的部署中省略 queue.yaml 文件。

    将您的 queue.yaml 文件添加到 .gcloudignore 文件。如需检查是否已有 .gcloudignore 文件,您可以在终端中从应用的顶级目录运行以下命令。如果该文件存在,此命令将输出相应文件名。

    ls -a | grep .gcloudignore

    如需详细了解 .gcloudignore 文件,请参阅 .gcloudignore 参考文档

  2. 限制对您的 queue.yaml 文件的权限。

    遵循保护队列配置指南中所述的最佳做法。

  3. 了解 Cloud Tasks 和 queue.yaml 文件(可选)。

    使用 Cloud Tasks API 管理队列配置时,部署 queue.yaml 文件会覆盖由 Cloud Tasks 设置的配置,这会导致意外行为。如需了解详情,请阅读使用队列管理与使用 queue.yaml 的对比

启用 Cloud Tasks API

如需启用 Cloud Tasks API,请点击 API 库中 Cloud Tasks API 上的启用。如果您看到的是管理按钮,而不是启用按钮,则说明您之前已为项目启用了 Cloud Tasks API,因而无需再次执行此操作。

对应用进行身份验证以访问 Cloud Tasks API

您必须对应用进行身份验证,然后才能访问 Cloud Tasks API。本部分介绍了针对两种不同使用场景的身份验证。

如需在本地开发或测试您的应用,建议您使用服务帐号。如需获取设置服务帐号并将其与应用关联的说明,请参阅手动获取和提供服务帐号凭据

如需在 App Engine 上部署应用,您不需要提供任何新的身份验证。应用默认凭据 (ADC) 会推断 App Engine 应用的身份验证详细信息。

下载 gcloud CLI

下载并安装 gcloud CLI 以便将 gcloud CLI 与 Cloud Tasks API 搭配使用(如果您之前未安装过)。如果您已经安装 gcloud CLI,请从终端运行以下命令。

gcloud components update

导入 Python 客户端库

按照以下步骤将 Cloud Tasks Python 客户端库用于 App Engine 应用:

  1. 创建一个用于存储第三方库的目录,如 lib/

    mkdir lib
  2. 复制必要的库。

    我们建议您使用 pip 需求文件,而不是从命令行逐个安装这些库。如果您还没有 requirements.txt 文件,请在 app.yaml 文件所在的文件夹中创建一个 requirements.txt 文件。添加以下两行:

    google-cloud-tasks
    

    使用带有 -t <directory> 标志的 pip(版本 6 或更高版本)将您在 requirements.txt 文件中指定的 Cloud Tasks 库复制到您在上一步中创建的文件夹。 例如:

    pip install -t lib -r requirements.txt
    
  3. 在您的 app.yaml 文件的 libraries 部分中指定 RPC 和 setuptools 库:

    libraries:
    - name: grpcio
      version: 1.0.0
    - name: setuptools
      version: 36.6.0
    
  4. 使用 pkg_resources 模块来确保您的应用使用正确的 Cloud Tasks Python 客户端库。

    如果您还没有 appengine_config.py 文件,请在 app.yaml 文件所在的文件夹中创建此文件。将以下内容添加到 appengine_config.py 文件:

    # appengine_config.py
    import pkg_resources
    from google.appengine.ext import vendor
    
    # Set path to your libraries folder.
    path = 'lib'
    # Add libraries installed in the path folder.
    vendor.add(path)
    # Add libraries to pkg_resources working set to find the distribution.
    pkg_resources.working_set.add_entry(path)
    

    上方的 appengine_config.py 文件假设当前工作目录是 lib 文件夹所在的位置。在某些情况下,比如单元测试,当前工作目录可能有所不同。为避免出错,您可以使用以下命令,将完整路径明确传递到 lib 文件夹:

    import os
    path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'lib')
    
  5. 在使用 Task Queues API 中的推送队列的任何文件中导入 Cloud Tasks Python 客户端库:

    from google.cloud import tasks
    

    当您成功将创建了创建推送队列或与之交互的所有代码完整迁移至 Cloud Tasks 之后,请移除导入 Task Queues API 的语句,例如 from google.appengine.api import taskqueue

创建和管理队列

本部分介绍如何使用 Cloud Tasks API 创建和管理队列。

使用 Cloud Tasks 时,您不需要使用 queue.yaml 文件来创建或管理队列,而应使用 Cloud Tasks API。不建议同时使用 queue.yaml 文件和 Cloud Tasks API,但这可能是从任务队列迁移到 Cloud Tasks 的必要步骤,具体取决于您的应用。请参阅使用队列管理与使用 queue.yaml 的对比,了解最佳做法。

创建队列

如果您的应用以编程方式创建队列,或者您要通过命令行创建其他队列,请阅读本部分。

在 Cloud Tasks 中,队列名称的格式为 projects/PROJECT_ID/locations/LOCATION_ID/queues/QUEUE_ID。队列名称的 LOCATION_ID 部分对应于 Google Cloud 地区。队列名称的 QUEUE_ID 部分等同于任务队列的队列 name 字段。队列名称是在队列创建过程中根据您指定的项目、地区和 QUEUE_ID 生成的。

通常,队列位置(即地区)必须与应用的地区相同。此规则的两个例外情况适用于使用 europe-west 地区的应用和使用 us-central 地区的应用。在 Cloud Tasks 中,这些地区分别称为 europe-west1us-central1

您可以在创建队列的过程中指定可选的队列配置,但也可以在创建队列后对其进行更新。

您无需重新创建现有队列。请阅读本指南的相关部分,迁移与现有队列交互的代码。

重复使用队列名称

删除队列后,您必须等待 7 天才能在同一项目和位置(即地区)中创建具有相同队列 ID 的队列。

以下示例会使用 Cloud Tasks 创建两个队列。第一个队列的队列 ID 为 queue-blue,并且配置为以 5/s 的速率将所有任务发送到服务 task-module 的版本 v2。第二个队列的队列 ID 为 queue-red,并以 1/s 的速率分配任务。这两者都是在 us-central1 位置中的项目(项目 ID 为 my-project-id)上创建。 这是与任务队列中的创建队列等效的 Cloud Tasks 功能。

gcloud

gcloud CLI 会根据 gcloud CLI 配置推断项目和位置。

gcloud tasks queues create queue-blue \
--max-dispatches-per-second=5 \
--routing-override=service:task-module,version:v2
gcloud tasks queues create queue-red \
--max-dispatches-per-second=1

客户端库

client = tasks.CloudTasksClient()

# TODO(developer): Uncomment these lines and replace with your values.
# project = 'my-project-id'
# location = 'us- central1'
# queue_blue_name = 'queue-blue'
# queue_red_name = 'queue-red'

parent = f"projects/{project}/locations/{location}"

queue_blue = {
    'name': client.queue_path(project, location, queue_blue_name),
    'rate_limits': {
        'max_dispatches_per_second': 5
    },
    'app_engine_routing_override': {
        'version': 'v2',
        'service': 'task-module'
    }
}

queue_red = {
    'name': client.queue_path(project, location, queue_red_name),
    'rate_limits': {
        'max_dispatches_per_second': 1
    }
}

queues = [queue_blue, queue_red]
for queue in queues:
    response = client.create_queue(parent=parent, queue=queue)
    print(response)

如需了解详情,请参阅 Cloud Tasks 参考文档创建 Cloud Tasks 队列

设置队列处理速率

下表列出了任务队列字段与 Cloud Tasks 字段的不同之处。

任务队列字段 Cloud Tasks 字段 说明
rate max_dispatches_per_second 从队列中分派任务的最大速率
max_concurrent_requests max_concurrent_dispatches 可从队列中分派的并发任务数上限
bucket_size max_burst_size

Cloud Tasks 会计算 get-only 属性 max_burst_size,用于根据 max_dispatches_per_second 的值限制队列中任务的处理速度。此字段允许队列具有较高的速率,以便任务在排入队列后很快便能予以处理,但如果短时间内有很多任务排入队列,则仍会限制资源使用率。

对于使用 queue.yaml 文件创建或更新的 App Engine 队列,max_burst_size 最初等于 bucket_size。但是,如果稍后使用任何 Cloud Tasks 接口将队列传递到 update 命令,则无论 max_dispatches_per_second 是否已更新,系统都会根据 max_dispatches_per_second 的值重置 max_burst_size

total_storage_limit 在 Cloud Tasks 中已弃用 Cloud Tasks 目前不支持设置自定义存储限制

您可以在创建队列时设置队列处理速率,也可以在事后更新它。以下示例会使用 Cloud Tasks 为已创建的名为 queue-blue 的队列设置处理速率。如果 queue-blue 是使用 queue.yaml 文件创建或配置的,则以下示例会根据 20max_dispatches_per_second 值重置 max_burst_size。这是与任务队列中的设置队列处理速率等效的 Cloud Tasks 功能。

gcloud

gcloud tasks queues update queue-blue \
--max-dispatches-per-second=20 \
--max-concurrent-dispatches=10

客户端库

client = tasks.CloudTasksClient()

# TODO(developer): Uncomment these lines and replace with your values.
# project = 'my-project-id'
# location = 'us- central1'
# queue = 'queue-blue'

# Get queue object
queue_path = client.queue_path(project, location, queue)
queue = client.get_queue(name=queue_path)

# Update queue object
queue.rate_limits.max_dispatches_per_second = 20
queue.rate_limits.max_concurrent_dispatches = 10

response = client.update_queue(queue=queue)
print(response)

如需了解详情,请参阅定义速率限制

停用和恢复队列

Cloud Tasks 使用“暂停”这一术语的方式与任务队列使用“停用”这一术语的方式相同。暂停某个队列会使该队列中的任务停止执行,直到该队列恢复为止。但是,您可以继续将任务添加到已暂停的队列中。Cloud Tasks 使用“恢复”这一术语的方式与任务队列使用该术语的方式相同。

以下示例会暂停队列 ID 为 queue1 的队列。这是与任务队列中的停用队列等效的 Cloud Tasks 功能。

gcloud

gcloud tasks queues pause queue1

客户端库

client = tasks.CloudTasksClient()

# TODO(developer): Uncomment these lines and replace with your values.
# project = 'my-project-id'
# location = 'us- central1'
# queue = 'queue1'

queue_path = client.queue_path(project, location, queue)
response = client.pause_queue(name=queue_path)

如需了解详情,请参阅 Cloud Tasks 参考文档暂停队列

删除队列

删除队列后,您必须等待 7 天才能创建同名的队列。如果您无法等待 7 天,请考虑清除队列中的所有任务,并重新配置队列。

以下示例会删除队列 ID 为 queue1 的队列。这是与任务队列中的删除队列等效的 Cloud Tasks 功能。

gcloud

gcloud tasks queues delete queue1

客户端库

client = tasks.CloudTasksClient()

# TODO(developer): Uncomment these lines and replace with your values.
# project = 'my-project-id'
# location = 'us- central1'
# queue = 'queue1'

queue_path = client.queue_path(project, location, queue)
response = client.delete_queue(name=queue_path)

如需了解详情,请参阅 Cloud Tasks 参考文档删除队列

创建和管理任务

本部分介绍如何使用 Cloud Tasks API 创建和管理任务。

创建任务

下表列出了任务队列字段与 Cloud Tasks 字段的不同之处。

任务队列字段 Cloud Tasks 字段 说明
Cloud Tasks 中的新功能 app_engine_http_request 创建一个针对 App Engine 服务发出的请求。这些任务称为 App Engine 任务。
method http_method 指定请求方法,例如 POST
url relative_uri 指定任务处理程序。注意最后一个字母的区别:i(统一资源标识符),而不是 l(统一资源定位符)
target app_engine_routing 可选。为 App Engine 任务指定 App Engine serviceversioninstance。如果未设置,则使用默认服务、版本和实例。

以下示例创建一个任务,该任务通过 /update_counter 处理程序路由到名为 worker 的 App Engine 服务。这是与任务队列中的创建任务等效的 Cloud Tasks 功能。

gcloud

gcloud tasks create-app-engine-task --queue=default \
--method=POST --relative-uri=/update_counter --routing=service:worker \
--body-content=10

客户端库

client = tasks.CloudTasksClient()

# TODO(developer): Uncomment these lines and replace with your values.
# project = 'my-project-id'
# location = 'us- central1'
# queue = 'default'
amount = 10

parent = client.queue_path(project, location, queue)

task = {
    'app_engine_http_request': {
        'http_method': tasks.HttpMethod.POST,
        'relative_uri': '/update_counter',
        'app_engine_routing': {
            'service': 'worker'
        },
        'body': str(amount).encode()
    }
}

response = client.create_task(parent=parent, task=task)
eta = response.schedule_time.strftime("%m/%d/%Y, %H:%M:%S")
print('Task {} enqueued, ETA {}.'.format(response.name, eta))

如需了解详情,请阅读 Cloud Tasks 参考文档创建 App Engine 任务

指定目标服务和路由

为 App Engine 任务指定 App Engine 目标服务、版本和实例是可选操作。默认情况下,App Engine 任务会路由到尝试执行该任务时的默认设置服务、版本和实例。

在创建任务期间设置该任务的 app_engine_routing 属性,为您的任务指定不同的 App Engine 服务、版本或实例。

如需将指定队列上的所有任务路由到同一 App Engine 服务、版本和实例,您可以在队列上设置 app_engine_routing_override 属性。

如需了解详情,请参阅 Cloud Tasks 参考文档配置路由

将数据传递给处理程序

与任务队列一样,您可以使用 Cloud Tasks 以两种方式将数据传递给处理程序。您可以在相对 URI 中将数据作为查询参数传递,也可以使用 HTTP 方法 POST 或 PUT 在请求正文中传递数据。

Cloud Tasks 使用“正文”这一术语的方式与任务队列使用“载荷”这一术语的方式相同。在 Cloud Tasks 中,默认正文内容类型是八位字节流而不是纯文本。您可以通过在标头中指定正文内容类型来设置它。

以下示例以两种不同的方式将键传递给处理程序 /update_counter。这是与任务队列中的将数据传递给处理程序等效的 Cloud Tasks 功能。

控制台

gcloud tasks create-app-engine-task --queue=default --method=GET  \
--relative-uri=/update_counter?key=blue --routing=service:worker
gcloud tasks create-app-engine-task --queue=default --method=POST \
--relative-uri=/update_counter --routing=service:worker \
--body-content="{'key': 'blue'}"

客户端库

import json
client = tasks.CloudTasksClient()

# TODO(developer): Uncomment these lines and replace with your values.
# project = 'my-project-id'
# location = 'us- central1'
# queue = 'default'

parent = client.queue_path(project, location, queue)

task1 = {
    'app_engine_http_request': {
        'http_method': tasks.HttpMethod.POST,
        'relative_uri': '/update_counter?key=blue',
        'app_engine_routing': {
            'service': 'worker'
        }
    }
}

task2 = {
    'app_engine_http_request': {
        'http_method': tasks.HttpMethod.POST,
        'relative_uri': '/update_counter',
        'app_engine_routing': {
            'service': 'worker'
        },
        'headers': {
            'Content-Type': 'application/json'
        },
        'body': json.dumps({'key': 'blue'}).encode()
    }
}

response = client.create_task(parent=parent, task=task1)
print(response)
response = client.create_task(parent=parent, task=task2)
print(response)

为任务命名

您可以视需要指定任务名称。如果您未指定任务名称,Cloud Tasks 会通过生成任务 ID 并根据您在任务创建过程中指定的队列推断项目和位置(即地区)来为您构建任务名称。

任务名称的格式为 projects/PROJECT_ID/locations/LOCATION_ID/queues/QUEUE_ID/tasks/TASK_ID。任务名称的 TASK_ID 部分相当于任务队列任务 name 字段。

重复使用任务名称

您必须等待一段时间才能重复使用任务名称。执行此操作之前必须等待的时间长短取决于是否在 Cloud Tasks 或任务队列中创建了分派任务的队列。

对于使用任务队列(包括默认队列)创建的队列上的任务,您必须在原始任务删除或执行后等待大约 9 天。对于使用 Cloud Tasks 创建的队列上的任务,您必须在原始任务删除或执行后等待大约 1 小时。

以下示例会创建一个将 TASK_ID 设置为 first-try 的任务,并将其添加到默认队列。这是与任务队列中的为任务命名等效的 Cloud Tasks 功能。

gcloud

gcloud CLI 通过根据配置推断项目和位置来构造任务名称。

gcloud tasks create-app-engine-task first-try --queue=default \
--method=GET --relative-uri=/url/path

客户端库

使用客户端库时,如果要指定 TASK_ID,则必须指定完整任务名称。项目和位置必须与添加任务的目标队列的项目和位置匹配。

client = tasks.CloudTasksClient()

# TODO(developer): Uncomment these lines and replace with your values.
# project = 'my-project-id'
# location = 'us- central1'
# queue = 'default'
# task_name = 'first-try'

parent = client.queue_path(project, location, queue)

task = {
    'name': client.task_path(project, location, queue, task_name),
    'app_engine_http_request': {
        'http_method': tasks.HttpMethod.GET,
        'relative_uri': '/url/path'
    }
}
response = client.create_task(parent=parent, task=task)
print(response)

重试之前执行失败的任务

您可以在创建队列过程中在队列上设置任务重试配置,也可以通过更新队列来设置。下表列出了任务队列字段和相应的 Cloud Tasks 字段。

任务队列字段 Cloud Tasks 字段
task_retry_limit max_attempts
task_age_limit max_retry_duration
min_backoff_seconds min_backoff
max_backoff_seconds max_backoff
max_doublings max_doublings

任务特定的重试参数

在任务队列中配置的任务特定重试参数可在 Cloud Tasks 中使用,但您无法修改这些参数,也无法在新任务中设置这些参数。如需更改具有任务特定重试参数的任务的重试参数,请使用具有所需重试参数的 Cloud Tasks 队列重新创建该任务。

以下示例演示了各种重试方案:

  • fooqueue 中,从第一次尝试执行算起,任务最多重试七次,最长持续两天。两个限制都达到后,任务将永远失败。
  • barqueue 中,App Engine 会尝试重试任务,线性地增加每次重试之间的时间间隔,直到达到最大退避时间,然后以最大时间间隔无限次重试(因此,请求之间的时间间隔为 10 秒、20 秒、30 秒、…、190 秒、200 秒、200 秒、…)。
  • bazqueue 中,重试时间间隔从 10 秒开始,之后加倍三次,然后线性增加,最后以最大时间间隔无限期重试(因此,请求之间的时间间隔为 10 秒、20 秒、40 秒、80 秒、160 秒、240 秒、300 秒、300 秒、…)。

这是与任务队列中的重试任务等效的 Cloud Tasks 功能。

gcloud

设置指定秒数的选项时,必须在整数后加 s(例如 200s,而非 200)。

gcloud tasks queues create fooqueue \
--max-attempts=7 \
--max-retry-duration=172800s  #2*60*60*24 seconds in 2 days
gcloud tasks queues create barqueue \
--min-backoff=10s \
--max-backoff=200s \
--max-doublings=0
gcloud tasks queues create bazqueue \
--min-backoff=10s \
--max-backoff=300s \
--max-doublings=3

客户端库

from google.protobuf import duration_pb2

client = tasks.CloudTasksClient()

# TODO(developer): Uncomment these lines and replace with your values.
# project = 'my-project-id'
# location = 'us- central1'
# fooqueue = 'fooqueue'
# barqueue = 'barqueue'
# bazqueue = 'bazqueue'

parent = f"projects/{project}/locations/{location}"

max_retry = duration_pb2.Duration()
max_retry.seconds = 2*60*60*24

foo = {
    'name': client.queue_path(project, location, fooqueue),
    'rate_limits': {
        'max_dispatches_per_second': 1
    },
    'retry_config': {
        'max_attempts': 7,
        'max_retry_duration': max_retry
    }
}

min = duration_pb2.Duration()
min.seconds = 10

max = duration_pb2.Duration()
max.seconds = 200

bar = {
    'name': client.queue_path(project, location, barqueue),
    'rate_limits': {
        'max_dispatches_per_second': 1
    },
    'retry_config': {
        'min_backoff': min,
        'max_backoff': max,
        'max_doublings': 0
    }
}

max.seconds = 300
baz = {
    'name': client.queue_path(project, location, bazqueue),
    'rate_limits': {
        'max_dispatches_per_second': 1
    },
    'retry_config': {
        'min_backoff': min,
        'max_backoff': max,
        'max_doublings': 3
    }
}

queues = [foo, bar, baz]
for queue in queues:
    response = client.create_queue(parent=parent, queue=queue)
    print(response)

如需了解详情,请参阅 Cloud Tasks 参考文档设置重试参数

从队列中删除任务

删除任务时,如果任务位于使用 queue.yaml 文件创建的队列中,您必须等待 9 天才能创建具有相同名称的任务;如果任务位于使用 Cloud Tasks 创建的队列中,则必须等待 1 小时。

以下示例会从队列 ID 为 queue1 的队列中删除任务 ID 为 foo 的任务。这是与任务队列中的删除任务等效的 Cloud Tasks 功能。

gcloud

系统会根据 gcloud CLI 默认项目推断任务项目和位置。

gcloud tasks delete foo --queue=queue1

客户端库

client = tasks.CloudTasksClient()

# TODO(developer): Uncomment these lines and replace with your values.
# project = 'my-project-id'
# location = 'us- central1'
# queue = 'queue1'

task_path = client.task_path(project, location, queue, 'foo')
response = client.delete_task(name=task_path)

如需了解详情,请参阅 Cloud Tasks 参考文档从队列中删除任务

清除任务

以下示例会从队列 ID 为 queue1 的队列中清除所有任务。这是与任务队列中的清除任务等效的 Cloud Tasks 功能。

gcloud

系统会根据 gcloud CLI 默认项目推断队列项目和位置。

gcloud tasks queues purge queue1

客户端库

client = tasks.CloudTasksClient()

# TODO(developer): Uncomment these lines and replace with your values.
# project = 'my-project-id'
# location = 'us- central1'
# queue = 'queue1'

queue_path = client.queue_path(project, location, queue)
response = client.purge_queue(name=queue_path)

如需了解详情,请参阅 Cloud Tasks 参考文档从队列中清除所有任务

后续步骤