执行异步任务

使用集合让一切井井有条 根据您的偏好保存内容并对其进行分类。

您可以使用 Cloud Tasks 安全地将任务加入队列,由 Cloud Run 服务对其进行异步处理。典型用例包括:

  • 通过意外生产突发事件保留请求
  • 通过延迟工作来缓解流量高峰不对用户产生影响
  • 通过将缓慢的后台操作委派给其他服务(如数据库更新或批处理)来加快用户响应速度
  • 将调用率限制为支持数据库和第三方 API 等服务

本页介绍了如何将通过 HTTPS 协议安全推送的任务加入到私有 Cloud Run 服务队列。其中介绍了私有 Cloud Run 服务所需的行为、所需服务帐号权限、任务队列创建和任务创建。

前期准备

在您使用的项目上启用 Cloud Tasks API

部署 Cloud Run 服务以处理任务

要部署接受发送到任务队列的任务的服务,其部署服务的方式与任何其他 Cloud Run 服务相同。Cloud Run 服务必须返回 HTTP 200 代码,以在任务处理完成后确认任务已成功执行。

Cloud Tasks 会将任务作为 HTTPS 请求推送到此 Cloud Run 服务。

必须在配置的超时时间内获得对 Cloud Tasks 的响应结果。对于运行时间超过 Cloud Tasks 超时上限的工作负载,请考虑使用 Cloud Run 作业

使用 Terraform 进行部署

要创建服务,请将以下内容添加到 .tf 文件:

resource "google_cloud_run_service" "default" {
    name     = "cloud-run-service-name"
    location = "us-central1"
    provider = google-beta
    template {
      spec {
            containers {
                image = "gcr.io/cloudrun/hello"
            }
      }
    }
    traffic {
      percent         = 100
      latest_revision = true
    }
}

如需在 Google Cloud 项目中应用 Terraform 配置,请完成以下步骤:

  1. 启动 Cloud Shell
  2. 设置要应用 Terraform 配置的 Google Cloud 项目:
    export GOOGLE_CLOUD_PROJECT=PROJECT_ID
    
  3. 创建一个目录,并在该目录中打开一个新文件。文件名必须具有 .tf 扩展名,例如 main.tf
    mkdir DIRECTORY && cd DIRECTORY && nano main.tf
    
  4. 将示例复制到 main.tf
  5. 查看和修改要应用到您的环境的示例参数。
  6. 依次按 Ctrl-xy 保存更改。
  7. 初始化 Terraform:
    terraform init
  8. 查看配置并验证 Terraform 将创建或更新的资源是否符合您的预期:
    terraform plan

    根据需要更正配置。

  9. 通过运行以下命令并在提示符处输入 yes 来应用 Terraform 配置:
    terraform apply

    等待 Terraform 显示“应用完成!”消息。

  10. 打开您的 Google Cloud 项目以查看结果。在 Google Cloud 控制台中,在界面中找到资源,以确保 Terraform 已创建或更新它们。

创建任务队列

命令行

要创建任务队列,请使用以下命令:

gcloud tasks queues create QUEUE-ID

QUEUE-ID 替换为您要为任务队列指定的名称:该名称在您的项目中必须是独一无二的。如果系统提示您在项目中创建 App Engine 应用,请回复 y 来创建它。Cloud Tasks 将此用于队列:请确保您选择的位置与 Cloud Run 服务所用的位置相同。

默认任务队列配置在大多数情况下都应该有效。不过,您可以根据需要选择设置不同的速率限制重试参数

Terraform

要创建任务队列,请将以下内容添加到 .tf 文件:

resource "google_cloud_tasks_queue" "default" {
  name = "cloud-tasks-queue-name"
  location = "us-central1"
  provider = google-beta
}

输入 terraform apply 以应用更改。

创建服务帐号以与任务相关联

您必须创建一个与加入队列的任务相关联的服务帐号。此服务帐号必须具有 Cloud Run Invoker IAM 角色才能允许任务队列将任务推送到 Cloud Run 服务。.

控制台

  1. 在 Google Cloud 控制台中,转到服务帐号页面。

    转到“服务帐号”

  2. 选择一个项目。

  3. 输入要在 Google Cloud 控制台中显示的服务帐号名称。

    Google Cloud 控制台会根据此名称生成服务帐号 ID。如有必要,请修改 ID。此 ID 创建后便无法更改。

  4. 可选:输入服务帐号的说明。

  5. 点击创建

  6. 点击选择角色字段。

  7. 所有角色下,选择 Cloud Run > Cloud Run Invoker

  8. 点击完成

命令行

  1. 创建服务帐号:

    gcloud iam service-accounts create SERVICE_ACCOUNT_NAME \
       --display-name "DISPLAYED_SERVICE_ACCOUNT_NAME"

    注意替换如下内容:

    • SERVICE_ACCOUNT_NAME 替换为 Google Cloud 项目中唯一的小写名称,例如 my-invoker-service-account-name
    • DISPLAYED_SERVICE_ACCOUNT_NAME 替换为您要在控制台等界面中为此服务帐号显示的名称,例如 My Invoker Service Account
  2. 对于 Cloud Run,请向您的服务帐号授予调用服务的权限:

    gcloud run services add-iam-policy-binding SERVICE \
       --member=serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com \
       --role=roles/run.invoker

    替换

    • SERVICE 替换为您希望 Cloud Tasks 调用的服务的名称。
    • SERVICE_ACCOUNT_NAME 替换为服务帐号的名称。
    • PROJECT_ID 替换为 Google Cloud 项目 ID。

Terraform

请将以下内容添加到 .tf 文件:

创建服务帐号:

resource "google_service_account" "sa" {
  account_id   = "cloud-run-task-invoker"
  display_name = "Cloud Run Task Invoker"
  provider = google-beta
}

对于 Cloud Run,请向您的服务帐号授予调用服务的权限:

resource "google_cloud_run_service_iam_binding" "binding" {
  location = google_cloud_run_service.default.location
  service = google_cloud_run_service.default.name
  role = "roles/run.invoker"
  members = ["serviceAccount:${google_service_account.sa.email}"]
  provider = google-beta
  project = google_cloud_run_service.default.project
}

输入 terraform apply 以应用更改。

使用身份验证令牌创建 HTTP 任务

创建要发送到任务队列的任务时,请指定项目、位置、队列名称、之前创建并且要与任务相关联的服务帐号的电子邮件地址、将运行该任务的私有 Cloud Run 服务的网址,以及您需要发送的任何其他数据。 您可以选择硬编码这些值,但项目 ID、位置和服务帐号电子邮件地址等值可以从Cloud Run 元数据服务器动态检索

如需详细了解任务请求正文,请参阅 Cloud Tasks API 文档。请注意,包含数据负载的请求必须使用 HTTP PUTPOST 方法。

将任务加入队列的代码必须具有执行此操作所需的 IAM 权限,例如 Cloud Tasks Enqueuer 角色。如果您在 Cloud Run 上使用默认服务帐号,您的代码将拥有必要的 IAM 权限。

以下示例创建任务请求,其中还包括标头令牌的创建。示例中使用了 OIDC 令牌。要使用 OAuth 令牌,请在构造请求时将 OIDC 参数替换为相应语言的 OAuth 参数。

Python

"""Create a task for a given queue with an arbitrary payload."""

from google.cloud import tasks_v2

# Create a client.
client = tasks_v2.CloudTasksClient()

# TODO(developer): Uncomment these lines and replace with your values.
# project = 'my-project-id'
# queue = 'my-queue'
# location = 'us-central1'
# url = 'https://example.com/task_handler?param=value'
# audience = 'https://example.com/task_handler'
# service_account_email = 'service-account@my-project-id.iam.gserviceaccount.com';
# payload = 'hello'

# Construct the fully qualified queue name.
parent = client.queue_path(project, location, queue)

# Construct the request body.
task = {
    "http_request": {  # Specify the type of request.
        "http_method": tasks_v2.HttpMethod.POST,
        "url": url,  # The full url path that the task will be sent to.
        "oidc_token": {
            "service_account_email": service_account_email,
            "audience": audience,
        },
    }
}

if payload is not None:
    # The API expects a payload of type bytes.
    converted_payload = payload.encode()

    # Add the payload to the request.
    task["http_request"]["body"] = converted_payload

# Use the client to build and send the task.
response = client.create_task(request={"parent": parent, "task": task})

print("Created task {}".format(response.name))
return response

请注意 requirements.txt 文件:

google-cloud-tasks==2.10.2

Java

import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.HttpMethod;
import com.google.cloud.tasks.v2.HttpRequest;
import com.google.cloud.tasks.v2.OidcToken;
import com.google.cloud.tasks.v2.QueueName;
import com.google.cloud.tasks.v2.Task;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.charset.Charset;

public class CreateHttpTaskWithToken {

  public static void main(String[] args) throws IOException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "my-project-id";
    String locationId = "us-central1";
    String queueId = "my-queue";
    String serviceAccountEmail =
        "java-docs-samples-testing@java-docs-samples-testing.iam.gserviceaccount.com";
    createTask(projectId, locationId, queueId, serviceAccountEmail);
  }

  // Create a task with a HTTP target and authorization token using the Cloud Tasks client.
  public static void createTask(
      String projectId, String locationId, String queueId, String serviceAccountEmail)
      throws IOException {

    // Instantiates a client.
    try (CloudTasksClient client = CloudTasksClient.create()) {
      String url =
          "https://example.com/taskhandler"; // The full url path that the request will be sent to
      String payload = "Hello, World!"; // The task HTTP request body

      // Construct the fully qualified queue name.
      String queuePath = QueueName.of(projectId, locationId, queueId).toString();

      // Add your service account email to construct the OIDC token.
      // in order to add an authentication header to the request.
      OidcToken.Builder oidcTokenBuilder =
          OidcToken.newBuilder().setServiceAccountEmail(serviceAccountEmail);

      // Construct the task body.
      Task.Builder taskBuilder =
          Task.newBuilder()
              .setHttpRequest(
                  HttpRequest.newBuilder()
                      .setBody(ByteString.copyFrom(payload, Charset.defaultCharset()))
                      .setHttpMethod(HttpMethod.POST)
                      .setUrl(url)
                      .setOidcToken(oidcTokenBuilder)
                      .build());

      // Send create task request.
      Task task = client.createTask(queuePath, taskBuilder.build());
      System.out.println("Task created: " + task.getName());
    }
  }
}

请注意 pom.xml 文件:

<?xml version='1.0' encoding='UTF-8'?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.google.cloud</groupId>
  <artifactId>cloudtasks-snippets</artifactId>
  <packaging>jar</packaging>
  <name>Google Cloud Tasks Snippets</name>
  <url>https://github.com/googleapis/java-tasks</url>

  <!--
    The parent pom defines common style checks and testing strategies for our samples.
    Removing or replacing it should not affect the execution of the samples in anyway.
  -->
  <parent>
    <groupId>com.google.cloud.samples</groupId>
    <artifactId>shared-configuration</artifactId>
    <version>1.2.0</version>
  </parent>

  <properties>
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.source>1.8</maven.compiler.source>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>libraries-bom</artifactId>
        <version>26.1.1</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>

  <dependencies>
    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-tasks</artifactId>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.13.2</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.google.truth</groupId>
      <artifactId>truth</artifactId>
      <version>1.1.3</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>

Go

import (
	"context"
	"fmt"

	cloudtasks "cloud.google.com/go/cloudtasks/apiv2"
	taskspb "google.golang.org/genproto/googleapis/cloud/tasks/v2"
)

// createHTTPTaskWithToken constructs a task with a authorization token
// and HTTP target then adds it to a Queue.
func createHTTPTaskWithToken(projectID, locationID, queueID, url, email, message string) (*taskspb.Task, error) {
	// Create a new Cloud Tasks client instance.
	// See https://godoc.org/cloud.google.com/go/cloudtasks/apiv2
	ctx := context.Background()
	client, err := cloudtasks.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("NewClient: %v", err)
	}
	defer client.Close()

	// Build the Task queue path.
	queuePath := fmt.Sprintf("projects/%s/locations/%s/queues/%s", projectID, locationID, queueID)

	// Build the Task payload.
	// https://godoc.org/google.golang.org/genproto/googleapis/cloud/tasks/v2#CreateTaskRequest
	req := &taskspb.CreateTaskRequest{
		Parent: queuePath,
		Task: &taskspb.Task{
			// https://godoc.org/google.golang.org/genproto/googleapis/cloud/tasks/v2#HttpRequest
			MessageType: &taskspb.Task_HttpRequest{
				HttpRequest: &taskspb.HttpRequest{
					HttpMethod: taskspb.HttpMethod_POST,
					Url:        url,
					AuthorizationHeader: &taskspb.HttpRequest_OidcToken{
						OidcToken: &taskspb.OidcToken{
							ServiceAccountEmail: email,
						},
					},
				},
			},
		},
	}

	// Add a payload message if one is present.
	req.Task.GetHttpRequest().Body = []byte(message)

	createdTask, err := client.CreateTask(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("cloudtasks.CreateTask: %v", err)
	}

	return createdTask, nil
}

Node.js

// Imports the Google Cloud Tasks library.
const {CloudTasksClient} = require('@google-cloud/tasks');

// Instantiates a client.
const client = new CloudTasksClient();

async function createHttpTaskWithToken() {
  // TODO(developer): Uncomment these lines and replace with your values.
  // const project = 'my-project-id';
  // const queue = 'my-queue';
  // const location = 'us-central1';
  // const url = 'https://example.com/taskhandler';
  // const serviceAccountEmail = 'client@<project-id>.iam.gserviceaccount.com';
  // const payload = 'Hello, World!';

  // Construct the fully qualified queue name.
  const parent = client.queuePath(project, location, queue);

  const task = {
    httpRequest: {
      httpMethod: 'POST',
      url,
      oidcToken: {
        serviceAccountEmail,
      },
    },
  };

  if (payload) {
    task.httpRequest.body = Buffer.from(payload).toString('base64');
  }

  console.log('Sending task:');
  console.log(task);
  // Send create task request.
  const request = {parent: parent, task: task};
  const [response] = await client.createTask(request);
  const name = response.name;
  console.log(`Created task ${name}`);
}
createHttpTaskWithToken();

请注意 package.json 文件:

{
  "name": "appengine-cloudtasks",
  "description": "Google App Engine Cloud Tasks example.",
  "license": "Apache-2.0",
  "author": "Google Inc.",
  "private": true,
  "engines": {
    "node": ">=12.0.0"
  },
  "files": [
    "*.js"
  ],
  "scripts": {
    "test": "mocha",
    "start": "node server.js"
  },
  "dependencies": {
    "@google-cloud/tasks": "^3.0.3",
    "body-parser": "^1.18.3",
    "express": "^4.16.3"
  },
  "devDependencies": {
    "chai": "^4.2.0",
    "mocha": "^8.0.0",
    "uuid": "^9.0.0"
  }
}

后续步骤