App Engine-Task-Handler erstellen

Auf dieser Seite wird beschrieben, wie Sie einen App Engine-Aufgaben-Handler erstellen. Das ist der Worker-Code, mit dem eine App Engine-Aufgabe verarbeitet wird. Die Cloud Tasks-Warteschlange sendet HTTP-Anfragen an den Aufgaben-Handler. Nach erfolgreicher Verarbeitung muss der Handler einen HTTP-Status zwischen 200 und 299 an die Warteschlange zurücksenden. Jeder andere Wert gibt an, dass die Aufgabe fehlgeschlagen ist und die Warteschlange die Aufgabe wiederholt.

Anfragen der App Engine-Aufgabenwarteschlange werden von der IP-Adresse 0.1.0.2 gesendet. Weitere Informationen finden Sie unter IP-Bereich für Anfragen, die an die App Engine-Umgebung gesendet werden.

C#

    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddLogging(builder => builder.AddDebug());
            services.AddRouting();
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
        {
            var logger = loggerFactory.CreateLogger("testStackdriverLogging");

            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            else
            {
                // Configure error reporting service.
                app.UseExceptionHandler("/Home/Error");
            }

            var routeBuilder = new RouteBuilder(app);

            routeBuilder.MapPost("log_payload", context =>
            {
                // Log the request payload
                var reader = new StreamReader(context.Request.Body);
                var task = reader.ReadToEnd();

                logger.LogInformation($"Received task with payload: {task}");
                return context.Response.WriteAsync($"Printed task payload: {task}");
            });

            routeBuilder.MapGet("hello", context =>
            {
                // Basic index to verify app is serving
                return context.Response.WriteAsync("Hello, world!");
            });

            routeBuilder.MapGet("_ah/health", context =>
            {
                // Respond to GAE health-checks
                return context.Response.WriteAsync("OK");
            });

            routeBuilder.MapGet("/", context =>
            {
                return context.Response.WriteAsync("Hello, world!");
            });

            var routes = routeBuilder.Build();
            app.UseRouter(routes);
        }
    }

Python

from flask import Flask, render_template, request

app = Flask(__name__)

@app.route("/example_task_handler", methods=["POST"])
def example_task_handler():
    """Log the request payload."""
    payload = request.get_data(as_text=True) or "(empty payload)"
    print(f"Received task with payload: {payload}")
    return render_template("index.html", payload=payload)

Java

@WebServlet(
    name = "Tasks",
    description = "Create Cloud Task",
    urlPatterns = "/tasks/create"
)
public class TaskServlet extends HttpServlet {
  private static Logger log = Logger.getLogger(TaskServlet.class.getName());

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
    log.info("Received task request: " + req.getServletPath());
    String body = req.getReader()
        .lines()
        .reduce("", (accumulator, actual) -> accumulator + actual);

    if (!body.isEmpty()) {
      log.info("Request payload: " + body);
      String output = String.format("Received task with payload %s", body);
      resp.getOutputStream().write(output.getBytes());
      log.info("Sending response: " + output);
      resp.setStatus(HttpServletResponse.SC_OK);
    } else {
      log.warning("Null payload received in request to " + req.getServletPath());
    }
  }
}

PHP


require __DIR__ . '/vendor/autoload.php';

use Google\Cloud\Logging\LoggingClient;

// Create the logging client.
$logging = new LoggingClient();
// Create a PSR-3-compatible logger.
$logger = $logging->psrLogger('app', ['batchEnabled' => true]);

// Front-controller to route requests.
switch (@parse_url($_SERVER['REQUEST_URI'])['path']) {
    case '/':
        print "Hello, World!\n";
        break;
    case '/task_handler':
        // Taskname and Queuename are two of several useful Cloud Tasks headers available on the request.
        $taskName = $_SERVER['HTTP_X_APPENGINE_TASKNAME'] ?? '';
        $queueName = $_SERVER['HTTP_X_APPENGINE_QUEUENAME'] ?? '';

        try {
            handle_task(
                $queueName,
                $taskName,
                file_get_contents('php://input')
            );
        } catch (Exception $e) {
            http_response_code(400);
            exit($e->getMessage());
        }
        break;
    default:
        http_response_code(404);
        exit('Not Found');
}

/**
 * Process a Cloud Tasks HTTP Request.
 *
 * @param string $queueName provides the name of the queue which dispatched the task.
 * @param string $taskName provides the identifier of the task.
 * @param string $body The task details from the HTTP request.
 */
function handle_task($queueName, $taskName, $body = '')
{
    global $logger;

    if (empty($taskName)) {
        // You may use the presence of the X-Appengine-Taskname header to validate
        // the request comes from Cloud Tasks.
        $logger->warning('Invalid Task: No X-Appengine-Taskname request header found');
        throw new Exception('Bad Request - Invalid Task');
    }

    $output = sprintf('Completed task: task queue(%s), task name(%s), payload(%s)', $queueName, $taskName, $body);
    $logger->info($output);

    // Set a non-2xx status code to indicate a failure in task processing that should be retried.
    // For example, http_response_code(500) to indicate a server error.
    print $output;
}

Einfach loslegen (Go)


// Sample task_handler is an App Engine app demonstrating Cloud Tasks handling.
package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
)

func main() {
	// Allow confirmation the task handling service is running.
	http.HandleFunc("/", indexHandler)

	// Handle all tasks.
	http.HandleFunc("/task_handler", taskHandler)

	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
		log.Printf("Defaulting to port %s", port)
	}

	log.Printf("Listening on port %s", port)
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		log.Fatal(err)
	}
}

// indexHandler responds to requests with our greeting.
func indexHandler(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/" {
		http.NotFound(w, r)
		return
	}
	fmt.Fprint(w, "Hello, World!")
}

// taskHandler processes task requests.
func taskHandler(w http.ResponseWriter, r *http.Request) {
	taskName := r.Header.Get("X-Appengine-Taskname")
	if taskName == "" {
		// You may use the presence of the X-Appengine-Taskname header to validate
		// the request comes from Cloud Tasks.
		log.Println("Invalid Task: No X-Appengine-Taskname request header found")
		http.Error(w, "Bad Request - Invalid Task", http.StatusBadRequest)
		return
	}

	// Pull useful headers from Task request.
	queueName := r.Header.Get("X-Appengine-Queuename")

	// Extract the request body for further task details.
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Printf("ReadAll: %v", err)
		http.Error(w, "Internal Error", http.StatusInternalServerError)
		return
	}

	// Log & output details of the task.
	output := fmt.Sprintf("Completed task: task queue(%s), task name(%s), payload(%s)",
		queueName,
		taskName,
		string(body),
	)
	log.Println(output)

	// Set a non-2xx status code to indicate a failure in task processing that should be retried.
	// For example, http.Error(w, "Internal Server Error: Task Processing", http.StatusInternalServerError)
	fmt.Fprintln(w, output)
}

Node.js

const express = require('express');

const app = express();
app.enable('trust proxy');

// Set the Content-Type of the Cloud Task to ensure compatibility
// By default, the Content-Type header of the Task request is set to "application/octet-stream"
// see https://cloud.google.com/tasks/docs/reference/rest/v2beta3/projects.locations.queues.tasks#AppEngineHttpRequest
app.use(express.text());

app.get('/', (req, res) => {
  // Basic index to verify app is serving
  res.send('Hello, World!').end();
});

app.post('/log_payload', (req, res) => {
  // Log the request payload
  console.log(`Received task with payload: ${req.body}`);
  res.send(`Printed task payload: ${req.body}`).end();
});

app.get('*', (req, res) => {
  res.send('OK').end();
});

const PORT = process.env.PORT || 8080;
app.listen(PORT, () => {
  console.log(`App listening on port ${PORT}`);
  console.log('Press Ctrl+C to quit.');
});

Ruby

require "sinatra"
require "json"

get "/" do
  # Basic index to verify app is serving
  "Hello World!"
end

post "/log_payload" do
  data = request.body.read
  # Log the request payload
  puts "Received task with payload: #{data}"
  "Printed task payload: #{data}"
end

Zeitlimits

Für App Engine-Aufgaben gelten bestimmte Zeitlimits, die vom Skalierungstyp des Dienstes abhängen, der sie ausführt.

Für Worker-Dienste, die in der Standardumgebung ausgeführt werden:

  • Automatische Skalierung: Die Verarbeitung der Aufgabe muss in zehn Minuten abgeschlossen sein.
  • Manuelle und grundlegende Skalierungsanfragen können bis zu 24 Stunden ausgeführt werden.

Für Worker-Dienste, die in der flexiblen Umgebung ausgeführt werden: Alle Typen haben ein Zeitlimit von 60 Minuten.

Wenn die Frist für den Handler verstrichen ist, geht die Warteschlange davon aus, dass die Aufgabe fehlgeschlagen ist, und wiederholt sie.

App Engine-Aufgabenanfrage-Header lesen

Anfragen, die von einer Cloud Tasks-Warteschlange an Ihren App Engine-Handler gesendet werden, haben spezielle Header mit aufgabenspezifischen Informationen, die der Handler möglicherweise verwenden möchte.

Diese Header werden intern festgelegt. Wenn einer dieser Header in einer externen Nutzeranfrage an Ihre Anwendung vorhanden ist, werden sie durch die internen ersetzt. Eine Ausnahme bilden Anfragen von angemeldeten Administratoren der Anwendung, die Header für Testzwecke festlegen dürfen.

App Engine-Aufgabenanfragen enthalten immer die folgenden Header:

Header Beschreibung
X-AppEngine-QueueName Der Name der Warteschlange.
X-AppEngine-TaskName Der "kurze" Name der Aufgabe oder, wenn bei der Erstellung kein Name festgelegt wurde, eine vom System generierte eindeutige ID. Dies ist der Wert my-task-id im vollständigen Aufgabennamen, z. B. task_name = projects/my-project-id/locations/my-location/queues/my-queue-id/tasks/my-task-id.
X-AppEngine-TaskRetryCount Die Anzahl der Wiederholungsversuche für die Aufgabe. Für den ersten Versuch lautet dieser Wert 0. Diese Anzahl enthält Versuche, bei denen die Aufgabe aufgrund fehlender verfügbarer Instanzen fehlgeschlagen ist und die Ausführungsphase nicht erreicht wurde.
X-AppEngine-TaskExecutionCount Die Häufigkeit, mit der die Task ausgeführt und eine Antwort vom Handler erhalten hat. Da Cloud Tasks die Aufgabe löscht, nachdem eine erfolgreiche Antwort empfangen wurde, sind alle vorherigen Handler-Antworten Fehler. Für diesen Wert werden Fehler aufgrund fehlender verfügbarer Instanzen nicht berücksichtigt. X-AppEngine-TaskExecutionCount kann gleich X-AppEngine-TaskRetryCount sein, wenn sie vor dem Versuch aktualisiert wird, sie auszuführen.
X-AppEngine-TaskETA Die geplante Zeit für eine Aufgabe, angegeben in Sekunden seit dem 1. Januar 1970.

Wenn der Anfrage-Handler einen der oben aufgeführten Header ermittelt, ist sicher, dass es sich bei der Anfrage um eine Anfrage von Cloud Tasks handelt.

Außerdem können Anfragen von Cloud Tasks die folgenden Header enthalten:

Header Beschreibung
X-AppEngine-TaskPreviousResponse Der HTTP-Antwortcode aus der vorangegangenen Wiederholung.
X-AppEngine-TaskRetryReason Der Grund für die Wiederholung der Aufgabe.
X-AppEngine-FailFast Zeigt an, dass eine Aufgabe sofort fehlschlägt, wenn eine vorhandene Instanz nicht verfügbar ist.

Ziel-Routing

Bei App Engine-Aufgaben werden die Warteschlange und der Aufgaben-Handler im selben Google Cloud-Projekt ausgeführt. Der Traffic wird während der Übertragung verschlüsselt und verlässt niemals Google-Rechenzentren. Sie können das Protokoll (z. B. HTTP oder HTTPS) nicht explizit festlegen. Es erscheint jedoch so, als hätte die Anfrage an den Handler das HTTP-Protokoll verwendet.

Aufgaben können an sichere und unsichere Aufgaben-Handler sowie in unterstützten Laufzeiten an URIs mit der Einschränkung login: admin weitergeleitet werden. Da Aufgaben nicht im Namen von Nutzern ausgeführt werden, können sie nicht an URIs weitergeleitet werden, die durch login: required eingeschränkt sind. Auch folgen Aufgabenweiterleitungen nicht den sonstigen Weiterleitungen.

Nächste Schritte