创建 App Engine 任务处理程序

本页面演示了如何创建 App Engine 任务处理程序,即处理 App Engine 任务的工作器代码。Cloud Tasks 队列会将 HTTP 请求发送到您的任务处理程序。成功处理任务后,处理程序必须将 200299 之间的 HTTP 状态代码发送回队列。任何其他代码都表示任务失败,队列将重试该任务。

App Engine 任务队列请求从 IP 地址 0.1.0.2 发送。另请参阅发送到 App Engine 环境的请求的 IP 范围

C#

    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddLogging(builder => builder.AddDebug());
            services.AddRouting();
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
        {
            var logger = loggerFactory.CreateLogger("testStackdriverLogging");

            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            else
            {
                // Configure error reporting service.
                app.UseExceptionHandler("/Home/Error");
            }

            var routeBuilder = new RouteBuilder(app);

            routeBuilder.MapPost("log_payload", context =>
            {
                // Log the request payload
                var reader = new StreamReader(context.Request.Body);
                var task = reader.ReadToEnd();

                logger.LogInformation($"Received task with payload: {task}");
                return context.Response.WriteAsync($"Printed task payload: {task}");
            });

            routeBuilder.MapGet("hello", context =>
            {
                // Basic index to verify app is serving
                return context.Response.WriteAsync("Hello, world!");
            });

            routeBuilder.MapGet("_ah/health", context =>
            {
                // Respond to GAE health-checks
                return context.Response.WriteAsync("OK");
            });

            routeBuilder.MapGet("/", context =>
            {
                return context.Response.WriteAsync("Hello, world!");
            });

            var routes = routeBuilder.Build();
            app.UseRouter(routes);
        }
    }

Python

from flask import Flask, render_template, request

app = Flask(__name__)

@app.route("/example_task_handler", methods=["POST"])
def example_task_handler():
    """Log the request payload."""
    payload = request.get_data(as_text=True) or "(empty payload)"
    print(f"Received task with payload: {payload}")
    return render_template("index.html", payload=payload)

Java

@WebServlet(
    name = "Tasks",
    description = "Create Cloud Task",
    urlPatterns = "/tasks/create"
)
public class TaskServlet extends HttpServlet {
  private static Logger log = Logger.getLogger(TaskServlet.class.getName());

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
    log.info("Received task request: " + req.getServletPath());
    String body = req.getReader()
        .lines()
        .reduce("", (accumulator, actual) -> accumulator + actual);

    if (!body.isEmpty()) {
      log.info("Request payload: " + body);
      String output = String.format("Received task with payload %s", body);
      resp.getOutputStream().write(output.getBytes());
      log.info("Sending response: " + output);
      resp.setStatus(HttpServletResponse.SC_OK);
    } else {
      log.warning("Null payload received in request to " + req.getServletPath());
    }
  }
}

PHP


require __DIR__ . '/vendor/autoload.php';

use Google\Cloud\Logging\LoggingClient;

// Create the logging client.
$logging = new LoggingClient();
// Create a PSR-3-compatible logger.
$logger = $logging->psrLogger('app', ['batchEnabled' => true]);

// Front-controller to route requests.
switch (@parse_url($_SERVER['REQUEST_URI'])['path']) {
    case '/':
        print "Hello, World!\n";
        break;
    case '/task_handler':
        // Taskname and Queuename are two of several useful Cloud Tasks headers available on the request.
        $taskName = $_SERVER['HTTP_X_APPENGINE_TASKNAME'] ?? '';
        $queueName = $_SERVER['HTTP_X_APPENGINE_QUEUENAME'] ?? '';

        try {
            handle_task(
                $queueName,
                $taskName,
                file_get_contents('php://input')
            );
        } catch (Exception $e) {
            http_response_code(400);
            exit($e->getMessage());
        }
        break;
    default:
        http_response_code(404);
        exit('Not Found');
}

/**
 * Process a Cloud Tasks HTTP Request.
 *
 * @param string $queueName provides the name of the queue which dispatched the task.
 * @param string $taskName provides the identifier of the task.
 * @param string $body The task details from the HTTP request.
 */
function handle_task($queueName, $taskName, $body = '')
{
    global $logger;

    if (empty($taskName)) {
        // You may use the presence of the X-Appengine-Taskname header to validate
        // the request comes from Cloud Tasks.
        $logger->warning('Invalid Task: No X-Appengine-Taskname request header found');
        throw new Exception('Bad Request - Invalid Task');
    }

    $output = sprintf('Completed task: task queue(%s), task name(%s), payload(%s)', $queueName, $taskName, $body);
    $logger->info($output);

    // Set a non-2xx status code to indicate a failure in task processing that should be retried.
    // For example, http_response_code(500) to indicate a server error.
    print $output;
}

Go


// Sample task_handler is an App Engine app demonstrating Cloud Tasks handling.
package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
)

func main() {
	// Allow confirmation the task handling service is running.
	http.HandleFunc("/", indexHandler)

	// Handle all tasks.
	http.HandleFunc("/task_handler", taskHandler)

	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
		log.Printf("Defaulting to port %s", port)
	}

	log.Printf("Listening on port %s", port)
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		log.Fatal(err)
	}
}

// indexHandler responds to requests with our greeting.
func indexHandler(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/" {
		http.NotFound(w, r)
		return
	}
	fmt.Fprint(w, "Hello, World!")
}

// taskHandler processes task requests.
func taskHandler(w http.ResponseWriter, r *http.Request) {
	taskName := r.Header.Get("X-Appengine-Taskname")
	if taskName == "" {
		// You may use the presence of the X-Appengine-Taskname header to validate
		// the request comes from Cloud Tasks.
		log.Println("Invalid Task: No X-Appengine-Taskname request header found")
		http.Error(w, "Bad Request - Invalid Task", http.StatusBadRequest)
		return
	}

	// Pull useful headers from Task request.
	queueName := r.Header.Get("X-Appengine-Queuename")

	// Extract the request body for further task details.
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Printf("ReadAll: %v", err)
		http.Error(w, "Internal Error", http.StatusInternalServerError)
		return
	}

	// Log & output details of the task.
	output := fmt.Sprintf("Completed task: task queue(%s), task name(%s), payload(%s)",
		queueName,
		taskName,
		string(body),
	)
	log.Println(output)

	// Set a non-2xx status code to indicate a failure in task processing that should be retried.
	// For example, http.Error(w, "Internal Server Error: Task Processing", http.StatusInternalServerError)
	fmt.Fprintln(w, output)
}

Node.js

const express = require('express');

const app = express();
app.enable('trust proxy');

// Set the Content-Type of the Cloud Task to ensure compatibility
// By default, the Content-Type header of the Task request is set to "application/octet-stream"
// see https://cloud.google.com/tasks/docs/reference/rest/v2beta3/projects.locations.queues.tasks#AppEngineHttpRequest
app.use(express.text());

app.get('/', (req, res) => {
  // Basic index to verify app is serving
  res.send('Hello, World!').end();
});

app.post('/log_payload', (req, res) => {
  // Log the request payload
  console.log(`Received task with payload: ${req.body}`);
  res.send(`Printed task payload: ${req.body}`).end();
});

app.get('*', (req, res) => {
  res.send('OK').end();
});

const PORT = process.env.PORT || 8080;
app.listen(PORT, () => {
  console.log(`App listening on port ${PORT}`);
  console.log('Press Ctrl+C to quit.');
});

Ruby

require "sinatra"
require "json"

get "/" do
  # Basic index to verify app is serving
  "Hello World!"
end

post "/log_payload" do
  data = request.body.read
  # Log the request payload
  puts "Received task with payload: #{data}"
  "Printed task payload: #{data}"
end

超时

App Engine 任务有特定的超时限制,具体取决于运行它们的服务的伸缩类型

对于在标准环境中运行的工作器:

  • 自动扩缩:任务处理必须在 10 分钟内完成。
  • 手动和基本扩缩:请求最长可运行 24 小时。

对于在柔性环境中运行的工作器服务:所有类型的工作器都有 60 分钟的超时时间。

如果处理程序错过截止时间,则队列会假定任务失败并重试。

读取 App Engine 任务请求标头

Cloud Tasks 队列发送到 App Engine 处理程序的请求具有特殊标头,其中包含处理程序可能想要使用的任务特定信息。

这些标头在内部设置。如果任何此类标头出现在您应用的外部用户请求中,它们会被内部标头替换,但来自已登录应用管理员的请求除外,他们可以出于测试目的设置标头。

App Engine 任务请求始终包含以下标头:

标头 说明
X-AppEngine-QueueName 队列名称。
X-AppEngine-TaskName 任务的“短”名称,或者系统为任务生成的唯一 ID(如果在创建时未指定任务名称)。这是完整任务名称中的 my-task-id 值;例如 task_name = projects/my-project-id/locations/my-location/queues/my-queue-id/tasks/my-task-id
X-AppEngine-TaskRetryCount 任务的重试次数。对于第一次尝试,该值为 0。此数字包括由于缺少可用实例导致任务失败而从未到达执行阶段的尝试次数。
X-AppEngine-TaskExecutionCount 任务执行和接收来自处理程序的响应的次数。由于 Cloud Tasks 会在收到成功响应后删除任务,因此之前的所有处理程序响应都是失败的。此数字不包括由于缺少可用实例而导致的失败次数。请注意,如果 X-AppEngine-TaskExecutionCount 在尝试执行之前进行更新,则可以等于 X-AppEngine-TaskRetryCount
X-AppEngine-TaskETA 任务的计划运行时间,以 1970 年 1 月 1 日以来的秒数指定。

如果您的请求处理程序找到上述任何标头,就可以确信这是 Cloud Tasks 请求。

此外,来自 Cloud Tasks 的请求可能包含以下标头:

标头 说明
X-AppEngine-TaskPreviousResponse 来自上一次重试的 HTTP 响应代码。
X-AppEngine-TaskRetryReason 重试任务的原因。
X-AppEngine-FailFast 表示任务在现有实例不可用时快速失败。

目标路由

在 App Engine 任务中,队列和任务处理程序在同一个 Google Cloud 项目中运行。流量在传输过程中会加密,不会离开 Google 数据中心。您无法明确设置协议(例如 HTTP 或 HTTPS)。但是,对处理程序的请求表现为使用了 HTTP 协议。

任务可分派给安全的任务处理程序和不安全的任务处理程序,在受支持的运行时中,也可将任务分派给受 login: admin 限制的 URI。由于任务并非以任何用户身份运行,因此无法将任务分派给受 login: required 限制的 URI。任务调度也不遵循重定向。

后续步骤