创建 App Engine 任务处理程序

使用集合让一切井井有条 根据您的偏好保存内容并对其进行分类。

本页面演示了如何创建 App Engine 任务处理程序(即处理 App Engine 任务的工作器代码)。Cloud Tasks 队列会将 HTTP 请求发送到您的任务处理程序。成功处理任务后,处理程序必须将 200299 之间的 HTTP 状态代码发送回队列。任何其他代码都表示任务失败,队列将重试该任务。

App Engine 任务队列请求从 IP 地址 0.1.0.2 发送。另请参阅发送到 App Engine 环境的请求的 IP 范围

C#

    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddLogging(builder => builder.AddDebug());
            services.AddRouting();
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
        {
            var logger = loggerFactory.CreateLogger("testStackdriverLogging");

            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            else
            {
                // Configure error reporting service.
                app.UseExceptionHandler("/Home/Error");
            }

            var routeBuilder = new RouteBuilder(app);

            routeBuilder.MapPost("log_payload", context =>
            {
                // Log the request payload
                var reader = new StreamReader(context.Request.Body);
                var task = reader.ReadToEnd();

                logger.LogInformation($"Received task with payload: {task}");
                return context.Response.WriteAsync($"Printed task payload: {task}");
            });

            routeBuilder.MapGet("hello", context =>
            {
                // Basic index to verify app is serving
                return context.Response.WriteAsync("Hello, world!");
            });

            routeBuilder.MapGet("_ah/health", context =>
            {
                // Respond to GAE health-checks
                return context.Response.WriteAsync("OK");
            });

            routeBuilder.MapGet("/", context =>
            {
                return context.Response.WriteAsync("Hello, world!");
            });

            var routes = routeBuilder.Build();
            app.UseRouter(routes);
        }
    }

Python

from flask import Flask, render_template, request

app = Flask(__name__)

@app.route('/example_task_handler', methods=['POST'])
def example_task_handler():
    """Log the request payload."""
    payload = request.get_data(as_text=True) or '(empty payload)'
    print('Received task with payload: {}'.format(payload))
    return render_template("index.html", payload=payload)

Java

@WebServlet(
    name = "Tasks",
    description = "Create Cloud Task",
    urlPatterns = "/tasks/create"
)
public class TaskServlet extends HttpServlet {
  private static Logger log = Logger.getLogger(TaskServlet.class.getName());

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
    log.info("Received task request: " + req.getServletPath());
    String body = req.getReader()
        .lines()
        .reduce("", (accumulator, actual) -> accumulator + actual);

    if (!body.isEmpty()) {
      log.info("Request payload: " + body);
      String output = String.format("Received task with payload %s", body);
      resp.getOutputStream().write(output.getBytes());
      log.info("Sending response: " + output);
      resp.setStatus(HttpServletResponse.SC_OK);
    } else {
      log.warning("Null payload received in request to " + req.getServletPath());
    }
  }
}

PHP


require __DIR__ . '/vendor/autoload.php';

use Google\Cloud\Logging\LoggingClient;

// Create the logging client.
$logging = new LoggingClient();
// Create a PSR-3-compatible logger.
$logger = $logging->psrLogger('app', ['batchEnabled' => true]);

// Front-controller to route requests.
switch (@parse_url($_SERVER['REQUEST_URI'])['path']) {
    case '/':
        print "Hello, World!\n";
        break;
    case '/task_handler':
        // Taskname and Queuename are two of several useful Cloud Tasks headers available on the request.
        $taskName = $_SERVER['HTTP_X_APPENGINE_TASKNAME'] ?? '';
        $queueName = $_SERVER['HTTP_X_APPENGINE_QUEUENAME'] ?? '';

        try {
            handle_task(
                $queueName,
                $taskName,
                file_get_contents('php://input')
            );
        } catch (Exception $e) {
            http_response_code(400);
            exit($e->getMessage());
        }
        break;
    default:
        http_response_code(404);
        exit('Not Found');
}

/**
 * Process a Cloud Tasks HTTP Request.
 *
 * @param string $queueName provides the name of the queue which dispatched the task.
 * @param string $taskName provides the identifier of the task.
 * @param string $body The task details from the HTTP request.
 */
function handle_task($queueName, $taskName, $body = '')
{
    global $logger;

    if (empty($taskName)) {
        // You may use the presence of the X-Appengine-Taskname header to validate
        // the request comes from Cloud Tasks.
        $logger->warning('Invalid Task: No X-Appengine-Taskname request header found');
        throw new Exception('Bad Request - Invalid Task');
    }

    $output = sprintf('Completed task: task queue(%s), task name(%s), payload(%s)', $queueName, $taskName, $body);
    $logger->info($output);

    // Set a non-2xx status code to indicate a failure in task processing that should be retried.
    // For example, http_response_code(500) to indicate a server error.
    print $output;
}

Go


// Sample task_handler is an App Engine app demonstrating Cloud Tasks handling.
package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
)

func main() {
	// Allow confirmation the task handling service is running.
	http.HandleFunc("/", indexHandler)

	// Handle all tasks.
	http.HandleFunc("/task_handler", taskHandler)

	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
		log.Printf("Defaulting to port %s", port)
	}

	log.Printf("Listening on port %s", port)
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		log.Fatal(err)
	}
}

// indexHandler responds to requests with our greeting.
func indexHandler(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/" {
		http.NotFound(w, r)
		return
	}
	fmt.Fprint(w, "Hello, World!")
}

// taskHandler processes task requests.
func taskHandler(w http.ResponseWriter, r *http.Request) {
	taskName := r.Header.Get("X-Appengine-Taskname")
	if taskName == "" {
		// You may use the presence of the X-Appengine-Taskname header to validate
		// the request comes from Cloud Tasks.
		log.Println("Invalid Task: No X-Appengine-Taskname request header found")
		http.Error(w, "Bad Request - Invalid Task", http.StatusBadRequest)
		return
	}

	// Pull useful headers from Task request.
	queueName := r.Header.Get("X-Appengine-Queuename")

	// Extract the request body for further task details.
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Printf("ReadAll: %v", err)
		http.Error(w, "Internal Error", http.StatusInternalServerError)
		return
	}

	// Log & output details of the task.
	output := fmt.Sprintf("Completed task: task queue(%s), task name(%s), payload(%s)",
		queueName,
		taskName,
		string(body),
	)
	log.Println(output)

	// Set a non-2xx status code to indicate a failure in task processing that should be retried.
	// For example, http.Error(w, "Internal Server Error: Task Processing", http.StatusInternalServerError)
	fmt.Fprintln(w, output)
}

Node.js

const bodyParser = require('body-parser');
const express = require('express');

const app = express();
app.enable('trust proxy');

// By default, the Content-Type header of the Task request is set to "application/octet-stream"
// see https://cloud.google.com/tasks/docs/reference/rest/v2beta3/projects.locations.queues.tasks#AppEngineHttpRequest
app.use(bodyParser.raw({type: 'application/octet-stream'}));

app.get('/', (req, res) => {
  // Basic index to verify app is serving
  res.send('Hello, World!').end();
});

app.post('/log_payload', (req, res) => {
  // Log the request payload
  console.log('Received task with payload: %s', req.body);
  res.send(`Printed task payload: ${req.body}`).end();
});

app.get('*', (req, res) => {
  res.send('OK').end();
});

const PORT = process.env.PORT || 8080;
app.listen(process.env.PORT || 8080, () => {
  console.log(`App listening on port ${PORT}`);
  console.log('Press Ctrl+C to quit.');
});

Ruby

require "sinatra"
require "json"

get "/" do
  # Basic index to verify app is serving
  "Hello World!"
end

post "/log_payload" do
  data = request.body.read
  # Log the request payload
  puts "Received task with payload: #{data}"
  "Printed task payload: #{data}"
end

超时

App Engine 任务具有特定的超时时间,具体取决于运行服务的服务的扩缩类型

对于在标准环境中运行的工作器:

  • 自动扩缩:任务处理必须在 10 分钟内完成。
  • 手动和基本扩缩:请求最长可运行 24 小时。

对于在柔性环境中运行的工作器服务:所有类型的超时时间均为 60 分钟。

如果您的处理程序错过了截止时间,则队列会假定任务失败并重试。

读取 App Engine 任务请求标头

Cloud Tasks 队列发送到 App Engine 处理程序的请求具有特殊标头,其中包含您的处理程序可能希望使用的任务特定信息。

这些标头在内部设置。如果其中任何标头包含在应用的外部用户请求中,就会被内部标头取代(来自已登录应用的管理员的请求除外,管理员可以出于测试目的设置标头)。

App Engine 任务请求始终包含以下标头:

标头 说明
X-AppEngine-QueueName 队列名称。
X-AppEngine-TaskName 任务的“短”名称,或者系统为任务生成的唯一 ID(如果在创建时未指定任务名称)。这是完整任务名称中 'my-task-id' 部分的值,即 task_name = projects/my-project-id/locations/my-location/queues/my-queue-id/tasks/my-task-id。
X-AppEngine-TaskRetryCount 此任务已经重试的次数。对于第一次尝试,该值为 0。此数字包括由于缺少可用实例导致任务失败而从未到达执行阶段的尝试次数。
X-AppEngine-TaskExecutionCount 任务从处理程序收到响应的总次数。由于 Cloud Tasks 在收到成功响应后会删除任务,因此所有先前的处理程序响应都是失败的。此数字不包括由于缺少可用实例而导致的失败次数。
X-AppEngine-TaskETA 任务的计划运行时间,以 1970 年 1 月 1 日以来的秒数指定。

如果您的请求处理程序找到上述任何标头,就可以确信这是 Cloud Tasks 请求。

此外,来自 Cloud Tasks 的请求可能包含以下标头:

标头 说明
X-AppEngine-TaskPreviousResponse 来自上一次重试的 HTTP 响应代码。
X-AppEngine-TaskRetryReason 重试任务的原因。
X-AppEngine-FailFast 表示任务在现有实例不可用时快速失败。

目标路由

在 App Engine 任务中,队列和任务处理程序在同一 Cloud 项目中运行。流量在传输过程中会加密,从不会离开 Google 数据中心。由于此流量通过 Google 内部的通信机制传输,因此您无法明确设置协议(例如 HTTP 或 HTTPS)。但是,对处理程序的请求将显示为使用了 HTTP 协议。

任务可分派给安全的任务处理程序、不安全的任务处理程序,以及受支持的运行时中受 login: admin 限制的 URI。由于任务不会以任何用户身份运行,因此它们无法分派给受 login: required 限制的 URI。任务调度也不遵循重定向。

后续步骤