将推送队列迁移到 Cloud Tasks (Java)

本页介绍如何将推送队列代码从任务队列迁移到 Cloud Tasks。Cloud Tasks 现在是使用 App Engine 推送队列的首选方式。

使用 Cloud Tasks 时访问的服务与使用任务队列 RPC API 时相同。这意味着您无需重新创建现有的推送队列和推送任务。但是,您必须迁移用于创建推送队列或推送任务或者用于与这两者交互的代码,才能使用 Cloud Tasks API。

您可以使用 Cloud Tasks REST 和 RPC API、Cloud Tasks 客户端库、Google Cloud CLI 和 Google Cloud 控制台创建推送队列和推送任务并与之交互。本页提供了使用 gcloud CLI 和 Cloud Tasks 客户端库的示例。

在 Cloud Tasks 中,所有队列均作为推送队列运行。在本指南的其余部分和 Cloud Tasks 文档中,“队列”和“推送队列”这两个术语含义相同。类似地,“任务”“推送任务”这两个术语含义也相同。

Cloud Tasks 不提供的功能

Cloud Tasks 不提供以下功能:

  • 对 Datastore 事务中的任务进行排队
  • 使用延迟任务库而非工作器服务
  • 在多租户应用中处理任务
  • 使用本地开发服务器进行模拟
  • 异步添加任务

价格和配额

将推送队列迁移到 Cloud Tasks 可能会影响应用的价格和配额。

价格

Cloud Tasks 有自己的价格。与任务队列一样,向 App Engine 应用发送带有任务的请求会导致应用产生费用。

配额

Cloud Tasks 配额不同于任务队列的配额。与任务队列一样,从 Cloud Tasks 向 App Engine 应用发送请求可能会影响 App Engine 请求配额

迁移之前

以下部分讨论了在将推送队列迁移到 Cloud Tasks 之前的设置步骤。

迁移拉取队列

在开始之前,请先迁移拉取队列,然后再按照本指南中的说明迁移推送队列。不建议在迁移推送队列后迁移拉取队列,因为必须使用 queue.yaml 文件这一要求可能会导致 Cloud Tasks 出现意外行为。

保护队列配置

一旦开始迁移到 Cloud Tasks,就不建议修改 queue.yaml 文件,因为这样做会导致意外行为。按照以下步骤保护您的队列配置,以免被 queue.yaml 文件修改。

  1. 配置 gcloud CLI,使之在将来的部署中省略 queue.yaml 文件。

    将您的 queue.yaml 文件添加到 .gcloudignore 文件。如需检查是否已有 .gcloudignore 文件,您可以在终端中从应用的顶级目录运行以下命令。如果该文件存在,此命令将输出文件名。

    ls -a | grep .gcloudignore

    如需详细了解 .gcloudignore 文件,请参阅 .gcloudignore 参考文档

  2. 限制对您的 queue.yaml 文件的权限。

    遵循保护队列配置指南中所述的最佳做法。

  3. 了解 Cloud Tasks 和 queue.yaml 文件(可选)。

    使用 Cloud Tasks API 管理队列配置时,部署 queue.yaml 文件会覆盖由 Cloud Tasks 设置的配置,这会导致意外行为。如需了解详情,请阅读使用队列管理与使用 queue.yaml 的对比

启用 Cloud Tasks API

如需启用 Cloud Tasks API,请点击 API 库中 Cloud Tasks API 上的启用。如果您看到的是管理按钮,而不是启用按钮,则说明您之前已为项目启用了 Cloud Tasks API,因而无需再次执行此操作。

对应用进行身份验证以访问 Cloud Tasks API

您必须对应用进行身份验证,然后才能访问 Cloud Tasks API。本部分介绍了针对两种不同使用场景的身份验证。

如需在本地开发或测试您的应用,建议您使用服务账号。如需了解如何设置服务账号并将其与应用关联,请参阅手动获取和提供服务账号凭据

如需在 App Engine 上部署应用,您不需要提供任何新的身份验证。应用默认凭据 (ADC) 会推断 App Engine 应用的身份验证详细信息。

下载 gcloud CLI

下载并安装 gcloud CLI 以便将 gcloud CLI 与 Cloud Tasks API 搭配使用(如果您之前未安装过)。如果您已经安装 gcloud CLI,请从终端运行以下命令。

gcloud components update

导入 Cloud 客户端库

如需将 Cloud Tasks 客户端库与 App Engine 应用搭配使用,请执行以下操作:

  1. pom.xml 文件中指定 Cloud Tasks 客户端库依赖项:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-tasks</artifactId>
      <version>1.3.0</version>
    </dependency>
  2. 导入负责创建任务并将其加入队列的文件中的 Cloud Tasks 客户端库依赖项:

    import com.google.cloud.tasks.v2.AppEngineHttpRequest;
    import com.google.cloud.tasks.v2.CloudTasksClient;
    import com.google.cloud.tasks.v2.HttpMethod;
    import com.google.cloud.tasks.v2.QueueName;
    import com.google.cloud.tasks.v2.Task;

创建和管理队列

本部分介绍如何使用 Cloud Tasks API 创建和管理队列。

使用 Cloud Tasks 时,您不需要使用 queue.yaml 文件来创建或管理队列,而应使用 Cloud Tasks API。不建议同时使用 queue.yaml 文件和 Cloud Tasks API,但这可能是从任务队列迁移到 Cloud Tasks 的必要步骤,具体取决于您的应用。请参阅使用队列管理与使用 queue.yaml 的对比,了解最佳做法。

创建队列

如果您的应用以编程方式创建队列,或者您要通过命令行创建其他队列,请阅读本部分。

在 Cloud Tasks 中,队列名称的格式为 projects/PROJECT_ID/locations/LOCATION_ID/queues/QUEUE_ID。队列名称的 LOCATION_ID 部分对应于 Google Cloud 地区。队列名称的 QUEUE_ID 部分等同于任务队列的队列 name 字段。队列名称是在队列创建过程中根据您指定的项目、地区和 QUEUE_ID 生成的。

通常,队列位置(即区域)必须与应用的区域相同。此规则的两个例外情况适用于使用 europe-west 区域的应用和使用 us-central 区域的应用。在 Cloud Tasks 中,这些地区分别称为 europe-west1us-central1

您可以在创建队列的过程中指定可选的队列配置,但也可以在创建队列后对其进行更新。

您无需重新创建现有队列。请阅读本指南的相关部分,迁移与现有队列交互的代码。

重复使用队列名称

删除队列后,您必须等待 7 天才能在同一项目和位置(即区域)中创建具有相同队列 ID 的队列。

以下示例会使用 Cloud Tasks 创建两个队列。第一个队列的队列 ID 为 queue-blue,并且配置为以 5/s 的速率将所有任务发送到服务 task-module 的版本 v2。第二个队列的队列 ID 为 queue-red,并以 1/s 的速率分配任务。这两者均在 us-central1 位置中的项目(项目 ID 为 your-project-id)上创建。 这是与任务队列中的创建队列等效的 Cloud Tasks 功能。

gcloud

gcloud CLI 会根据 gcloud CLI 配置推断项目和位置。

gcloud tasks queues create queue-blue \
--max-dispatches-per-second=5 \
--routing-override=service:task-module,version:v2
gcloud tasks queues create queue-red \
--max-dispatches-per-second=1

客户端库

import com.google.cloud.tasks.v2.AppEngineRouting;
import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.LocationName;
import com.google.cloud.tasks.v2.Queue;
import com.google.cloud.tasks.v2.QueueName;
import com.google.cloud.tasks.v2.RateLimits;

public class CreateQueue {
  public static void createQueue(
      String projectId, String locationId, String queueBlueName, String queueRedName)
      throws Exception {
    try (CloudTasksClient client = CloudTasksClient.create()) {
      // TODO(developer): Uncomment these lines and replace with your values.
      // String projectId = "your-project-id";
      // String locationId = "us-central1";
      // String queueBlueName = "queue-blue";
      // String queueRedName = "queue-red";

      LocationName parent = LocationName.of(projectId, locationId);

      Queue queueBlue =
          Queue.newBuilder()
              .setName(QueueName.of(projectId, locationId, queueBlueName).toString())
              .setRateLimits(RateLimits.newBuilder().setMaxDispatchesPerSecond(5.0))
              .setAppEngineRoutingOverride(
                  AppEngineRouting.newBuilder().setVersion("v2").setService("task-module"))
              .build();

      Queue queueRed =
          Queue.newBuilder()
              .setName(QueueName.of(projectId, locationId, queueRedName).toString())
              .setRateLimits(RateLimits.newBuilder().setMaxDispatchesPerSecond(1.0))
              .build();

      Queue[] queues = new Queue[] {queueBlue, queueRed};
      for (Queue queue : queues) {
        Queue response = client.createQueue(parent, queue);
        System.out.println(response);
      }
    }
  }
}

如需了解详情,请参阅 Cloud Tasks 参考文档创建 Cloud Tasks 队列

设置队列处理速率

下表列出了任务队列与 Cloud Tasks 中的不同字段。

任务队列字段 Cloud Tasks 字段 说明
rate max_dispatches_per_second 从队列中分派任务的最大速率
max_concurrent_requests max_concurrent_dispatches 可从队列中分派的并发任务数上限
bucket_size max_burst_size

Cloud Tasks 会计算 get-only 属性 max_burst_size,用于根据 max_dispatches_per_second 的值限制队列中任务的处理速度。此字段允许队列具有较高的速率,以便任务在排入队列后很快便能予以处理,但如果短时间内有很多任务排入队列,则仍会限制资源使用率。

对于使用 queue.xml/yaml 文件创建或更新的 App Engine 队列,max_burst_size 最初等于 bucket_size。但是,如果稍后使用任何 Cloud Tasks 接口将队列传递到 update 命令,则无论 max_dispatches_per_second 是否已更新,系统都会根据 max_dispatches_per_second 的值重置 max_burst_size

total_storage_limit 在 Cloud Tasks 中已弃用 Cloud Tasks 不支持设置自定义存储限制

您可以在创建队列时设置队列处理速率,也可以在事后更新它。以下示例会使用 Cloud Tasks 为已创建的名为 queue-blue 的队列设置处理速率。如果 queue-blue 是使用 queue.yaml 文件创建或配置的,则以下示例会根据 max_dispatches_per_second20 重置 max_burst_size。这是与任务队列中的设置队列处理速率等效的 Cloud Tasks 功能。

gcloud

gcloud tasks queues update queue-blue \
--max-dispatches-per-second=20 \
--max-concurrent-dispatches=10

客户端库

import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.LocationName;
import com.google.cloud.tasks.v2.Queue;
import com.google.cloud.tasks.v2.QueueName;
import com.google.cloud.tasks.v2.RateLimits;
import com.google.cloud.tasks.v2.UpdateQueueRequest;

public class UpdateQueue {
  public static void updateQueue(String projectId, String locationId, String queueId)
      throws Exception {
    try (CloudTasksClient client = CloudTasksClient.create()) {
      // TODO(developer): Uncomment these lines and replace with your values.
      // String projectId = "your-project-id";
      // String locationId = "us-central1";
      // String queueId = "queue-blue";

      LocationName parent = LocationName.of(projectId, locationId);

      Queue queueBlue =
          Queue.newBuilder()
              .setName(QueueName.of(projectId, locationId, queueId).toString())
              .setRateLimits(
                  RateLimits.newBuilder()
                      .setMaxDispatchesPerSecond(20.0)
                      .setMaxConcurrentDispatches(10))
              .build();

      UpdateQueueRequest request = UpdateQueueRequest.newBuilder().setQueue(queueBlue).build();

      Queue response = client.updateQueue(request);
      System.out.println(response);
    }
  }
}

如需了解详情,请参阅定义速率限制

停用和恢复队列

Cloud Tasks 使用“暂停”这一术语的方式与任务队列使用“停用”这一术语的方式相同。暂停某个队列会使该队列中的任务停止执行,直到该队列恢复为止。但是,您可以继续将任务添加到已暂停的队列中。Cloud Tasks 使用“恢复”这一术语的方式与任务队列使用该术语的方式相同。

以下示例会暂停队列 ID 为 queueName 的队列。这是与任务队列中的停用队列等效的 Cloud Tasks 功能。

gcloud

gcloud tasks queues pause 

queueName

客户端库

import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.QueueName;

public class PauseQueue {
  public static void pauseQueue(String projectId, String locationId, String queueId)
      throws Exception {
    try (CloudTasksClient client = CloudTasksClient.create()) {
      // TODO(developer): Uncomment these lines and replace with your values.
      // String projectId = "your-project-id";
      // String locationId = "us-central1";
      // String queueId = "foo";

      // Construct the fully qualified queue name.
      String queueName = QueueName.of(projectId, locationId, queueId).toString();

      client.pauseQueue(queueName);
      System.out.println("Queue Paused.");
    }
  }
}

如需了解详情,请参阅 Cloud Tasks 参考文档暂停队列

删除队列

删除队列后,您必须等待 7 天才能创建同名的队列。如果您无法等待 7 天,请考虑清除队列中的所有任务,并重新配置队列。

以下示例会删除队列 ID 为 foo 的队列。这是与任务队列中的删除队列等效的 Cloud Tasks 功能。

gcloud

gcloud tasks queues delete 

foo

客户端库

import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.QueueName;

public class DeleteQueue {
  public static void deleteQueue(String projectId, String locationId, String queueId)
      throws Exception {
    try (CloudTasksClient client = CloudTasksClient.create()) {
      // TODO(developer): Uncomment these lines and replace with your values.
      // String projectId = "your-project-id";
      // String locationId = "us-central1";
      // String queueId = "foo";

      // Construct the fully qualified queue name.
      String queueName = QueueName.of(projectId, locationId, queueId).toString();

      client.deleteQueue(queueName);
      System.out.println("Queue Deleted.");
    }
  }
}

如需了解详情,请参阅 Cloud Tasks 参考文档删除队列

创建和管理任务

本部分介绍如何使用 Cloud Tasks API 创建和管理任务。

创建任务

下表列出了任务队列与 Cloud Tasks 中的不同字段。

任务队列字段 Cloud Tasks 字段 说明
Cloud Tasks 中的新功能 app_engine_http_request 创建一个针对 App Engine 服务发出的请求。这些任务称为 App Engine 任务。
method http_method 指定请求方法;例如 POST
url relative_uri 指定任务处理程序。注意最后一个字母的区别:i(统一资源标识符),而不是 l(统一资源定位符)
target app_engine_routing 可选。为 App Engine 任务指定 App Engine serviceversioninstance。如果未设置,则使用默认服务、版本和实例。

以下示例会创建一个任务,以路由到默认 App Engine 服务上的处理程序 /worker。这是与任务队列中的创建任务等效的 Cloud Tasks 功能。

gcloud

gcloud tasks create-app-engine-task --queue=default \
--method=POST --relative-uri=/worker?key=key

客户端库

import com.google.cloud.tasks.v2.AppEngineHttpRequest;
import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.HttpMethod;
import com.google.cloud.tasks.v2.QueueName;
import com.google.cloud.tasks.v2.Task;
import com.google.protobuf.ByteString;
import java.nio.charset.Charset;

public class CreateTask {
  public static void createTask(String projectId, String locationId, String queueId)
      throws Exception {
    try (CloudTasksClient client = CloudTasksClient.create()) {
      // TODO(developer): Uncomment these lines and replace with your values.
      // String projectId = "your-project-id";
      // String locationId = "us-central1";
      // String queueId = "default";
      String key = "key";

      // Construct the fully qualified queue name.
      String queueName = QueueName.of(projectId, locationId, queueId).toString();

      // Construct the task body.
      Task taskParam =
          Task.newBuilder()
              .setAppEngineHttpRequest(
                  AppEngineHttpRequest.newBuilder()
                      .setRelativeUri("/worker?key=" + key)
                      .setHttpMethod(HttpMethod.GET)
                      .build())
              .build();

      Task taskPayload =
          Task.newBuilder()
              .setAppEngineHttpRequest(
                  AppEngineHttpRequest.newBuilder()
                      .setBody(ByteString.copyFrom(key, Charset.defaultCharset()))
                      .setRelativeUri("/worker")
                      .setHttpMethod(HttpMethod.POST)
                      .build())
              .build();

      // Send create task request.
      Task[] tasks = new Task[] {taskParam, taskPayload};
      for (Task task : tasks) {
        Task response = client.createTask(queueName, task);
        System.out.println(response);
      }
    }
  }
}

如需了解详情,请阅读 Cloud Tasks 参考文档创建 App Engine 任务

指定目标服务和路由

为 App Engine 任务指定 App Engine 目标服务、版本和实例是可选操作。默认情况下,App Engine 任务会路由到尝试执行该任务时的默认设置服务、版本和实例。

在创建任务期间设置该任务的 app_engine_routing 属性,为您的任务指定不同的 App Engine 服务、版本或实例。

如需将指定队列上的所有任务路由到同一 App Engine 服务、版本和实例,您可以在队列上设置 app_engine_routing_override 属性。

如需了解详情,请参阅 Cloud Tasks 参考文档配置路由

将数据传递给处理程序

与任务队列一样,您可以使用 Cloud Tasks 以两种方式将数据传递给处理程序。您可以在相对 URI 中将数据作为查询参数传递,也可以使用 HTTP 方法 POST 或 PUT 在请求正文中传递数据。

Cloud Tasks 使用“正文”这一术语的方式与任务队列使用“载荷”这一术语的方式相同。在 Cloud Tasks 中,默认正文内容类型是八位字节流而不是纯文本。您可以通过在标头中指定正文内容类型来设置它。

以下示例以两种不同的方式将键传递给处理程序 /worker 。这是与任务队列中的将数据传递给处理程序等效的 Cloud Tasks 功能。

控制台

gcloud tasks create-app-engine-task --queue=default --method=GET  \
--relative-uri=

/worker

?key=blue --routing=service:worker
gcloud tasks create-app-engine-task --queue=default --method=POST \
--relative-uri=

/worker

 --routing=service:worker \
--body-content="{'key': 'blue'}"

客户端库

import com.google.cloud.tasks.v2.AppEngineHttpRequest;
import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.HttpMethod;
import com.google.cloud.tasks.v2.QueueName;
import com.google.cloud.tasks.v2.Task;
import com.google.protobuf.ByteString;
import java.nio.charset.Charset;

public class CreateTask {
  public static void createTask(String projectId, String locationId, String queueId)
      throws Exception {
    try (CloudTasksClient client = CloudTasksClient.create()) {
      // TODO(developer): Uncomment these lines and replace with your values.
      // String projectId = "your-project-id";
      // String locationId = "us-central1";
      // String queueId = "default";
      String key = "key";

      // Construct the fully qualified queue name.
      String queueName = QueueName.of(projectId, locationId, queueId).toString();

      // Construct the task body.
      Task taskParam =
          Task.newBuilder()
              .setAppEngineHttpRequest(
                  AppEngineHttpRequest.newBuilder()
                      .setRelativeUri("/worker?key=" + key)
                      .setHttpMethod(HttpMethod.GET)
                      .build())
              .build();

      Task taskPayload =
          Task.newBuilder()
              .setAppEngineHttpRequest(
                  AppEngineHttpRequest.newBuilder()
                      .setBody(ByteString.copyFrom(key, Charset.defaultCharset()))
                      .setRelativeUri("/worker")
                      .setHttpMethod(HttpMethod.POST)
                      .build())
              .build();

      // Send create task request.
      Task[] tasks = new Task[] {taskParam, taskPayload};
      for (Task task : tasks) {
        Task response = client.createTask(queueName, task);
        System.out.println(response);
      }
    }
  }
}

为任务命名

您可以视需要指定任务名称。如果您未指定任务名称,Cloud Tasks 会通过生成任务 ID 并根据您在任务创建期间指定的队列推理项目和位置(即区域)来为您构造任务名称。

任务名称的格式为 projects/PROJECT_ID/locations/LOCATION_ID/queues/QUEUE_ID/tasks/TASK_ID。任务名称的 TASK_ID 部分相当于任务队列任务 name 字段。

重复使用任务名称

您必须等待一段时间才能重复使用任务名称。执行此操作之前必须等待的时间长短取决于是否在 Cloud Tasks 或任务队列中创建了分派任务的队列。

对于使用任务队列(包括默认队列)创建的队列上的任务,您必须在原始任务删除或执行后等待大约 9 天。对于使用 Cloud Tasks 创建的队列上的任务,您必须在原始任务删除或执行后等待大约 1 小时。

以下示例会创建一个将 TASK_ID 设置为 first-try 的任务,并将其添加到默认队列。这是与任务队列中的为任务命名等效的 Cloud Tasks 功能。

gcloud

gcloud CLI 通过根据配置推断项目和位置来构造任务名称。

gcloud tasks create-app-engine-task first-try --queue=default \
--method=GET --relative-uri=

/worker

客户端库

使用客户端库时,如果要指定 TASK_ID,则必须指定完整任务名称。项目和位置必须与添加任务的目标队列的项目和位置匹配。

import com.google.cloud.tasks.v2.AppEngineHttpRequest;
import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.HttpMethod;
import com.google.cloud.tasks.v2.QueueName;
import com.google.cloud.tasks.v2.Task;
import com.google.cloud.tasks.v2.TaskName;

public class CreateTaskWithName {
  public static void createTaskWithName(
      String projectId, String locationId, String queueId, String taskId) throws Exception {
    try (CloudTasksClient client = CloudTasksClient.create()) {
      // TODO(developer): Uncomment these lines and replace with your values.
      // String projectId = "your-project-id";
      // String locationId = "us-central1";
      // String queueId = "default";
      // String taskId = "first-try"

      String queueName = QueueName.of(projectId, locationId, queueId).toString();

      Task.Builder taskBuilder =
          Task.newBuilder()
              .setName(TaskName.of(projectId, locationId, queueId, taskId).toString())
              .setAppEngineHttpRequest(
                  AppEngineHttpRequest.newBuilder()
                      .setRelativeUri("/worker")
                      .setHttpMethod(HttpMethod.GET)
                      .build());

      // Send create task request.
      Task response = client.createTask(queueName, taskBuilder.build());
      System.out.println(response);
    }
  }
}

重试之前执行失败的任务

您可以在创建队列过程中在队列上设置任务重试配置,也可以通过更新队列来设置。下表列出了任务队列字段和相应的 Cloud Tasks 字段。

任务队列字段 Cloud Tasks 字段
task_retry_limit max_attempts
task_age_limit max_retry_duration
min_backoff_seconds min_backoff
max_backoff_seconds max_backoff
max_doublings max_doublings

任务特定的重试参数

在任务队列中配置的任务特定重试参数可在 Cloud Tasks 中使用,但您无法修改这些参数,也无法在新任务中设置这些参数。如需更改具有任务特定重试参数的任务的重试参数,请使用具有所需重试参数的 Cloud Tasks 队列重新创建该任务。

以下示例演示了各种重试方案:

  • fooqueue 中,从第一次尝试执行算起,任务最多重试七次,最长持续两天。两个限制都达到后,任务将永远失败。
  • barqueue 中,App Engine 会尝试重试任务,线性地增加每次重试之间的时间间隔,直到达到最大退避时间,然后以最大时间间隔无限次重试(因此,请求之间的时间间隔为 10 秒、20 秒、30 秒、…、190 秒、200 秒、200 秒、…)。
  • bazqueue 中,重试时间间隔从 10 秒开始,之后加倍三次,然后线性增加,最后以最大时间间隔无限期重试(因此,请求之间的时间间隔为 10 秒、20 秒、40 秒、80 秒、160 秒、240 秒、300 秒、300 秒、…)。

这是与任务队列中的重试任务等效的 Cloud Tasks 功能。

gcloud

设置指定秒数的选项时,必须在整数后加 s(例如 200s,而非 200)。

gcloud tasks queues create fooqueue \
--max-attempts=7 \
--max-retry-duration=172800s  #2*60*60*24 seconds in 2 days
gcloud tasks queues create barqueue \
--min-backoff=10s \
--max-backoff=200s \
--max-doublings=0
gcloud tasks queues create bazqueue \
--min-backoff=10s \
--max-backoff=300s \
--max-doublings=3

客户端库

import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.LocationName;
import com.google.cloud.tasks.v2.Queue;
import com.google.cloud.tasks.v2.QueueName;
import com.google.cloud.tasks.v2.RateLimits;
import com.google.cloud.tasks.v2.RetryConfig;
import com.google.protobuf.Duration;

public class RetryTask {
  public static void retryTask(
      String projectId, String locationId, String fooqueue, String barqueue, String bazqueue)
      throws Exception {
    try (CloudTasksClient client = CloudTasksClient.create()) {
      // TODO(developer): Uncomment these lines and replace with your values.
      // String projectId = "your-project-id";
      // String locationId = "us-central1";
      // String fooqueue = "fooqueue";
      // String barqueue = "barqueue";
      // String bazqueue = "bazqueue";

      LocationName parent = LocationName.of(projectId, locationId);

      Duration retryDuration = Duration.newBuilder().setSeconds(2 * 60 * 60 * 24).build();
      Duration min = Duration.newBuilder().setSeconds(10).build();
      Duration max1 = Duration.newBuilder().setSeconds(200).build();
      Duration max2 = Duration.newBuilder().setSeconds(300).build();

      Queue foo =
          Queue.newBuilder()
              .setName(QueueName.of(projectId, locationId, fooqueue).toString())
              .setRateLimits(RateLimits.newBuilder().setMaxDispatchesPerSecond(1.0))
              .setRetryConfig(
                  RetryConfig.newBuilder().setMaxAttempts(7).setMaxRetryDuration(retryDuration))
              .build();

      Queue bar =
          Queue.newBuilder()
              .setName(QueueName.of(projectId, locationId, barqueue).toString())
              .setRateLimits(RateLimits.newBuilder().setMaxDispatchesPerSecond(1.0))
              .setRetryConfig(
                  RetryConfig.newBuilder()
                      .setMinBackoff(min)
                      .setMaxBackoff(max1)
                      .setMaxDoublings(0))
              .build();

      Queue baz =
          Queue.newBuilder()
              .setName(QueueName.of(projectId, locationId, bazqueue).toString())
              .setRateLimits(RateLimits.newBuilder().setMaxDispatchesPerSecond(1.0))
              .setRetryConfig(
                  RetryConfig.newBuilder()
                      .setMinBackoff(min)
                      .setMaxBackoff(max2)
                      .setMaxDoublings(3))
              .build();

      Queue[] queues = new Queue[] {foo, bar, baz};
      for (Queue queue : queues) {
        Queue response = client.createQueue(parent, queue);
        System.out.println(response);
      }
    }
  }
}

如需了解详情,请参阅 Cloud Tasks 参考文档设置重试参数

从队列中删除任务

删除任务时,如果任务位于使用 queue.yaml 文件创建的队列中,您必须等待 9 天才能创建具有相同名称的任务;如果任务位于使用 Cloud Tasks 创建的队列中,则必须等待 1 小时。

以下示例会从队列 ID 为 queue1 的队列中删除任务 ID 为 foo 的任务。这是与任务队列中的删除任务等效的 Cloud Tasks 功能。

gcloud

系统会根据 gcloud CLI 默认项目推断任务项目和位置。

gcloud tasks delete foo --queue=queue1

客户端库

import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.TaskName;

public class DeleteTask {
  public static void deleteTask(String projectId, String locationId, String queueId, String taskId)
      throws Exception {
    try (CloudTasksClient client = CloudTasksClient.create()) {
      // TODO(developer): Uncomment these lines and replace with your values.
      // String projectId = "your-project-id";
      // String locationId = "us-central1";
      // String queueId = "queue1";
      // String taskId = "foo";

      // Construct the fully qualified queue name.
      String taskName = TaskName.of(projectId, locationId, queueId, taskId).toString();

      client.deleteTask(taskName);
      System.out.println("Task Deleted.");
    }
  }
}

如需了解详情,请参阅 Cloud Tasks 参考文档从队列中删除任务

清除任务

以下示例会从队列 ID 为 foo 的队列中清除所有任务。这是与任务队列中的清除任务等效的 Cloud Tasks 功能。

gcloud

系统会根据 gcloud CLI 默认项目推断队列项目和位置。

gcloud tasks queues purge 

foo

客户端库

import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.QueueName;

public class PurgeQueue {
  public static void purgeQueue(String projectId, String locationId, String queueId)
      throws Exception {
    try (CloudTasksClient client = CloudTasksClient.create()) {
      // TODO(developer): Uncomment these lines and replace with your values.
      // String projectId = "your-project-id";
      // String locationId = "us-central1";
      // String queueId = "foo";

      // Construct the fully qualified queue name.
      String queueName = QueueName.of(projectId, locationId, queueId).toString();

      client.purgeQueue(queueName);
      System.out.println("Queue Purged.");
    }
  }
}

如需了解详情,请参阅 Cloud Tasks 参考文档从队列中清除所有任务

Java 8 Cloud Tasks 示例

以下示例是使用 Cloud Tasks 创建队列并将任务加入队列的基本设置。它假定开发者已通过创建 pom.xml 文件来指定 Cloud Tasks 依赖项(如导入客户端库部分中所述)。这是与任务队列中的 Java 8 任务队列示例等效的 Cloud Tasks 功能。

负责创建任务并将其加入队列的文件会创建一个任务,并使用 Cloud Tasks 客户端库将该任务添加到默认队列中:

import com.google.cloud.tasks.v2.AppEngineHttpRequest;
import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.HttpMethod;
import com.google.cloud.tasks.v2.QueueName;
import com.google.cloud.tasks.v2.Task;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@WebServlet(
    name = "TaskEnqueue",
    description = "Enqueue a task targeted at endpoint '/cloudtasks/worker'",
    urlPatterns = "/cloudtasks/enqueue")
public class Enqueue extends HttpServlet {

  // TODO(developer): Replace these variables before running the sample.
  static final String projectId = "my-project-id";
  static final String locationId = "us-central1";

  // Function creates Cloud Tasks from form submissions.
  protected void doPost(HttpServletRequest request, HttpServletResponse response)
      throws ServletException, IOException {
    String key = request.getParameter("key");

    try (CloudTasksClient client = CloudTasksClient.create()) {
      // Construct the fully qualified queue name.
      String queueName = QueueName.of(projectId, locationId, "default").toString();

      // Construct the task body.
      Task task =
          Task.newBuilder()
              .setAppEngineHttpRequest(
                  AppEngineHttpRequest.newBuilder()
                      .setRelativeUri("/cloudtasks/worker?key=" + key)
                      .setHttpMethod(HttpMethod.POST)
                      .build())
              .build();

      // Add the task to the default queue.
      Task taskResponse = client.createTask(queueName, task);
      System.out.println("Task created: " + taskResponse.getName());
    }

    response.sendRedirect("/");
  }
}

定义工作器的文件会处理以下任务:

import java.io.IOException;
import java.util.logging.Logger;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@WebServlet(
    name = "TaskWorker",
    description = "Endpoint to process Cloud Task requests",
    urlPatterns = "/cloudtasks/worker"
)
public class Worker extends HttpServlet {

  private static final Logger log = Logger.getLogger(Worker.class.getName());

  // Worker function to process POST requests from Cloud Tasks targeted at the
  // '/cloudtasks/worker' endpoint.
  protected void doPost(HttpServletRequest request, HttpServletResponse response)
      throws ServletException, IOException {
    String key = request.getParameter("key");
    log.info("Worker is processing " + key);
  }
}

后续步骤