App Engine-Aufgaben-Handler erstellen

Auf dieser Seite wird gezeigt, wie Sie einen App Engine-Aufgaben-Handler erstellen. Der Aufgaben-Handler ist der Worker-Code, der App Engine-Aufgaben verarbeitet. Die Cloud Tasks-Warteschlange sendet HTTP-Anfragen an Ihren Aufgaben-Handler. Nach erfolgreicher Verarbeitung muss der Handler einen HTTP-Status zwischen 200 und 299 an die Warteschlange zurücksenden. Jeder andere Wert gibt an, dass die Aufgabe fehlgeschlagen ist und die Warteschlange die Aufgabe wiederholt.

App Engine-Anfragen an die Aufgabenwarteschlange werden von der IP-Adresse 0.1.0.2 gesendet. Weitere Informationen finden Sie im Hilfeartikel IP-Bereich für Anfragen, die an die App Engine-Umgebung gesendet werden.

C#

    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddLogging(builder => builder.AddDebug());
            services.AddRouting();
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
        {
            var logger = loggerFactory.CreateLogger("testStackdriverLogging");

            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            else
            {
                // Configure error reporting service.
                app.UseExceptionHandler("/Home/Error");
            }

            var routeBuilder = new RouteBuilder(app);

            routeBuilder.MapPost("log_payload", context =>
            {
                // Log the request payload
                var reader = new StreamReader(context.Request.Body);
                var task = reader.ReadToEnd();

                logger.LogInformation($"Received task with payload: {task}");
                return context.Response.WriteAsync($"Printed task payload: {task}");
            });

            routeBuilder.MapGet("hello", context =>
            {
                // Basic index to verify app is serving
                return context.Response.WriteAsync("Hello, world!");
            });

            routeBuilder.MapGet("_ah/health", context =>
            {
                // Respond to GAE health-checks
                return context.Response.WriteAsync("OK");
            });

            routeBuilder.MapGet("/", context =>
            {
                return context.Response.WriteAsync("Hello, world!");
            });

            var routes = routeBuilder.Build();
            app.UseRouter(routes);
        }
    }

Go


// Sample task_handler is an App Engine app demonstrating Cloud Tasks handling.
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
)

func main() {
	// Allow confirmation the task handling service is running.
	http.HandleFunc("/", indexHandler)

	// Handle all tasks.
	http.HandleFunc("/task_handler", taskHandler)

	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
		log.Printf("Defaulting to port %s", port)
	}

	log.Printf("Listening on port %s", port)
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		log.Fatal(err)
	}
}

// indexHandler responds to requests with our greeting.
func indexHandler(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/" {
		http.NotFound(w, r)
		return
	}
	fmt.Fprint(w, "Hello, World!")
}

// taskHandler processes task requests.
func taskHandler(w http.ResponseWriter, r *http.Request) {
	taskName := r.Header.Get("X-Appengine-Taskname")
	if taskName == "" {
		// You may use the presence of the X-Appengine-Taskname header to validate
		// the request comes from Cloud Tasks.
		log.Println("Invalid Task: No X-Appengine-Taskname request header found")
		http.Error(w, "Bad Request - Invalid Task", http.StatusBadRequest)
		return
	}

	// Pull useful headers from Task request.
	queueName := r.Header.Get("X-Appengine-Queuename")

	// Extract the request body for further task details.
	body, err := io.ReadAll(r.Body)
	if err != nil {
		log.Printf("ReadAll: %v", err)
		http.Error(w, "Internal Error", http.StatusInternalServerError)
		return
	}

	// Log & output details of the task.
	output := fmt.Sprintf("Completed task: task queue(%s), task name(%s), payload(%s)",
		queueName,
		taskName,
		string(body),
	)
	log.Println(output)

	// Set a non-2xx status code to indicate a failure in task processing that should be retried.
	// For example, http.Error(w, "Internal Server Error: Task Processing", http.StatusInternalServerError)
	fmt.Fprintln(w, output)
}

Java

@WebServlet(
    name = "Tasks",
    description = "Create Cloud Task",
    urlPatterns = "/tasks/create"
)
public class TaskServlet extends HttpServlet {
  private static Logger log = Logger.getLogger(TaskServlet.class.getName());

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
    log.info("Received task request: " + req.getServletPath());
    String body = req.getReader()
        .lines()
        .reduce("", (accumulator, actual) -> accumulator + actual);

    if (!body.isEmpty()) {
      log.info("Request payload: " + body);
      String output = String.format("Received task with payload %s", body);
      resp.getOutputStream().write(output.getBytes());
      log.info("Sending response: " + output);
      resp.setStatus(HttpServletResponse.SC_OK);
    } else {
      log.warning("Null payload received in request to " + req.getServletPath());
    }
  }
}

Node.js

const express = require('express');

const app = express();
app.enable('trust proxy');

// Set the Content-Type of the Cloud Task to ensure compatibility
// By default, the Content-Type header of the Task request is set to "application/octet-stream"
// see https://cloud.google.com/tasks/docs/reference/rest/v2beta3/projects.locations.queues.tasks#AppEngineHttpRequest
app.use(express.text());

app.get('/', (req, res) => {
  // Basic index to verify app is serving
  res.send('Hello, World!').end();
});

app.post('/log_payload', (req, res) => {
  // Log the request payload
  console.log(`Received task with payload: ${req.body}`);
  res.send(`Printed task payload: ${req.body}`).end();
});

app.get('*', (req, res) => {
  res.send('OK').end();
});

const PORT = process.env.PORT || 8080;
app.listen(PORT, () => {
  console.log(`App listening on port ${PORT}`);
  console.log('Press Ctrl+C to quit.');
});

PHP


require __DIR__ . '/vendor/autoload.php';

use Google\Cloud\Logging\LoggingClient;

// Create the logging client.
$logging = new LoggingClient();
// Create a PSR-3-compatible logger.
$logger = $logging->psrLogger('app', ['batchEnabled' => true]);

// Front-controller to route requests.
switch (@parse_url($_SERVER['REQUEST_URI'])['path']) {
    case '/':
        print "Hello, World!\n";
        break;
    case '/task_handler':
        // Taskname and Queuename are two of several useful Cloud Tasks headers available on the request.
        $taskName = $_SERVER['HTTP_X_APPENGINE_TASKNAME'] ?? '';
        $queueName = $_SERVER['HTTP_X_APPENGINE_QUEUENAME'] ?? '';

        try {
            handle_task(
                $queueName,
                $taskName,
                file_get_contents('php://input')
            );
        } catch (Exception $e) {
            http_response_code(400);
            exit($e->getMessage());
        }
        break;
    default:
        http_response_code(404);
        exit('Not Found');
}

/**
 * Process a Cloud Tasks HTTP Request.
 *
 * @param string $queueName provides the name of the queue which dispatched the task.
 * @param string $taskName provides the identifier of the task.
 * @param string $body The task details from the HTTP request.
 */
function handle_task($queueName, $taskName, $body = '')
{
    global $logger;

    if (empty($taskName)) {
        // You may use the presence of the X-Appengine-Taskname header to validate
        // the request comes from Cloud Tasks.
        $logger->warning('Invalid Task: No X-Appengine-Taskname request header found');
        throw new Exception('Bad Request - Invalid Task');
    }

    $output = sprintf('Completed task: task queue(%s), task name(%s), payload(%s)', $queueName, $taskName, $body);
    $logger->info($output);

    // Set a non-2xx status code to indicate a failure in task processing that should be retried.
    // For example, http_response_code(500) to indicate a server error.
    print $output;
}

Python

from flask import Flask, render_template, request

app = Flask(__name__)


@app.route("/example_task_handler", methods=["POST"])
def example_task_handler():
    """Log the request payload."""
    payload = request.get_data(as_text=True) or "(empty payload)"
    print(f"Received task with payload: {payload}")
    return render_template("index.html", payload=payload)

Ruby

require "sinatra"
require "json"

get "/" do
  # Basic index to verify app is serving
  "Hello World!"
end

post "/log_payload" do
  data = request.body.read
  # Log the request payload
  puts "Received task with payload: #{data}"
  "Printed task payload: #{data}"
end

Zeitlimits

Für App Engine-Aufgaben sind bestimmte Zeitüberschreitungen festgelegt, die vom Skalierungstyp des Dienstes abhängen, der sie ausführt.

Für Worker-Dienste, die in der Standardumgebung ausgeführt werden:

  • Automatische Skalierung: Die Verarbeitung der Aufgabe muss in zehn Minuten abgeschlossen sein.
  • Manuelle und grundlegende Skalierungsanfragen können bis zu 24 Stunden ausgeführt werden.

Für Worker-Dienste, die in der Flex-Umgebung ausgeführt werden: Alle Typen haben eine Zeitüberschreitung von 60 Minuten.

Wenn die Frist für Ihren Handler verstrichen ist, geht die Warteschlange davon aus, dass die Aufgabe fehlgeschlagen ist, und versucht die Ausführung noch einmal.

App Engine-Aufgabenanfrage-Header lesen

Anfragen, die von einer Cloud Tasks-Warteschlange an Ihren App Engine-Handler gesendet werden, haben spezielle Header, die aufgabenspezifische Informationen enthalten, die Ihr Handler möglicherweise verwenden möchte.

Diese Header werden intern festgelegt. Wenn einer dieser Header in einer externen Nutzeranfrage an Ihre Anwendung vorhanden ist, wird dieser durch einen internen Header ersetzt. Eine Ausnahme sind Anfragen von angemeldeten Administratoren der Anwendung, die Header für Testzwecke angeben dürfen.

App Engine-Aufgabenanfragen enthalten immer die folgenden Header:

Header Beschreibung
X-AppEngine-QueueName Der Name der Warteschlange.
X-AppEngine-TaskName Der "kurze" Name der Aufgabe oder, wenn bei der Erstellung kein Name festgelegt wurde, eine vom System generierte eindeutige ID. Dies ist der my-task-id-Wert im vollständigen Aufgabennamen, z. B. task_name = projects/my-project-id/locations/my-location/queues/my-queue-id/tasks/my-task-id.
X-AppEngine-TaskRetryCount Die Anzahl der Ausführungsversuche für die Aufgabe. Für den ersten Versuch lautet dieser Wert 0. Diese Anzahl enthält Versuche, bei denen die Aufgabe aufgrund fehlender verfügbarer Instanzen fehlgeschlagen ist und die Ausführungsphase nicht erreicht wurde.
X-AppEngine-TaskExecutionCount Die Anzahl der Male, die die Aufgabe ausgeführt und eine Antwort vom Handler erhalten hat. Da Cloud Tasks eine Aufgabe löscht, sobald eine erfolgreiche Antwort empfangen wurde, sind alle vorherigen Handler-Antworten fehlgeschlagen. Für diesen Wert werden Fehler aufgrund fehlender verfügbarer Instanzen nicht berücksichtigt. Hinweis: X-AppEngine-TaskExecutionCount kann mit X-AppEngine-TaskRetryCount übereinstimmen, wenn es aktualisiert wird, bevor eine Ausführung versucht wird.
X-AppEngine-TaskETA Die geplante Zeit für eine Aufgabe, angegeben in Sekunden seit dem 1. Januar 1970.

Wenn der Anfrage-Handler einen der oben aufgeführten Header erkennt, kann er davon ausgehen, dass es sich bei der Anfrage um eine Anfrage von Cloud Tasks handelt.

Außerdem können Anfragen von Cloud Tasks die folgenden Header enthalten:

Header Beschreibung
X-AppEngine-TaskPreviousResponse Der HTTP-Antwortcode aus der vorangegangenen Wiederholung.
X-AppEngine-TaskRetryReason Der Grund für die Wiederholung der Aufgabe.
X-AppEngine-FailFast Zeigt an, dass eine Aufgabe sofort fehlschlägt, wenn eine vorhandene Instanz nicht verfügbar ist.

Ziel-Routing

Bei App Engine-Aufgaben werden die Warteschlange und der Aufgaben-Handler innerhalb desselben Google Cloud -Projekts ausgeführt. Der Traffic wird während des Transports verschlüsselt und verlässt die Google-Rechenzentren zu keinem Zeitpunkt. Sie können das Protokoll nicht explizit festlegen (z. B. HTTP oder HTTPS). Es erscheint jedoch so, als ob die Anfrage an den Handler das HTTP-Protokoll verwendet hätte.

Aufgaben können an sichere und unsichere Aufgaben-Handler sowie in unterstützten Laufzeiten an URIs mit der Einschränkung login: admin weitergeleitet werden. Da Aufgaben nicht im Namen von Nutzern ausgeführt werden, können sie nicht an URIs mit der Einschränkung login: required weitergeleitet werden. Auch folgen Aufgabenweiterleitungen nicht den sonstigen Weiterleitungen.

Nächste Schritte