建立 App Engine 工作處理常式

本頁面說明如何建立「工作處理常式」;工作處理常式是指能夠處理 App Engine 工作的工作站程式碼。App Engine 佇列會將 HTTP 要求傳送到工作處理常式,也就是您提供的程式碼。在順利完成處理作業後,處理常式必須傳送一個介於 200299 之間的 HTTP 狀態碼到佇列。如果傳送的是任何其他數值,則代表工作失敗且佇列會重試工作。

C#

    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddRouting();
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
        {
            loggerFactory.AddDebug();
            var logger = loggerFactory.CreateLogger("testStackdriverLogging");

            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            else
            {
                // Configure error reporting service.
                app.UseExceptionHandler("/Home/Error");
            }

            var routeBuilder = new RouteBuilder(app);

            routeBuilder.MapPost("log_payload", context =>
            {
                // Log the request payload
                var reader = new StreamReader(context.Request.Body);
                var task = reader.ReadToEnd();

                logger.LogInformation($"Received task with payload: {task}");
                return context.Response.WriteAsync($"Printed task payload: {task}");
            });

            routeBuilder.MapGet("hello", context =>
            {
                // Basic index to verify app is serving
                return context.Response.WriteAsync("Hello, world!");
            });

            routeBuilder.MapGet("_ah/health", context =>
            {
                // Respond to GAE health-checks
                return context.Response.WriteAsync("OK");
            });

            routeBuilder.MapGet("/", context =>
            {
                return context.Response.WriteAsync("Hello, world!");
            });

            var routes = routeBuilder.Build();
            app.UseRouter(routes);
        }
    }

Python

from flask import Flask, request

app = Flask(__name__)

@app.route('/example_task_handler', methods=['POST'])
def example_task_handler():
    """Log the request payload."""
    payload = request.get_data(as_text=True) or '(empty payload)'
    print('Received task with payload: {}'.format(payload))
    return 'Printed task payload: {}'.format(payload)

Java

@WebServlet(
    name = "Tasks",
    description = "Create Cloud Task",
    urlPatterns = "/tasks/create"
)
public class TaskServlet extends HttpServlet {
  private static Logger log = Logger.getLogger(TaskServlet.class.getName());

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
    log.info("Received task request: " + req.getServletPath());
    String body = req.getReader()
        .lines()
        .reduce("", (accumulator, actual) -> accumulator + actual);

    if (!body.isEmpty()) {
      log.info("Request payload: " + body);
      String output = String.format("Received task with payload %s", body);
      resp.getOutputStream().write(output.getBytes());
      log.info("Sending response: " + output);
      resp.setStatus(HttpServletResponse.SC_OK);
    } else {
      log.warning("Null payload received in request to " + req.getServletPath());
    }
  }
}

PHP

require __DIR__ . '/vendor/autoload.php';

use Google\Cloud\Logging\LoggingClient;

// Create the logging client.
$logging = new LoggingClient();
// Create a PSR-3-compatible logger.
$logger = $logging->psrLogger('app', ['batchEnabled' => true]);

// Front-controller to route requests.
switch (@parse_url($_SERVER['REQUEST_URI'])['path']) {
    case '/':
        print "Hello, World!\n";
        break;
    case '/task_handler':
        // Taskname and Queuename are two of several useful Cloud Tasks headers available on the request.
        $taskName = $_SERVER['HTTP_X_APPENGINE_TASKNAME'] ?? '';
        $queueName = $_SERVER['HTTP_X_APPENGINE_QUEUENAME'] ?? '';

        try {
            handle_task(
                $queueName,
                $taskName,
                file_get_contents('php://input')
            );
        } catch (Exception $e) {
            http_response_code(400);
            exit($e->getMessage());
        }
        break;
    default:
        http_response_code(404);
        exit('Not Found');
}

/**
 * Process a Cloud Tasks HTTP Request.
 *
 * @param string $queueName provides the name of the queue which dispatched the task.
 * @param string $taskName provides the identifier of the task.
 * @param string $body The task details from the HTTP request.
 */
function handle_task($queueName, $taskName, $body = '')
{
    global $logger;

    if (empty($taskName)) {
        // You may use the presence of the X-Appengine-Taskname header to validate
        // the request comes from Cloud Tasks.
        $logger->warning('Invalid Task: No X-Appengine-Taskname request header found');
        throw new Exception('Bad Request - Invalid Task');
    }

    $output = sprintf('Completed task: task queue(%s), task name(%s), payload(%s)', $queueName, $taskName, $body);
    $logger->info($output);

    // Set a non-2xx status code to indicate a failure in task processing that should be retried.
    // For example, http_response_code(500) to indicate a server error.
    print $output;
}

Go

// Sample task_handler is an App Engine app demonstrating Cloud Tasks handling.
package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
)

func main() {
	// Allow confirmation the task handling service is running.
	http.HandleFunc("/", indexHandler)

	// Handle all tasks.
	http.HandleFunc("/task_handler", taskHandler)

	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
		log.Printf("Defaulting to port %s", port)
	}

	log.Printf("Listening on port %s", port)
	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%s", port), nil))
}

// indexHandler responds to requests with our greeting.
func indexHandler(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/" {
		http.NotFound(w, r)
		return
	}
	fmt.Fprint(w, "Hello, World!")
}

// taskHandler processes task requests.
func taskHandler(w http.ResponseWriter, r *http.Request) {
	t, ok := r.Header["X-Appengine-Taskname"]
	if !ok || len(t[0]) == 0 {
		// You may use the presence of the X-Appengine-Taskname header to validate
		// the request comes from Cloud Tasks.
		log.Println("Invalid Task: No X-Appengine-Taskname request header found")
		http.Error(w, "Bad Request - Invalid Task", http.StatusBadRequest)
		return
	}
	taskName := t[0]

	// Pull useful headers from Task request.
	q, ok := r.Header["X-Appengine-Queuename"]
	queueName := ""
	if ok {
		queueName = q[0]
	}

	// Extract the request body for further task details.
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Printf("ReadAll: %v", err)
		http.Error(w, "Internal Error", http.StatusInternalServerError)
		return
	}

	// Log & output details of the task.
	output := fmt.Sprintf("Completed task: task queue(%s), task name(%s), payload(%s)",
		queueName,
		taskName,
		string(body),
	)
	log.Println(output)

	// Set a non-2xx status code to indicate a failure in task processing that should be retried.
	// For example, http.Error(w, "Internal Server Error: Task Processing", http.StatusInternalServerError)
	fmt.Fprintln(w, output)
}

Node.js

const bodyParser = require('body-parser');
const express = require('express');

const app = express();
app.enable('trust proxy');

app.use(bodyParser.raw());
app.use(bodyParser.json());
app.use(bodyParser.text());

app.get('/', (req, res) => {
  // Basic index to verify app is serving
  res.send('Hello, World!').end();
});

app.post('/log_payload', (req, res) => {
  // Log the request payload
  console.log('Received task with payload: %s', req.body);
  res.send(`Printed task payload: ${req.body}`).end();
});

app.get('*', (req, res) => {
  res.send('OK').end();
});

const PORT = process.env.PORT || 8080;
app.listen(process.env.PORT || 8080, () => {
  console.log(`App listening on port ${PORT}`);
  console.log('Press Ctrl+C to quit.');
});

Ruby

require "sinatra"
require "json"

get "/" do
  # Basic index to verify app is serving
  "Hello World!"
end

post "/log_payload" do
  data = request.body.read
  # Log the request payload
  puts "Received task with payload: #{data}"
  "Printed task payload: #{data}"
end

逾時

App Engine 工作有其特定的逾時時間,這會根據執行 App Engine 工作之服務的資源調度類型而定。

對於在標準環境中執行的工作站服務:

  • 自動調整資源配置:工作處理作業必須在 10 分鐘內完成。
  • 手動調整資源配置和基本資源配置:要求最多可執行 24 小時。

對於在彈性環境中執行的工作站服務:所有類型都會在超過 60 分鐘後逾時。

如果您的處理常式超過期限,佇列會認定工作失敗並再次執行工作。

讀取要求標頭

由 App Engine 佇列傳送到處理常式的要求會有特殊標頭,這些標頭含有處理常式可能會用到的工作專屬資訊。

這些標頭由內部設定。如果向應用程式發出的外部使用者要求中具有這種標頭,系統會將其替換為內部標頭,但如果要求來自於已登入應用程式的系統管理員,則屬於例外情況,系統允許系統管理員設定標頭以進行測試。

Cloud Tasks 發出的要求一律包含下列標頭:

標頭 說明
X-AppEngine-QueueName 佇列名稱。
X-AppEngine-TaskName 工作的「簡短」名稱;如果在建立工作時沒有指定名稱,則是由系統產生的唯一 ID 來代替。這是完整工作名稱中的「my-task-id」值,即 task_name = projects/my-project-id/locations/my-location/queues/my-queue-id/tasks/my-task-id。
X-AppEngine-TaskRetryCount 這個工作的已重試次數。若為第一次嘗試,此值為 0。這個數字會將工作因欠缺可用執行個體,所以從未達到執行階段而失敗的次數計算在內。
X-AppEngine-TaskExecutionCount 工作收到處理常式回應的總次數。由於 Cloud Tasks 在收到成功回應後會刪除工作,所以先前的所有處理常式回應都是失敗。這個數字不包含因欠缺可用執行個體而失敗的次數。
X-AppEngine-TaskETA 工作的排程時間,從 1970 年 1 月 1 日算起並以秒數表示。

如果要求處理常式包含任何上列標頭,即可信任這個要求是由 Cloud Tasks 發出的要求。

此外,Cloud Tasks 發出的要求還可能包含下列標頭:

標頭 說明
X-AppEngine-TaskPreviousResponse 來自上次重試的 HTTP 回應碼。
X-AppEngine-TaskRetryReason 重試工作的原因。
X-AppEngine-FailFast 表示如果沒有可用的現有執行個體,工作會立即失敗。

目標轉送

佇列與工作處理常式都是在相同的 GCP 專案內執行。流量在傳輸時會經過加密,且絕不會從 Google 資料中心外流。因為流量是透過 Google 內部的通訊機制傳送,所以您無法明確設定通訊協定 (例如 HTTP 或 HTTPS)。不過,對處理常式的要求會「看似」已採用 HTTP 通訊協定。

工作可能會分派到安全的工作處理常式、不安全的工作處理常式,以及限制為 login: admin 的 URI。因為工作並非以任何使用者的身分執行,所以「無法」分派到限制為 login: required 的 URI。工作分派也不會追蹤重新導向。

相關資源

本頁內容對您是否有任何幫助?請提供意見:

傳送您對下列選項的寶貴意見...

這個網頁
Cloud Tasks 說明文件