App Engine タスクハンドラを作成する

このページでは、App Engine タスクを処理するワーカーコードである App Engine タスクハンドラの作成方法を説明します。Cloud Tasks キューは HTTP リクエストをタスクハンドラに送信します。処理が正常に完了すると、ハンドラは 200299 の HTTP ステータス コードをキューに返します。その他の値を返した場合、タスクが失敗し、キューがタスクを再試行します。

App Engine タスクキュー リクエストは、IP アドレス 0.1.0.2 から送信されます。App Engine 環境に送信されるリクエストの IP 範囲もご覧ください。

C#

    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddLogging(builder => builder.AddDebug());
            services.AddRouting();
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
        {
            var logger = loggerFactory.CreateLogger("testStackdriverLogging");

            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            else
            {
                // Configure error reporting service.
                app.UseExceptionHandler("/Home/Error");
            }

            var routeBuilder = new RouteBuilder(app);

            routeBuilder.MapPost("log_payload", context =>
            {
                // Log the request payload
                var reader = new StreamReader(context.Request.Body);
                var task = reader.ReadToEnd();

                logger.LogInformation($"Received task with payload: {task}");
                return context.Response.WriteAsync($"Printed task payload: {task}");
            });

            routeBuilder.MapGet("hello", context =>
            {
                // Basic index to verify app is serving
                return context.Response.WriteAsync("Hello, world!");
            });

            routeBuilder.MapGet("_ah/health", context =>
            {
                // Respond to GAE health-checks
                return context.Response.WriteAsync("OK");
            });

            routeBuilder.MapGet("/", context =>
            {
                return context.Response.WriteAsync("Hello, world!");
            });

            var routes = routeBuilder.Build();
            app.UseRouter(routes);
        }
    }

Python

from flask import Flask, render_template, request

app = Flask(__name__)

@app.route("/example_task_handler", methods=["POST"])
def example_task_handler():
    """Log the request payload."""
    payload = request.get_data(as_text=True) or "(empty payload)"
    print(f"Received task with payload: {payload}")
    return render_template("index.html", payload=payload)

Java

@WebServlet(
    name = "Tasks",
    description = "Create Cloud Task",
    urlPatterns = "/tasks/create"
)
public class TaskServlet extends HttpServlet {
  private static Logger log = Logger.getLogger(TaskServlet.class.getName());

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
    log.info("Received task request: " + req.getServletPath());
    String body = req.getReader()
        .lines()
        .reduce("", (accumulator, actual) -> accumulator + actual);

    if (!body.isEmpty()) {
      log.info("Request payload: " + body);
      String output = String.format("Received task with payload %s", body);
      resp.getOutputStream().write(output.getBytes());
      log.info("Sending response: " + output);
      resp.setStatus(HttpServletResponse.SC_OK);
    } else {
      log.warning("Null payload received in request to " + req.getServletPath());
    }
  }
}

PHP


require __DIR__ . '/vendor/autoload.php';

use Google\Cloud\Logging\LoggingClient;

// Create the logging client.
$logging = new LoggingClient();
// Create a PSR-3-compatible logger.
$logger = $logging->psrLogger('app', ['batchEnabled' => true]);

// Front-controller to route requests.
switch (@parse_url($_SERVER['REQUEST_URI'])['path']) {
    case '/':
        print "Hello, World!\n";
        break;
    case '/task_handler':
        // Taskname and Queuename are two of several useful Cloud Tasks headers available on the request.
        $taskName = $_SERVER['HTTP_X_APPENGINE_TASKNAME'] ?? '';
        $queueName = $_SERVER['HTTP_X_APPENGINE_QUEUENAME'] ?? '';

        try {
            handle_task(
                $queueName,
                $taskName,
                file_get_contents('php://input')
            );
        } catch (Exception $e) {
            http_response_code(400);
            exit($e->getMessage());
        }
        break;
    default:
        http_response_code(404);
        exit('Not Found');
}

/**
 * Process a Cloud Tasks HTTP Request.
 *
 * @param string $queueName provides the name of the queue which dispatched the task.
 * @param string $taskName provides the identifier of the task.
 * @param string $body The task details from the HTTP request.
 */
function handle_task($queueName, $taskName, $body = '')
{
    global $logger;

    if (empty($taskName)) {
        // You may use the presence of the X-Appengine-Taskname header to validate
        // the request comes from Cloud Tasks.
        $logger->warning('Invalid Task: No X-Appengine-Taskname request header found');
        throw new Exception('Bad Request - Invalid Task');
    }

    $output = sprintf('Completed task: task queue(%s), task name(%s), payload(%s)', $queueName, $taskName, $body);
    $logger->info($output);

    // Set a non-2xx status code to indicate a failure in task processing that should be retried.
    // For example, http_response_code(500) to indicate a server error.
    print $output;
}

Go


// Sample task_handler is an App Engine app demonstrating Cloud Tasks handling.
package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
)

func main() {
	// Allow confirmation the task handling service is running.
	http.HandleFunc("/", indexHandler)

	// Handle all tasks.
	http.HandleFunc("/task_handler", taskHandler)

	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
		log.Printf("Defaulting to port %s", port)
	}

	log.Printf("Listening on port %s", port)
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		log.Fatal(err)
	}
}

// indexHandler responds to requests with our greeting.
func indexHandler(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/" {
		http.NotFound(w, r)
		return
	}
	fmt.Fprint(w, "Hello, World!")
}

// taskHandler processes task requests.
func taskHandler(w http.ResponseWriter, r *http.Request) {
	taskName := r.Header.Get("X-Appengine-Taskname")
	if taskName == "" {
		// You may use the presence of the X-Appengine-Taskname header to validate
		// the request comes from Cloud Tasks.
		log.Println("Invalid Task: No X-Appengine-Taskname request header found")
		http.Error(w, "Bad Request - Invalid Task", http.StatusBadRequest)
		return
	}

	// Pull useful headers from Task request.
	queueName := r.Header.Get("X-Appengine-Queuename")

	// Extract the request body for further task details.
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Printf("ReadAll: %v", err)
		http.Error(w, "Internal Error", http.StatusInternalServerError)
		return
	}

	// Log & output details of the task.
	output := fmt.Sprintf("Completed task: task queue(%s), task name(%s), payload(%s)",
		queueName,
		taskName,
		string(body),
	)
	log.Println(output)

	// Set a non-2xx status code to indicate a failure in task processing that should be retried.
	// For example, http.Error(w, "Internal Server Error: Task Processing", http.StatusInternalServerError)
	fmt.Fprintln(w, output)
}

Node.js

const express = require('express');

const app = express();
app.enable('trust proxy');

// Set the Content-Type of the Cloud Task to ensure compatibility
// By default, the Content-Type header of the Task request is set to "application/octet-stream"
// see https://cloud.google.com/tasks/docs/reference/rest/v2beta3/projects.locations.queues.tasks#AppEngineHttpRequest
app.use(express.text());

app.get('/', (req, res) => {
  // Basic index to verify app is serving
  res.send('Hello, World!').end();
});

app.post('/log_payload', (req, res) => {
  // Log the request payload
  console.log(`Received task with payload: ${req.body}`);
  res.send(`Printed task payload: ${req.body}`).end();
});

app.get('*', (req, res) => {
  res.send('OK').end();
});

const PORT = process.env.PORT || 8080;
app.listen(PORT, () => {
  console.log(`App listening on port ${PORT}`);
  console.log('Press Ctrl+C to quit.');
});

Ruby

require "sinatra"
require "json"

get "/" do
  # Basic index to verify app is serving
  "Hello World!"
end

post "/log_payload" do
  data = request.body.read
  # Log the request payload
  puts "Received task with payload: #{data}"
  "Printed task payload: #{data}"
end

タイムアウト

App Engine タスクには、実行中のサービスのスケーリング タイプに応じて固有のタイムアウトが設定されています。

スタンダード環境で実行されているワーカー サービスの場合:

  • 自動スケーリング: タスクの処理は 10 分で完了する必要があります。
  • 手動または基本スケーリング: リクエストは最大 24 時間まで実行できます。

フレキシブル環境で動作しているワーカー サービスの場合: すべてのタイプに 60 分のタイムアウトが設定されます。

ハンドラが期限までに処理を完了できない場合、キューはタスクが失敗したとみなしてタスクを再試行します。

App Engine タスクリクエスト ヘッダーの読み取り

Cloud Tasks キューによって App Engine ハンドラに送信されるリクエストは、特殊なヘッダーを有しています。このヘッダーには、ハンドラが使用する可能性のあるタスク固有の情報が含まれています。

これらのヘッダーは内部で設定されます。アプリケーションに対する外部ユーザー リクエストに、このようなヘッダーが 1 つでも含まれている場合は、いずれも内部ヘッダーで置き換えられます。ただし、ログイン済みのアプリケーション管理者からのリクエストは例外です。アプリケーション管理者は、テストの目的でヘッダーを設定することが許可されています。

App Engine タスク リクエストには、常に次のヘッダーが含まれます。

ヘッダー 説明
X-AppEngine-QueueName キューの名前。
X-AppEngine-TaskName タスクの「省略」名。または、作成時に名前が指定されなかった場合は、システムによって生成された一意の ID です。これは、完全なタスク名の my-task-id 値です。例: task_name = projects/my-project-id/locations/my-location/queues/my-queue-id/tasks/my-task-id
X-AppEngine-TaskRetryCount タスクが再試行された回数。最初の試行の場合は、この値は 0 です。この試行回数には、インスタンス数不足が原因でタスクが異常終了したため実行フェーズに到達できなかった試行も含まれています。
X-AppEngine-TaskExecutionCount タスクが実行され、ハンドラからレスポンスを受け取った回数。Cloud Tasks は成功のレスポンスを受け取った時点でタスクを削除するため、それ以前のハンドラからのレスポンスはすべて失敗を意味します。この回数には、インスタンス数不足が原因の失敗は含まれていません。 実行を試行する前に更新される場合、X-AppEngine-TaskExecutionCountX-AppEngine-TaskRetryCount と同じにできます。
X-AppEngine-TaskETA タスクのスケジュール時間。1970 年 1 月 1 日からの秒数で指定されます。

リクエスト ハンドラが上記のヘッダーのいずれかを検出した場合、そのリクエストは Cloud Tasks からのリクエストであると判断できます。

さらに、Cloud Tasks からのリクエストには次のヘッダーが含まれる場合もあります。

ヘッダー 説明
X-AppEngine-TaskPreviousResponse 前回の再試行の HTTP レスポンス コード。
X-AppEngine-TaskRetryReason タスクを再試行する理由。
X-AppEngine-FailFast 既存のインスタンスが使用できない場合は、タスクがすぐに失敗することを意味します。

ターゲット ルーティング

App Engine タスクでは、キューとタスクハンドラは同じ Google Cloud プロジェクト内で動作します。トラフィックは転送中に暗号化され、Google のデータセンターの外部に提供されることがありません。プロトコル(HTTP や HTTPS など)を明示的に設定することはできません。ただし、ハンドラへのリクエストは HTTP プロトコルを使用したかのように見えます

安全なタスクハンドラ、安全性の低いタスクハンドラ、サポートされるランタイムにおいては login: admin で制限された URI に、タスクをディスパッチできます。タスクはユーザーとして動作しないため、login: required で制限された URI にディスパッチできません。タスク ディスパッチはリダイレクトにも従いません。

次のステップ