本页面演示了如何以编程方式创建 App Engine 任务
并将其放入 Cloud Tasks 队列中。
使用此过程,您可以明确指定
处理该任务,并可选择将特定于任务的数据传递到
处理程序。Cloud Tasks 服务会将任务请求转发给处理程序,但此工作器位于 App Engine 中。因此所有队列
目标 App Engine 处理程序必须具有
App Engine 应用。
这些处理程序必须在
App Engine 应用运行。此区域还用作 Cloud Tasks 请求的 LOCATION_ID
参数。如需了解详情,请参阅使用 App Engine 目标的 Cloud Tasks 队列。
您还可以使用 配置 例如,安排将来应该执行该任务的时间,或 限制您希望在任务失败时重试的次数。如果您选择为任务指定名称,则 Cloud Tasks 可以使用该名称来确保任务重复信息删除,但必要的处理可能会增加延迟时间。
App Engine 防火墙规则
在 App Engine 标准环境中,App Engine 防火墙可以允许某些内部流量绕过防火墙。这意味着,如果您将 default
规则设置为 deny
,从特定服务发往 App Engine 标准环境的请求不会被阻止。这些是应用自身中请求的所有类型的流量
配置数据,也可以从同一应用发送。绕过防火墙规则的请求
还会将 Cloud Tasks 中的 App Engine 任务(包括
App Engine 任务队列)。
如需允许传入请求,请使用以下 App Engine 的 IP 范围 Cloud Tasks 中的任务(包括 App Engine 任务队列):
发送到 App Engine 标准环境的请求的 IP 地址范围:
0.1.0.2/32
(绕过 如果设置为拒绝,则默认防火墙规则)发送到 App Engine 柔性环境的请求的 IP 范围:
0.1.0.2/32
使用客户端库创建任务
你可以 HTTP 请求, 您可以随意构建。但是,如以下示例所示,使用客户端库可以帮助您管理与服务器进行低层级通信的细节,包括使用 Google 进行身份验证。如需向队列添加任务,请参阅将任务添加到 Cloud Tasks 队列
C#
using Google.Cloud.Tasks.V2;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using System;
class CreateAppEngineTask
{
public string CreateTask(
// TODO<developer>: call this method by passing correct values for
// the following parameters or change the parameters' default values.
string projectId = "YOUR-PROJECT-ID",
string location = "us-central1",
string queue = "my-queue",
string payload = "Hello World!",
int inSeconds = 0)
{
CloudTasksClient client = CloudTasksClient.Create();
QueueName parent = new QueueName(projectId, location, queue);
var response = client.CreateTask(new CreateTaskRequest
{
Parent = parent.ToString(),
Task = new Task
{
AppEngineHttpRequest = new AppEngineHttpRequest
{
HttpMethod = HttpMethod.Post,
RelativeUri = "/log_payload",
Body = ByteString.CopyFromUtf8(payload)
},
ScheduleTime = Timestamp.FromDateTime(
DateTime.UtcNow.AddSeconds(inSeconds))
}
});
Console.WriteLine($"Created Task {response.Name}");
return response.Name;
}
}
Go
// Command create_task constructs and adds a task to an App Engine Queue.
package main
import (
"context"
"fmt"
cloudtasks "cloud.google.com/go/cloudtasks/apiv2"
taskspb "cloud.google.com/go/cloudtasks/apiv2/cloudtaskspb"
)
// createTask creates a new task in your App Engine queue.
func createTask(projectID, locationID, queueID, message string) (*taskspb.Task, error) {
// Create a new Cloud Tasks client instance.
// See https://godoc.org/cloud.google.com/go/cloudtasks/apiv2
ctx := context.Background()
client, err := cloudtasks.NewClient(ctx)
if err != nil {
return nil, fmt.Errorf("NewClient: %w", err)
}
defer client.Close()
// Build the Task queue path.
queuePath := fmt.Sprintf("projects/%s/locations/%s/queues/%s", projectID, locationID, queueID)
// Build the Task payload.
// https://godoc.org/google.golang.org/genproto/googleapis/cloud/tasks/v2#CreateTaskRequest
req := &taskspb.CreateTaskRequest{
Parent: queuePath,
Task: &taskspb.Task{
// https://godoc.org/google.golang.org/genproto/googleapis/cloud/tasks/v2#AppEngineHttpRequest
MessageType: &taskspb.Task_AppEngineHttpRequest{
AppEngineHttpRequest: &taskspb.AppEngineHttpRequest{
HttpMethod: taskspb.HttpMethod_POST,
RelativeUri: "/task_handler",
},
},
},
}
// Add a payload message if one is present.
req.Task.GetAppEngineHttpRequest().Body = []byte(message)
createdTask, err := client.CreateTask(ctx, req)
if err != nil {
return nil, fmt.Errorf("cloudtasks.CreateTask: %w", err)
}
return createdTask, nil
}
Java
import com.google.cloud.tasks.v2.AppEngineHttpRequest;
import com.google.cloud.tasks.v2.CloudTasksClient;
import com.google.cloud.tasks.v2.HttpMethod;
import com.google.cloud.tasks.v2.QueueName;
import com.google.cloud.tasks.v2.Task;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Clock;
import java.time.Instant;
public class CreateTask {
public static void main(String[] args) throws IOException {
// TODO(developer): Replace these variables before running the sample.
String projectId = "my-project-id";
String queue = "my-appengine-queue";
String location = "us-central1";
String payload = "hello";
int seconds = 0; // Scheduled delay for the task in seconds
createTask(projectId, queue, location, payload, seconds);
}
// This is an example snippet for showing best practices.
public static void createTask(
String projectId, String queueName, String location, String payload, int seconds)
throws IOException {
// Instantiates a client.
try (CloudTasksClient client = CloudTasksClient.create()) {
// Construct the fully qualified queue name.
String queuePath = QueueName.of(projectId, location, queueName).toString();
// Construct the task body.
Task.Builder taskBuilder =
Task.newBuilder()
.setAppEngineHttpRequest(
AppEngineHttpRequest.newBuilder()
.setBody(ByteString.copyFrom(payload, Charset.defaultCharset()))
.setRelativeUri("/tasks/create")
.setHttpMethod(HttpMethod.POST)
.build());
// Add the scheduled time to the request.
taskBuilder.setScheduleTime(
Timestamp.newBuilder()
.setSeconds(Instant.now(Clock.systemUTC()).plusSeconds(seconds).getEpochSecond()));
// Send create task request.
Task task = client.createTask(queuePath, taskBuilder.build());
System.out.println("Task created: " + task.getName());
}
}
}
请注意 pom.xml
文件:
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2019 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<packaging>war</packaging>
<version>1.0-SNAPSHOT</version>
<groupId>com.example.appengine</groupId>
<artifactId>appengine-tasks-j11</artifactId>
<!--
The parent pom defines common style checks and testing strategies for our samples.
Removing or replacing it should not affect the execution of the samples in anyway.
-->
<parent>
<groupId>com.google.cloud.samples</groupId>
<artifactId>shared-configuration</artifactId>
<version>1.2.0</version>
<relativePath></relativePath>
</parent>
<properties>
<maven.compiler.target>11</maven.compiler.target>
<maven.compiler.source>11</maven.compiler.source>
</properties>
<!-- Using libraries-bom to manage versions.
See https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/The-Google-Cloud-Platform-Libraries-BOM -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>26.32.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-tasks</artifactId>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
<version>1.4.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<finalName>tasks</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-war-plugin</artifactId>
<version>3.4.0</version>
<configuration>
<failOnMissingWebXml>false</failOnMissingWebXml>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>3.1.1</version>
<configuration>
<mainClass>com.example.task.CreateTask</mainClass>
<cleanupDaemonThreads>false</cleanupDaemonThreads>
</configuration>
</plugin>
</plugins>
</build>
</project>
Node.js
// Imports the Google Cloud Tasks library.
const {CloudTasksClient} = require('@google-cloud/tasks');
// Instantiates a client.
const client = new CloudTasksClient();
async function createTask() {
// TODO(developer): Uncomment these lines and replace with your values.
// const project = 'my-project-id';
// const queue = 'my-appengine-queue';
// const location = 'us-central1';
// const payload = 'Hello, World!';
// Construct the fully qualified queue name.
const parent = client.queuePath(project, location, queue);
const task = {
appEngineHttpRequest: {
headers: {
'Content-Type': 'text/plain', // Set content type to ensure compatibility your application's request parsing
},
httpMethod: 'POST',
relativeUri: '/log_payload',
},
};
if (payload) {
task.appEngineHttpRequest.body = Buffer.from(payload).toString('base64');
}
if (inSeconds) {
// The time when the task is scheduled to be attempted.
task.scheduleTime = {
seconds: inSeconds + Date.now() / 1000,
};
}
console.log('Sending task:');
console.log(task);
// Send create task request.
const request = {parent: parent, task: task};
const [response] = await client.createTask(request);
const name = response.name;
console.log(`Created task ${name}`);
}
createTask();
请注意 package.json
文件:
{
"name": "appengine-cloudtasks",
"description": "Google App Engine Cloud Tasks example.",
"license": "Apache-2.0",
"author": "Google Inc.",
"private": true,
"engines": {
"node": ">=16.0.0"
},
"files": [
"*.js"
],
"scripts": {
"test": "c8 mocha -p -j 2 --timeout 30000",
"start": "node server.js"
},
"dependencies": {
"@google-cloud/tasks": "^5.0.0",
"express": "^4.16.3"
},
"devDependencies": {
"c8": "^10.0.0",
"chai": "^4.5.0",
"mocha": "^10.0.0",
"uuid": "^10.0.0"
}
}
PHP
use Google\Cloud\Tasks\V2\AppEngineHttpRequest;
use Google\Cloud\Tasks\V2\CloudTasksClient;
use Google\Cloud\Tasks\V2\HttpMethod;
use Google\Cloud\Tasks\V2\Task;
/** Uncomment and populate these variables in your code */
// $projectId = 'The Google project ID';
// $locationId = 'The Location ID';
// $queueId = 'The Cloud Tasks App Engine Queue ID';
// $payload = 'The payload your task should carry to the task handler. Optional';
// Instantiate the client and queue name.
$client = new CloudTasksClient();
$queueName = $client->queueName($projectId, $locationId, $queueId);
// Create an App Engine Http Request Object.
$httpRequest = new AppEngineHttpRequest();
// The path of the HTTP request to the App Engine service.
$httpRequest->setRelativeUri('/task_handler');
// POST is the default HTTP method, but any HTTP method can be used.
$httpRequest->setHttpMethod(HttpMethod::POST);
// Setting a body value is only compatible with HTTP POST and PUT requests.
if (isset($payload)) {
$httpRequest->setBody($payload);
}
// Create a Cloud Task object.
$task = new Task();
$task->setAppEngineHttpRequest($httpRequest);
// Send request and print the task name.
$response = $client->createTask($queueName, $task);
printf('Created task %s' . PHP_EOL, $response->getName());
请注意 composer.json
文件:
{
"require": {
"google/cloud-tasks": "^1.4.0"
}
}
Python
"""Create a task for a given queue with an arbitrary payload."""
from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2
import datetime
import json
# Create a client.
client = tasks_v2.CloudTasksClient()
# TODO(developer): Uncomment these lines and replace with your values.
# project = 'my-project-id'
# queue = 'my-appengine-queue'
# location = 'us-central1'
# payload = 'hello' or {'param': 'value'} for application/json
# in_seconds = None
# Construct the fully qualified queue name.
parent = client.queue_path(project, location, queue)
# Construct the request body.
task = {
"app_engine_http_request": { # Specify the type of request.
"http_method": tasks_v2.HttpMethod.POST,
"relative_uri": "/example_task_handler",
}
}
if payload is not None:
if isinstance(payload, dict):
# Convert dict to JSON string
payload = json.dumps(payload)
# specify http content-type to application/json
task["app_engine_http_request"]["headers"] = {
"Content-type": "application/json"
}
# The API expects a payload of type bytes.
converted_payload = payload.encode()
# Add the payload to the request.
task["app_engine_http_request"]["body"] = converted_payload
if in_seconds is not None:
# Convert "seconds from now" into an rfc3339 datetime string.
d = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(
seconds=in_seconds
)
# Create Timestamp protobuf.
timestamp = timestamp_pb2.Timestamp()
timestamp.FromDatetime(d)
# Add the timestamp to the tasks.
task["schedule_time"] = timestamp
# Use the client to build and send the task.
response = client.create_task(parent=parent, task=task)
print(f"Created task {response.name}")
return response
请注意 requirements.txt
文件:
Flask==3.0.3; python_version > '3.6'
Flask==2.0.3; python_version < '3.7'
gunicorn==22.0.0
google-cloud-tasks==2.13.1
Werkzeug==3.0.3
Ruby
require "google/cloud/tasks"
# Create an App Engine Task
#
# @param [String] project_id Your Google Cloud Project ID.
# @param [String] location_id Your Google Cloud Project Location ID.
# @param [String] queue_id Your Google Cloud App Engine Queue ID.
# @param [String] payload The request body of your task.
# @param [Integer] seconds The delay, in seconds, to process your task.
def create_task project_id, location_id, queue_id, payload: nil, seconds: nil
# Instantiates a client.
client = Google::Cloud::Tasks.cloud_tasks
# Construct the fully qualified queue name.
parent = client.queue_path project: project_id, location: location_id, queue: queue_id
# Construct task.
task = {
app_engine_http_request: {
http_method: "POST",
relative_uri: "/log_payload"
}
}
# Add payload to task body.
if payload
task[:app_engine_http_request][:body] = payload
end
# Add scheduled time to task.
if seconds
timestamp = Google::Protobuf::Timestamp.new
timestamp.seconds = Time.now.to_i + seconds.to_i
task[:schedule_time] = timestamp
end
# Send create task request.
puts "Sending task #{task}"
response = client.create_task parent: parent, task: task
puts "Created task #{response.name}" if response.name
end
后续步骤
- 了解如何创建 App Engine 任务处理程序。
- 如需详细了解任务,请参阅 RPC API 参考文档。
- 请参阅 REST API 参考,详细了解任务。