將發送佇列遷移至 Cloud Tasks (Java)

本頁說明如何將發送佇列程式碼從工作佇列遷移至 Cloud Tasks。現在使用 Cloud Tasks 處理 App Engine 推送佇列是較好的做法。

有了 Cloud Tasks,您就能使用透過 Task Queues RPC API 存取的服務。也就是說,您不必重新建立現有的推送佇列和推送工作。不過,您必須遷移建立或與推送佇列/工作互動的程式碼,才能使用 Cloud Tasks API。

您可以使用 Cloud Tasks REST 和 RPC API、Cloud Tasks 用戶端程式庫、Google Cloud CLI 和 Google Cloud 控制台,建立及與推送佇列和推送工作互動。本頁面提供使用 gcloud CLI 和 Cloud Tasks 用戶端程式庫的範例。

在 Cloud Tasks 中,所有佇列都是推送佇列。在本指南的其餘部分和 Cloud Tasks 說明文件中,「佇列」一詞等同於「推送佇列」。同樣地,字詞「工作」等同於字詞「推送工作」

Cloud Tasks 不支援的功能

Cloud Tasks 不支援下列功能:

  • 在 Datastore 交易中將工作排入佇列
  • 使用延後工作程式庫,而非工作站服務
  • 在多用戶群應用程式中處理工作
  • 使用本機開發伺服器模擬
  • 非同步新增工作

定價與配額

將發送佇列遷移至 Cloud Tasks 可能會影響應用程式的價格和配額。

定價

Cloud Tasks 有專屬的定價。與 Task Queues 相同,使用工作向 App Engine 應用程式傳送要求時,應用程式可能會產生費用。

配額

Cloud Tasks 配額與工作佇列配額不同。與工作佇列一樣,從 Cloud Tasks 將要求傳送至 App Engine 應用程式,可能會影響 App Engine 的要求配額

遷移前

以下各節將說明將推送佇列遷移至 Cloud Tasks 前的設定步驟。

遷移提取佇列

開始之前,請先遷移提取佇列,再按照本指南中的說明遷移推送佇列。不建議在遷移發送佇列後遷移提取佇列,因為必須使用 queue.yaml 檔案,可能會導致 Cloud Tasks 發生非預期行為。

保護佇列設定

開始遷移至 Cloud Tasks 後,修改 queue.yaml 檔案可能會導致非預期的行為,因此不建議這麼做。請按照下列步驟操作,防止 queue.yaml 檔案修改佇列設定。

  1. 設定 gcloud CLI,在日後的部署作業中省略 queue.yaml 檔案。

    queue.yaml 檔案新增至 .gcloudignore 檔案。如要檢查是否已有 .gcloudignore 檔案,請在應用程式的頂層目錄中,於終端機執行下列指令。如果檔案存在,這項指令會輸出檔案名稱。

    ls -a | grep .gcloudignore

    如要進一步瞭解 .gcloudignore 檔案,請參閱.gcloudignore參考資料

  2. 限制 queue.yaml 檔案的權限。

    請按照「確保佇列設定安全無虞」指南中說明的最佳做法操作。

  3. 瞭解 Cloud Tasks 和 queue.yaml 檔案 (選用)。

    使用 Cloud Tasks API 管理佇列設定時,部署 queue.yaml 檔案會覆寫 Cloud Tasks 設定的設定,可能導致非預期行為。如要瞭解詳情,請參閱「使用佇列管理功能和 queue.yaml 的差異」。

啟用 Cloud Tasks API

如要啟用 Cloud Tasks API,請在 API 程式庫中,點按 Cloud Tasks API 上的「啟用」。如果看到「管理」按鈕,而非「啟用」按鈕,表示您先前已為專案啟用 Cloud Tasks API,因此不需要再次啟用。

向 Cloud Tasks API 驗證應用程式

您必須向 Cloud Tasks API 驗證應用程式。本節將討論兩種不同用途的驗證。

如要在本機開發或測試應用程式,建議使用服務帳戶。如需設定服務帳戶並連結至應用程式的操作說明,請參閱手動取得及提供服務帳戶憑證

如要在 App Engine 上部署應用程式,您不需要提供任何新的驗證資訊。應用程式預設憑證 (ADC) 會推斷 App Engine 應用程式的驗證詳細資料。

下載 gcloud CLI

如要搭配使用 gcloud CLI 與 Cloud Tasks API,請下載並安裝 gcloud CLI (如果您先前未安裝)。如果已安裝 gcloud CLI,請從終端機執行下列指令。

gcloud components update

匯入 Cloud 用戶端程式庫

如要搭配 App Engine 應用程式使用 Cloud Tasks 用戶端程式庫,請按照下列步驟操作:

  1. pom.xml 檔案中指定 Cloud Tasks 用戶端程式庫依附元件:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-tasks</artifactId>
      <version>1.3.0</version>
    </dependency>
  2. 在負責建立及將工作加入佇列的檔案中,匯入 Cloud Tasks 用戶端程式庫依附元件:

    import com.google.cloud.tasks.v2.AppEngineHttpRequest;
    import com.google.cloud.tasks.v2.CloudTasksClient;
    import com.google.cloud.tasks.v2.HttpMethod;
    import com.google.cloud.tasks.v2.QueueName;
    import com.google.cloud.tasks.v2.Task;

建立及管理佇列

本節說明如何使用 Cloud Tasks API 建立及管理佇列。

使用 Cloud Tasks 時,您不需要使用 queue.yaml 檔案建立或管理佇列。請改用 Cloud Tasks API。不建議同時使用 queue.yaml 檔案和 Cloud Tasks API,但視應用程式而定,這可能是從工作佇列遷移至 Cloud Tasks 時無可避免的步驟。請參閱「使用佇列管理與 queue.yaml」一文,瞭解最佳做法。

建立佇列

如果應用程式是以程式輔助方式建立佇列,或是您想透過指令列建立其他佇列,請參閱本節內容。

在 Cloud Tasks 中,佇列名稱的格式為 projects/PROJECT_ID/locations/LOCATION_ID/queues/QUEUE_ID。佇列名稱的 LOCATION_ID 部分對應至 Google Cloud 區域。佇列名稱的 QUEUE_ID 部分等同於「工作佇列」佇列的「name」欄位。佇列名稱是在建立佇列時產生,會根據專案、區域和您指定的 QUEUE_ID 而定。

一般來說,佇列位置 (即區域) 必須與應用程式的區域相同。但使用 europe-west 區域和 us-central 區域的應用程式是例外。在 Cloud Tasks 中,這些區域分別稱為 europe-west1us-central1

您可以在建立佇列時指定選用的佇列設定,但也可以在建立佇列後更新佇列來指定。

您不必重新建立現有佇列。請改為閱讀本指南的相關部分,遷移與現有佇列互動的程式碼。

重複使用佇列名稱

刪除佇列後,您必須等待 7 天,才能在相同專案和位置 (即區域) 中,建立具有相同佇列 ID 的佇列。

下列範例會使用 Cloud Tasks 建立兩個佇列。第一個佇列的佇列 ID 為 queue-blue,並設定為以 5/s 的速率將所有工作傳送至服務 task-modulev2 版本。第二個佇列的佇列 ID 為 queue-red,調度工作的速率為 1/s。兩者都是在專案中建立,專案 ID 為 your-project-id,位置為 us-central1。這相當於在 Task Queues 中建立佇列的 Cloud Tasks。

gcloud

gcloud CLI 會從 gcloud CLI 設定推斷專案和位置。

gcloud tasks queues create queue-blue \
--max-dispatches-per-second=5 \
--routing-override=service:task-module,version:v2
gcloud tasks queues create queue-red \
--max-dispatches-per-second=1

用戶端程式庫

import com.google.cloud.tasks.v2.AppEngineRouting;
import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.LocationName;
import com.google.cloud.tasks.v2.Queue;
import com.google.cloud.tasks.v2.QueueName;
import com.google.cloud.tasks.v2.RateLimits;

public class CreateQueue {
  public static void createQueue(
      String projectId, String locationId, String queueBlueName, String queueRedName)
      throws Exception {
    try (CloudTasksClient client = CloudTasksClient.create()) {
      // TODO(developer): Uncomment these lines and replace with your values.
      // String projectId = "your-project-id";
      // String locationId = "us-central1";
      // String queueBlueName = "queue-blue";
      // String queueRedName = "queue-red";

      LocationName parent = LocationName.of(projectId, locationId);

      Queue queueBlue =
          Queue.newBuilder()
              .setName(QueueName.of(projectId, locationId, queueBlueName).toString())
              .setRateLimits(RateLimits.newBuilder().setMaxDispatchesPerSecond(5.0))
              .setAppEngineRoutingOverride(
                  AppEngineRouting.newBuilder().setVersion("v2").setService("task-module"))
              .build();

      Queue queueRed =
          Queue.newBuilder()
              .setName(QueueName.of(projectId, locationId, queueRedName).toString())
              .setRateLimits(RateLimits.newBuilder().setMaxDispatchesPerSecond(1.0))
              .build();

      Queue[] queues = new Queue[] {queueBlue, queueRed};
      for (Queue queue : queues) {
        Queue response = client.createQueue(parent, queue);
        System.out.println(response);
      }
    }
  }
}

詳情請參閱 Cloud Tasks 參考資料「建立 Cloud Tasks 佇列」。

設定佇列處理速率

下表列出 Task Queues 與 Cloud Tasks 的差異欄位。

「工作佇列」欄位 Cloud Tasks 欄位 說明
rate max_dispatches_per_second 從佇列調度工作的最大速率
max_concurrent_requests max_concurrent_dispatches 可從佇列調度的並行工作數上限
bucket_size max_burst_size

Cloud Tasks 會根據 max_dispatches_per_second 的值,計算唯讀屬性 max_burst_size,限制佇列中工作的處理速度。此欄位可讓佇列享有高速率,能讓工作在排入佇列後立即開始處理,但短時間內若有許多工作排入佇列,也足以限制資源的使用量。

如果是使用 queue.xml/yaml 檔案建立或更新的 App Engine 佇列,max_burst_size 最初會等於 bucket_size。不過,如果稍後使用任何 Cloud Tasks 介面將佇列傳遞至 update 指令,系統會根據 max_dispatches_per_second 的值重設 max_burst_size,無論 max_dispatches_per_second 是否更新。

total_storage_limit Cloud Tasks 中已淘汰 Cloud Tasks 不支援設定自訂儲存空間限制

您可以在建立佇列時設定佇列處理速率,或在之後更新。以下範例使用 Cloud Tasks,在名為 queue-blue 的現有佇列中設定處理速率。如果 queue-blue 是使用 queue.yaml 檔案建立或設定,下列範例會根據 20max_dispatches_per_second 值重設 max_burst_size。這相當於在 Task Queues 中設定佇列處理速率

gcloud

gcloud tasks queues update queue-blue \
--max-dispatches-per-second=20 \
--max-concurrent-dispatches=10

用戶端程式庫

import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.LocationName;
import com.google.cloud.tasks.v2.Queue;
import com.google.cloud.tasks.v2.QueueName;
import com.google.cloud.tasks.v2.RateLimits;
import com.google.cloud.tasks.v2.UpdateQueueRequest;

public class UpdateQueue {
  public static void updateQueue(String projectId, String locationId, String queueId)
      throws Exception {
    try (CloudTasksClient client = CloudTasksClient.create()) {
      // TODO(developer): Uncomment these lines and replace with your values.
      // String projectId = "your-project-id";
      // String locationId = "us-central1";
      // String queueId = "queue-blue";

      LocationName parent = LocationName.of(projectId, locationId);

      Queue queueBlue =
          Queue.newBuilder()
              .setName(QueueName.of(projectId, locationId, queueId).toString())
              .setRateLimits(
                  RateLimits.newBuilder()
                      .setMaxDispatchesPerSecond(20.0)
                      .setMaxConcurrentDispatches(10))
              .build();

      UpdateQueueRequest request = UpdateQueueRequest.newBuilder().setQueue(queueBlue).build();

      Queue response = client.updateQueue(request);
      System.out.println(response);
    }
  }
}

詳情請參閱「定義速率限制」。

停用及繼續處理佇列

Cloud Tasks 使用「暫停」一詞,與 Task Queues 使用「停用」一詞的方式相同。暫停佇列後,佇列中的工作就會停止執行,直到佇列恢復運作為止。不過,您仍可繼續將工作新增至已暫停的佇列。Cloud Tasks 使用「繼續」一詞的方式與 Task Queues 相同。

以下範例會暫停佇列 ID 為 queueName 的佇列。這項功能相當於在 Task Queues 中停用佇列

gcloud

gcloud tasks queues pause 

queueName

用戶端程式庫

import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.QueueName;

public class PauseQueue {
  public static void pauseQueue(String projectId, String locationId, String queueId)
      throws Exception {
    try (CloudTasksClient client = CloudTasksClient.create()) {
      // TODO(developer): Uncomment these lines and replace with your values.
      // String projectId = "your-project-id";
      // String locationId = "us-central1";
      // String queueId = "foo";

      // Construct the fully qualified queue name.
      String queueName = QueueName.of(projectId, locationId, queueId).toString();

      client.pauseQueue(queueName);
      System.out.println("Queue Paused.");
    }
  }
}

詳情請參閱 Cloud Tasks 參考資料中的暫停佇列

刪除佇列

刪除佇列後,必須等待 7 天才能建立名稱相同的佇列。如果無法等待 7 天,請考慮清除佇列中的所有工作,然後重新設定佇列。

以下範例會刪除佇列 ID 為 foo 的佇列。這項作業等同於在 Task Queues 中刪除佇列

gcloud

gcloud tasks queues delete 

foo

用戶端程式庫

import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.QueueName;

public class DeleteQueue {
  public static void deleteQueue(String projectId, String locationId, String queueId)
      throws Exception {
    try (CloudTasksClient client = CloudTasksClient.create()) {
      // TODO(developer): Uncomment these lines and replace with your values.
      // String projectId = "your-project-id";
      // String locationId = "us-central1";
      // String queueId = "foo";

      // Construct the fully qualified queue name.
      String queueName = QueueName.of(projectId, locationId, queueId).toString();

      client.deleteQueue(queueName);
      System.out.println("Queue Deleted.");
    }
  }
}

詳情請參閱 Cloud Tasks 參考資料中的「刪除佇列」。

建立及管理工作

本節說明如何使用 Cloud Tasks API 建立及管理工作。

建立工作

下表列出 Task Queues 與 Cloud Tasks 的差異欄位。

「工作佇列」欄位 Cloud Tasks 欄位 說明
Cloud Tasks 新功能 app_engine_http_request 建立以 App Engine 服務為目標的要求。這些工作稱為 App Engine 工作。
method http_method 指定要求方法,例如 POST
url relative_uri 指定工作處理常式。請注意最後一個字母的差異: i 代表「統一資源識別碼」,而非 l 代表「統一資源定位器」
target app_engine_routing (選用步驟) 指定 App Engine serviceversioninstance,適用於 App Engine 工作。如未設定,系統會使用預設服務、版本和執行個體。

以下範例會建立工作,並將工作路徑導向預設 App Engine 服務上的處理常式 /worker。這相當於在工作佇列中建立工作的 Cloud Tasks。

gcloud

gcloud tasks create-app-engine-task --queue=default \
--method=POST --relative-uri=/worker?key=key

用戶端程式庫

import com.google.cloud.tasks.v2.AppEngineHttpRequest;
import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.HttpMethod;
import com.google.cloud.tasks.v2.QueueName;
import com.google.cloud.tasks.v2.Task;
import com.google.protobuf.ByteString;
import java.nio.charset.Charset;

public class CreateTask {
  public static void createTask(String projectId, String locationId, String queueId)
      throws Exception {
    try (CloudTasksClient client = CloudTasksClient.create()) {
      // TODO(developer): Uncomment these lines and replace with your values.
      // String projectId = "your-project-id";
      // String locationId = "us-central1";
      // String queueId = "default";
      String key = "key";

      // Construct the fully qualified queue name.
      String queueName = QueueName.of(projectId, locationId, queueId).toString();

      // Construct the task body.
      Task taskParam =
          Task.newBuilder()
              .setAppEngineHttpRequest(
                  AppEngineHttpRequest.newBuilder()
                      .setRelativeUri("/worker?key=" + key)
                      .setHttpMethod(HttpMethod.GET)
                      .build())
              .build();

      Task taskPayload =
          Task.newBuilder()
              .setAppEngineHttpRequest(
                  AppEngineHttpRequest.newBuilder()
                      .setBody(ByteString.copyFrom(key, Charset.defaultCharset()))
                      .setRelativeUri("/worker")
                      .setHttpMethod(HttpMethod.POST)
                      .build())
              .build();

      // Send create task request.
      Task[] tasks = new Task[] {taskParam, taskPayload};
      for (Task task : tasks) {
        Task response = client.createTask(queueName, task);
        System.out.println(response);
      }
    }
  }
}

詳情請參閱 Cloud Tasks 參考資料「建立 App Engine 工作」。

指定目標服務和轉送

您可以選擇是否為 App Engine 工作指定 App Engine 目標服務、版本和執行個體。根據預設,App Engine 工作會遞送到工作嘗試時的預設服務、版本和執行個體。

建立工作時,請設定工作的 app_engine_routing 屬性,為工作指定不同的 App Engine 服務、版本或執行個體。

如要將特定佇列中的所有工作轉送到相同的 App Engine 服務、版本和例項,可以在佇列中設定 app_engine_routing_override 屬性。

詳情請參閱 Cloud Tasks 參考資料「設定路由」。

將資料傳送至處理常式

與工作佇列一樣,您可以使用 Cloud Tasks,透過兩種方式將資料傳送至處理常式。您可以將資料做為相對 URI 中的查詢參數傳遞,也可以使用 HTTP 方法 POST 或 PUT,在要求內文中傳遞資料。

Cloud Tasks 使用「body」一詞的方式,與 Task Queues 使用「payload」一詞的方式相同。在 Cloud Tasks 中,預設的內文內容類型為八位元串流,而非純文字。您可以在標頭中指定主體內容類型。

以下範例會以兩種不同方式,將鍵傳遞至處理常式 /worker 。這相當於在工作佇列中將資料傳遞至處理常式的 Cloud Tasks。

主控台

gcloud tasks create-app-engine-task --queue=default --method=GET  \
--relative-uri=

/worker

?key=blue --routing=service:worker
gcloud tasks create-app-engine-task --queue=default --method=POST \
--relative-uri=

/worker

 --routing=service:worker \
--body-content="{'key': 'blue'}"

用戶端程式庫

import com.google.cloud.tasks.v2.AppEngineHttpRequest;
import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.HttpMethod;
import com.google.cloud.tasks.v2.QueueName;
import com.google.cloud.tasks.v2.Task;
import com.google.protobuf.ByteString;
import java.nio.charset.Charset;

public class CreateTask {
  public static void createTask(String projectId, String locationId, String queueId)
      throws Exception {
    try (CloudTasksClient client = CloudTasksClient.create()) {
      // TODO(developer): Uncomment these lines and replace with your values.
      // String projectId = "your-project-id";
      // String locationId = "us-central1";
      // String queueId = "default";
      String key = "key";

      // Construct the fully qualified queue name.
      String queueName = QueueName.of(projectId, locationId, queueId).toString();

      // Construct the task body.
      Task taskParam =
          Task.newBuilder()
              .setAppEngineHttpRequest(
                  AppEngineHttpRequest.newBuilder()
                      .setRelativeUri("/worker?key=" + key)
                      .setHttpMethod(HttpMethod.GET)
                      .build())
              .build();

      Task taskPayload =
          Task.newBuilder()
              .setAppEngineHttpRequest(
                  AppEngineHttpRequest.newBuilder()
                      .setBody(ByteString.copyFrom(key, Charset.defaultCharset()))
                      .setRelativeUri("/worker")
                      .setHttpMethod(HttpMethod.POST)
                      .build())
              .build();

      // Send create task request.
      Task[] tasks = new Task[] {taskParam, taskPayload};
      for (Task task : tasks) {
        Task response = client.createTask(queueName, task);
        System.out.println(response);
      }
    }
  }
}

命名工作

您可以選擇是否指定工作名稱。如果未指定工作名稱,Cloud Tasks 會為您建構工作名稱,方法是產生工作 ID,並根據您在建立工作時指定的佇列,推斷專案和位置 (即區域)。

工作名稱的格式為 projects/PROJECT_ID/locations/LOCATION_ID/queues/QUEUE_ID/tasks/TASK_ID。工作名稱的 TASK_ID 部分等同於工作佇列工作的 name 欄位。

重複使用工作名稱

您必須等待一段時間,才能重複使用工作名稱。等待時間長度取決於工作是由 Cloud Tasks 還是 Task Queues 建立的佇列所分派。

如果是使用工作佇列建立的佇列 (包括預設佇列) 中的工作,您必須等待約 9 天,才能重新建立名稱相同的工作。如果是使用 Cloud Tasks 建立的佇列,您必須等待原始工作刪除或執行後約 1 小時,才能重新建立佇列。

以下範例會建立 TASK_ID 設為 first-try 的工作,並將其新增至預設佇列。這相當於在工作佇列中命名工作的 Cloud Tasks 做法。

gcloud

gcloud CLI 會從您的設定推斷專案和位置,藉此建構工作名稱。

gcloud tasks create-app-engine-task first-try --queue=default \
--method=GET --relative-uri=

/worker

用戶端程式庫

使用用戶端程式庫時,如要指定 TASK_ID,就必須指定完整的工作名稱。專案和位置必須與要新增工作的佇列專案和位置相符。

import com.google.cloud.tasks.v2.AppEngineHttpRequest;
import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.HttpMethod;
import com.google.cloud.tasks.v2.QueueName;
import com.google.cloud.tasks.v2.Task;
import com.google.cloud.tasks.v2.TaskName;

public class CreateTaskWithName {
  public static void createTaskWithName(
      String projectId, String locationId, String queueId, String taskId) throws Exception {
    try (CloudTasksClient client = CloudTasksClient.create()) {
      // TODO(developer): Uncomment these lines and replace with your values.
      // String projectId = "your-project-id";
      // String locationId = "us-central1";
      // String queueId = "default";
      // String taskId = "first-try"

      String queueName = QueueName.of(projectId, locationId, queueId).toString();

      Task.Builder taskBuilder =
          Task.newBuilder()
              .setName(TaskName.of(projectId, locationId, queueId, taskId).toString())
              .setAppEngineHttpRequest(
                  AppEngineHttpRequest.newBuilder()
                      .setRelativeUri("/worker")
                      .setHttpMethod(HttpMethod.GET)
                      .build());

      // Send create task request.
      Task response = client.createTask(queueName, taskBuilder.build());
      System.out.println(response);
    }
  }
}

重試失敗的工作

您可以在建立佇列時設定佇列的工作重試次數,也可以更新佇列。下表列出 Task Queues 欄位和對應的 Cloud Tasks 欄位。

「工作佇列」欄位 Cloud Tasks 欄位
task_retry_limit max_attempts
task_age_limit max_retry_duration
min_backoff_seconds min_backoff
max_backoff_seconds max_backoff
max_doublings max_doublings

工作專屬重試參數

在 Task Queues 中設定的工作專屬重試參數可在 Cloud Tasks 中運作,但您無法編輯或為新工作設定這些參數。如要變更具有工作專屬重試參數的工作重試參數,請使用具有所需重試參數的 Cloud Tasks 佇列重新建立工作。

以下範例示範幾種重試情況:

  • fooqueue 中,工作從第一次執行嘗試起,最多可以重試七次,為期最多兩天。如果兩項限制都已超過,這項工作就會永久失敗。
  • barqueue 中,App Engine 會重試工作,並以線性方式逐漸拉長每次的重試間隔時間,直到輪詢上限為止,然後會以最長時間間隔無限期一直重試 (所以要求間隔為 10 秒、20 秒、30 秒、...、190 秒、200 秒、200 秒、...)。
  • bazqueue 中,重試間隔從 10 秒開始,然後以雙倍增加三次、接著以線性方式增加,最後則按最長時間間隔無限期重試 (所以要求間隔為 10 秒、20 秒、40 秒、80 秒、160 秒、240 秒、300 秒、300 秒...)。

這相當於在工作佇列中重試工作的 Cloud Tasks 版本。

gcloud

設定指定秒數的選項時,整數後方必須加上 s (例如 200s,而非 200)。

gcloud tasks queues create fooqueue \
--max-attempts=7 \
--max-retry-duration=172800s  #2*60*60*24 seconds in 2 days
gcloud tasks queues create barqueue \
--min-backoff=10s \
--max-backoff=200s \
--max-doublings=0
gcloud tasks queues create bazqueue \
--min-backoff=10s \
--max-backoff=300s \
--max-doublings=3

用戶端程式庫

import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.LocationName;
import com.google.cloud.tasks.v2.Queue;
import com.google.cloud.tasks.v2.QueueName;
import com.google.cloud.tasks.v2.RateLimits;
import com.google.cloud.tasks.v2.RetryConfig;
import com.google.protobuf.Duration;

public class RetryTask {
  public static void retryTask(
      String projectId, String locationId, String fooqueue, String barqueue, String bazqueue)
      throws Exception {
    try (CloudTasksClient client = CloudTasksClient.create()) {
      // TODO(developer): Uncomment these lines and replace with your values.
      // String projectId = "your-project-id";
      // String locationId = "us-central1";
      // String fooqueue = "fooqueue";
      // String barqueue = "barqueue";
      // String bazqueue = "bazqueue";

      LocationName parent = LocationName.of(projectId, locationId);

      Duration retryDuration = Duration.newBuilder().setSeconds(2 * 60 * 60 * 24).build();
      Duration min = Duration.newBuilder().setSeconds(10).build();
      Duration max1 = Duration.newBuilder().setSeconds(200).build();
      Duration max2 = Duration.newBuilder().setSeconds(300).build();

      Queue foo =
          Queue.newBuilder()
              .setName(QueueName.of(projectId, locationId, fooqueue).toString())
              .setRateLimits(RateLimits.newBuilder().setMaxDispatchesPerSecond(1.0))
              .setRetryConfig(
                  RetryConfig.newBuilder().setMaxAttempts(7).setMaxRetryDuration(retryDuration))
              .build();

      Queue bar =
          Queue.newBuilder()
              .setName(QueueName.of(projectId, locationId, barqueue).toString())
              .setRateLimits(RateLimits.newBuilder().setMaxDispatchesPerSecond(1.0))
              .setRetryConfig(
                  RetryConfig.newBuilder()
                      .setMinBackoff(min)
                      .setMaxBackoff(max1)
                      .setMaxDoublings(0))
              .build();

      Queue baz =
          Queue.newBuilder()
              .setName(QueueName.of(projectId, locationId, bazqueue).toString())
              .setRateLimits(RateLimits.newBuilder().setMaxDispatchesPerSecond(1.0))
              .setRetryConfig(
                  RetryConfig.newBuilder()
                      .setMinBackoff(min)
                      .setMaxBackoff(max2)
                      .setMaxDoublings(3))
              .build();

      Queue[] queues = new Queue[] {foo, bar, baz};
      for (Queue queue : queues) {
        Queue response = client.createQueue(parent, queue);
        System.out.println(response);
      }
    }
  }
}

詳情請參閱 Cloud Tasks 參考資料「設定重試參數」。

刪除佇列中的工作

刪除工作後,如果工作位於使用 queue.yaml 檔案建立的佇列中,必須等待 9 天才能建立同名工作;如果工作位於使用 Cloud Tasks 建立的佇列中,則必須等待 1 小時。

以下範例會從佇列 (佇列 ID 為 queue1) 刪除工作 (工作 ID 為 foo)。這項功能相當於在工作佇列中刪除工作

gcloud

系統會從 gcloud CLI 預設專案推斷工作專案和位置。

gcloud tasks delete foo --queue=queue1

用戶端程式庫

import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.TaskName;

public class DeleteTask {
  public static void deleteTask(String projectId, String locationId, String queueId, String taskId)
      throws Exception {
    try (CloudTasksClient client = CloudTasksClient.create()) {
      // TODO(developer): Uncomment these lines and replace with your values.
      // String projectId = "your-project-id";
      // String locationId = "us-central1";
      // String queueId = "queue1";
      // String taskId = "foo";

      // Construct the fully qualified queue name.
      String taskName = TaskName.of(projectId, locationId, queueId, taskId).toString();

      client.deleteTask(taskName);
      System.out.println("Task Deleted.");
    }
  }
}

詳情請參閱 Cloud Tasks 參考資料中的「從佇列中刪除工作」。

清除工作

以下範例會清除佇列 ID 為 foo 的佇列中所有工作。這項作業等同於在工作佇列中清除工作的 Cloud Tasks 版本。

gcloud

佇列專案和位置會從 gcloud CLI 預設專案推斷而來。

gcloud tasks queues purge 

foo

用戶端程式庫

import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.QueueName;

public class PurgeQueue {
  public static void purgeQueue(String projectId, String locationId, String queueId)
      throws Exception {
    try (CloudTasksClient client = CloudTasksClient.create()) {
      // TODO(developer): Uncomment these lines and replace with your values.
      // String projectId = "your-project-id";
      // String locationId = "us-central1";
      // String queueId = "foo";

      // Construct the fully qualified queue name.
      String queueName = QueueName.of(projectId, locationId, queueId).toString();

      client.purgeQueue(queueName);
      System.out.println("Queue Purged.");
    }
  }
}

詳情請參閱 Cloud Tasks 參考資料中的「從佇列中清除所有工作」。

Java 8 Cloud Tasks 範例

以下範例是基本設定,可使用 Cloud Tasks 建立佇列,並將工作加入佇列。本文假設開發人員已建立 pom.xml 檔案,指定 Cloud Tasks 依附元件,如「匯入用戶端程式庫」一節所述。這是 Cloud Tasks 的對等項目,相當於工作佇列中的Java 8 工作佇列範例

負責建立及將工作加入佇列的檔案會建立工作,並使用 Cloud Tasks 用戶端程式庫將工作新增至預設佇列:

import com.google.cloud.tasks.v2.AppEngineHttpRequest;
import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.HttpMethod;
import com.google.cloud.tasks.v2.QueueName;
import com.google.cloud.tasks.v2.Task;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@WebServlet(
    name = "TaskEnqueue",
    description = "Enqueue a task targeted at endpoint '/cloudtasks/worker'",
    urlPatterns = "/cloudtasks/enqueue")
public class Enqueue extends HttpServlet {

  // TODO(developer): Replace these variables before running the sample.
  static final String projectId = "my-project-id";
  static final String locationId = "us-central1";

  // Function creates Cloud Tasks from form submissions.
  protected void doPost(HttpServletRequest request, HttpServletResponse response)
      throws ServletException, IOException {
    String key = request.getParameter("key");

    try (CloudTasksClient client = CloudTasksClient.create()) {
      // Construct the fully qualified queue name.
      String queueName = QueueName.of(projectId, locationId, "default").toString();

      // Construct the task body.
      Task task =
          Task.newBuilder()
              .setAppEngineHttpRequest(
                  AppEngineHttpRequest.newBuilder()
                      .setRelativeUri("/cloudtasks/worker?key=" + key)
                      .setHttpMethod(HttpMethod.POST)
                      .build())
              .build();

      // Add the task to the default queue.
      Task taskResponse = client.createTask(queueName, task);
      System.out.println("Task created: " + taskResponse.getName());
    }

    response.sendRedirect("/");
  }
}

定義工作站的檔案會處理工作:

import java.io.IOException;
import java.util.logging.Logger;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@WebServlet(
    name = "TaskWorker",
    description = "Endpoint to process Cloud Task requests",
    urlPatterns = "/cloudtasks/worker"
)
public class Worker extends HttpServlet {

  private static final Logger log = Logger.getLogger(Worker.class.getName());

  // Worker function to process POST requests from Cloud Tasks targeted at the
  // '/cloudtasks/worker' endpoint.
  protected void doPost(HttpServletRequest request, HttpServletResponse response)
      throws ServletException, IOException {
    String key = request.getParameter("key");
    log.info("Worker is processing " + key);
  }
}

後續步驟