创建 App Engine 任务处理程序

本页面演示了如何创建 App Engine 任务处理程序、 处理 App Engine 任务的工作器代码。Cloud Tasks 队列将 HTTP 请求发送到您的任务处理程序。成功处理任务后,处理程序必须将 200299 之间的 HTTP 状态代码发送回队列。任何其他代码都表示任务失败,队列将重试该任务。

App Engine 任务队列请求从 IP 地址 0.1.0.2 发送。另请参阅 发送到 App Engine 环境的请求的 IP 范围

C#

    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddLogging(builder => builder.AddDebug());
            services.AddRouting();
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
        {
            var logger = loggerFactory.CreateLogger("testStackdriverLogging");

            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            else
            {
                // Configure error reporting service.
                app.UseExceptionHandler("/Home/Error");
            }

            var routeBuilder = new RouteBuilder(app);

            routeBuilder.MapPost("log_payload", context =>
            {
                // Log the request payload
                var reader = new StreamReader(context.Request.Body);
                var task = reader.ReadToEnd();

                logger.LogInformation($"Received task with payload: {task}");
                return context.Response.WriteAsync($"Printed task payload: {task}");
            });

            routeBuilder.MapGet("hello", context =>
            {
                // Basic index to verify app is serving
                return context.Response.WriteAsync("Hello, world!");
            });

            routeBuilder.MapGet("_ah/health", context =>
            {
                // Respond to GAE health-checks
                return context.Response.WriteAsync("OK");
            });

            routeBuilder.MapGet("/", context =>
            {
                return context.Response.WriteAsync("Hello, world!");
            });

            var routes = routeBuilder.Build();
            app.UseRouter(routes);
        }
    }

Go


// Sample task_handler is an App Engine app demonstrating Cloud Tasks handling.
package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
)

func main() {
	// Allow confirmation the task handling service is running.
	http.HandleFunc("/", indexHandler)

	// Handle all tasks.
	http.HandleFunc("/task_handler", taskHandler)

	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
		log.Printf("Defaulting to port %s", port)
	}

	log.Printf("Listening on port %s", port)
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		log.Fatal(err)
	}
}

// indexHandler responds to requests with our greeting.
func indexHandler(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/" {
		http.NotFound(w, r)
		return
	}
	fmt.Fprint(w, "Hello, World!")
}

// taskHandler processes task requests.
func taskHandler(w http.ResponseWriter, r *http.Request) {
	taskName := r.Header.Get("X-Appengine-Taskname")
	if taskName == "" {
		// You may use the presence of the X-Appengine-Taskname header to validate
		// the request comes from Cloud Tasks.
		log.Println("Invalid Task: No X-Appengine-Taskname request header found")
		http.Error(w, "Bad Request - Invalid Task", http.StatusBadRequest)
		return
	}

	// Pull useful headers from Task request.
	queueName := r.Header.Get("X-Appengine-Queuename")

	// Extract the request body for further task details.
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Printf("ReadAll: %v", err)
		http.Error(w, "Internal Error", http.StatusInternalServerError)
		return
	}

	// Log & output details of the task.
	output := fmt.Sprintf("Completed task: task queue(%s), task name(%s), payload(%s)",
		queueName,
		taskName,
		string(body),
	)
	log.Println(output)

	// Set a non-2xx status code to indicate a failure in task processing that should be retried.
	// For example, http.Error(w, "Internal Server Error: Task Processing", http.StatusInternalServerError)
	fmt.Fprintln(w, output)
}

Java

@WebServlet(
    name = "Tasks",
    description = "Create Cloud Task",
    urlPatterns = "/tasks/create"
)
public class TaskServlet extends HttpServlet {
  private static Logger log = Logger.getLogger(TaskServlet.class.getName());

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
    log.info("Received task request: " + req.getServletPath());
    String body = req.getReader()
        .lines()
        .reduce("", (accumulator, actual) -> accumulator + actual);

    if (!body.isEmpty()) {
      log.info("Request payload: " + body);
      String output = String.format("Received task with payload %s", body);
      resp.getOutputStream().write(output.getBytes());
      log.info("Sending response: " + output);
      resp.setStatus(HttpServletResponse.SC_OK);
    } else {
      log.warning("Null payload received in request to " + req.getServletPath());
    }
  }
}

Node.js

const express = require('express');

const app = express();
app.enable('trust proxy');

// Set the Content-Type of the Cloud Task to ensure compatibility
// By default, the Content-Type header of the Task request is set to "application/octet-stream"
// see https://cloud.google.com/tasks/docs/reference/rest/v2beta3/projects.locations.queues.tasks#AppEngineHttpRequest
app.use(express.text());

app.get('/', (req, res) => {
  // Basic index to verify app is serving
  res.send('Hello, World!').end();
});

app.post('/log_payload', (req, res) => {
  // Log the request payload
  console.log(`Received task with payload: ${req.body}`);
  res.send(`Printed task payload: ${req.body}`).end();
});

app.get('*', (req, res) => {
  res.send('OK').end();
});

const PORT = process.env.PORT || 8080;
app.listen(PORT, () => {
  console.log(`App listening on port ${PORT}`);
  console.log('Press Ctrl+C to quit.');
});

PHP


require __DIR__ . '/vendor/autoload.php';

use Google\Cloud\Logging\LoggingClient;

// Create the logging client.
$logging = new LoggingClient();
// Create a PSR-3-compatible logger.
$logger = $logging->psrLogger('app', ['batchEnabled' => true]);

// Front-controller to route requests.
switch (@parse_url($_SERVER['REQUEST_URI'])['path']) {
    case '/':
        print "Hello, World!\n";
        break;
    case '/task_handler':
        // Taskname and Queuename are two of several useful Cloud Tasks headers available on the request.
        $taskName = $_SERVER['HTTP_X_APPENGINE_TASKNAME'] ?? '';
        $queueName = $_SERVER['HTTP_X_APPENGINE_QUEUENAME'] ?? '';

        try {
            handle_task(
                $queueName,
                $taskName,
                file_get_contents('php://input')
            );
        } catch (Exception $e) {
            http_response_code(400);
            exit($e->getMessage());
        }
        break;
    default:
        http_response_code(404);
        exit('Not Found');
}

/**
 * Process a Cloud Tasks HTTP Request.
 *
 * @param string $queueName provides the name of the queue which dispatched the task.
 * @param string $taskName provides the identifier of the task.
 * @param string $body The task details from the HTTP request.
 */
function handle_task($queueName, $taskName, $body = '')
{
    global $logger;

    if (empty($taskName)) {
        // You may use the presence of the X-Appengine-Taskname header to validate
        // the request comes from Cloud Tasks.
        $logger->warning('Invalid Task: No X-Appengine-Taskname request header found');
        throw new Exception('Bad Request - Invalid Task');
    }

    $output = sprintf('Completed task: task queue(%s), task name(%s), payload(%s)', $queueName, $taskName, $body);
    $logger->info($output);

    // Set a non-2xx status code to indicate a failure in task processing that should be retried.
    // For example, http_response_code(500) to indicate a server error.
    print $output;
}

Python

from flask import Flask, render_template, request

app = Flask(__name__)


@app.route("/example_task_handler", methods=["POST"])
def example_task_handler():
    """Log the request payload."""
    payload = request.get_data(as_text=True) or "(empty payload)"
    print(f"Received task with payload: {payload}")
    return render_template("index.html", payload=payload)

Ruby

require "sinatra"
require "json"

get "/" do
  # Basic index to verify app is serving
  "Hello World!"
end

post "/log_payload" do
  data = request.body.read
  # Log the request payload
  puts "Received task with payload: #{data}"
  "Printed task payload: #{data}"
end

超时

App Engine 任务具有特定的超时, 伸缩类型 运行它们的服务的 Pod 的名称

对于在标准环境中运行的工作器:

  • 自动扩缩:任务处理必须在 10 分钟内完成。
  • 手动和基本扩缩:请求最长可运行 24 小时。

对于在柔性环境中运行的工作器服务:所有类型的服务都有 60 分钟的时间 超时。

如果处理程序错过截止时间,则队列会假定任务失败, 重试。

读取 App Engine 任务请求标头

Cloud Tasks 队列向 App Engine 处理程序发送的请求具有特殊标头,其中包含处理程序可能想要使用的任务特有的信息。

这些标头在内部设置。如果其中一些出现在应用的外部用户请求中,将会被内部标头替换(由允许设置测试标头的应用管理员登录并发出的请求除外)。

App Engine 任务请求始终包含以下标头:

标头 说明
X-AppEngine-QueueName 队列名称。
X-AppEngine-TaskName 任务的“短”名称,或者系统为任务生成的唯一 ID(如果在创建时未指定任务名称)。这是整个任务名称中的 my-task-id 值,例如 task_name = projects/my-project-id/locations/my-location/queues/my-queue-id/tasks/my-task-id
X-AppEngine-TaskRetryCount 任务已经重试的次数。对于第一次尝试,该值为 0。此数字包括由于缺少可用实例导致任务失败而从未到达执行阶段的尝试次数。
X-AppEngine-TaskExecutionCount 任务已执行并从处理程序收到响应的次数。由于 Cloud Tasks 在收到成功响应后会删除任务,因此所有先前的处理程序响应都是失败的。此数字不包括由于缺少可用实例而导致的失败次数。请注意,如果在尝试执行之前更新 X-AppEngine-TaskExecutionCount,则它可能等于 X-AppEngine-TaskRetryCount
X-AppEngine-TaskETA 任务的计划运行时间,以 1970 年 1 月 1 日以来的秒数指定。

如果您的请求处理程序找到了之前列出的任何标头,它可以 假设请求是一个 Cloud Tasks 请求。

此外,来自 Cloud Tasks 的请求可能包含以下标头:

标头 说明
X-AppEngine-TaskPreviousResponse 来自上一次重试的 HTTP 响应代码。
X-AppEngine-TaskRetryReason 重试任务的原因。
X-AppEngine-FailFast 表示任务在现有实例不可用时快速失败。

目标路由

在 App Engine 任务中,队列和任务处理程序在同一个 Google Cloud 项目中运行。流量在传输过程中被加密,不会离开 Google 数据中心。您无法明确设置协议(例如,HTTP 或 HTTPS)。不过,对处理程序的请求显示为 都使用了 HTTP 协议

任务可被分派到安全的任务处理程序、不安全的任务处理程序,以及(在支持的运行时环境中)受 login: admin 限制的 URI。由于并非任何用户身份都可以运行任务,因此无法将其分派到受 login: required 限制的 URI。任务调度也不遵循重定向。

后续步骤