Créer des gestionnaires de tâches App Engine

Cette page montre comment créer un gestionnaire de tâches App Engine, la un code de nœud de calcul qui gère une tâche App Engine. La file d'attente Cloud Tasks envoie des requêtes HTTP à votre gestionnaire de tâches. Une fois le traitement effectué, le gestionnaire doit envoyer un état HTTP compris entre 200 et 299 à la file d'attente. Toute autre valeur indique que la tâche a échoué et que la file d'attente la relance.

Les requêtes de file d'attente de tâches App Engine sont envoyées à partir de l'adresse IP 0.1.0.2. Consultez également les Plage d'adresses IP pour les requêtes envoyées à l'environnement App Engine.

C#

    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddLogging(builder => builder.AddDebug());
            services.AddRouting();
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
        {
            var logger = loggerFactory.CreateLogger("testStackdriverLogging");

            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            else
            {
                // Configure error reporting service.
                app.UseExceptionHandler("/Home/Error");
            }

            var routeBuilder = new RouteBuilder(app);

            routeBuilder.MapPost("log_payload", context =>
            {
                // Log the request payload
                var reader = new StreamReader(context.Request.Body);
                var task = reader.ReadToEnd();

                logger.LogInformation($"Received task with payload: {task}");
                return context.Response.WriteAsync($"Printed task payload: {task}");
            });

            routeBuilder.MapGet("hello", context =>
            {
                // Basic index to verify app is serving
                return context.Response.WriteAsync("Hello, world!");
            });

            routeBuilder.MapGet("_ah/health", context =>
            {
                // Respond to GAE health-checks
                return context.Response.WriteAsync("OK");
            });

            routeBuilder.MapGet("/", context =>
            {
                return context.Response.WriteAsync("Hello, world!");
            });

            var routes = routeBuilder.Build();
            app.UseRouter(routes);
        }
    }

Go


// Sample task_handler is an App Engine app demonstrating Cloud Tasks handling.
package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
)

func main() {
	// Allow confirmation the task handling service is running.
	http.HandleFunc("/", indexHandler)

	// Handle all tasks.
	http.HandleFunc("/task_handler", taskHandler)

	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
		log.Printf("Defaulting to port %s", port)
	}

	log.Printf("Listening on port %s", port)
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		log.Fatal(err)
	}
}

// indexHandler responds to requests with our greeting.
func indexHandler(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/" {
		http.NotFound(w, r)
		return
	}
	fmt.Fprint(w, "Hello, World!")
}

// taskHandler processes task requests.
func taskHandler(w http.ResponseWriter, r *http.Request) {
	taskName := r.Header.Get("X-Appengine-Taskname")
	if taskName == "" {
		// You may use the presence of the X-Appengine-Taskname header to validate
		// the request comes from Cloud Tasks.
		log.Println("Invalid Task: No X-Appengine-Taskname request header found")
		http.Error(w, "Bad Request - Invalid Task", http.StatusBadRequest)
		return
	}

	// Pull useful headers from Task request.
	queueName := r.Header.Get("X-Appengine-Queuename")

	// Extract the request body for further task details.
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Printf("ReadAll: %v", err)
		http.Error(w, "Internal Error", http.StatusInternalServerError)
		return
	}

	// Log & output details of the task.
	output := fmt.Sprintf("Completed task: task queue(%s), task name(%s), payload(%s)",
		queueName,
		taskName,
		string(body),
	)
	log.Println(output)

	// Set a non-2xx status code to indicate a failure in task processing that should be retried.
	// For example, http.Error(w, "Internal Server Error: Task Processing", http.StatusInternalServerError)
	fmt.Fprintln(w, output)
}

Java

@WebServlet(
    name = "Tasks",
    description = "Create Cloud Task",
    urlPatterns = "/tasks/create"
)
public class TaskServlet extends HttpServlet {
  private static Logger log = Logger.getLogger(TaskServlet.class.getName());

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
    log.info("Received task request: " + req.getServletPath());
    String body = req.getReader()
        .lines()
        .reduce("", (accumulator, actual) -> accumulator + actual);

    if (!body.isEmpty()) {
      log.info("Request payload: " + body);
      String output = String.format("Received task with payload %s", body);
      resp.getOutputStream().write(output.getBytes());
      log.info("Sending response: " + output);
      resp.setStatus(HttpServletResponse.SC_OK);
    } else {
      log.warning("Null payload received in request to " + req.getServletPath());
    }
  }
}

Node.js

const express = require('express');

const app = express();
app.enable('trust proxy');

// Set the Content-Type of the Cloud Task to ensure compatibility
// By default, the Content-Type header of the Task request is set to "application/octet-stream"
// see https://cloud.google.com/tasks/docs/reference/rest/v2beta3/projects.locations.queues.tasks#AppEngineHttpRequest
app.use(express.text());

app.get('/', (req, res) => {
  // Basic index to verify app is serving
  res.send('Hello, World!').end();
});

app.post('/log_payload', (req, res) => {
  // Log the request payload
  console.log(`Received task with payload: ${req.body}`);
  res.send(`Printed task payload: ${req.body}`).end();
});

app.get('*', (req, res) => {
  res.send('OK').end();
});

const PORT = process.env.PORT || 8080;
app.listen(PORT, () => {
  console.log(`App listening on port ${PORT}`);
  console.log('Press Ctrl+C to quit.');
});

PHP


require __DIR__ . '/vendor/autoload.php';

use Google\Cloud\Logging\LoggingClient;

// Create the logging client.
$logging = new LoggingClient();
// Create a PSR-3-compatible logger.
$logger = $logging->psrLogger('app', ['batchEnabled' => true]);

// Front-controller to route requests.
switch (@parse_url($_SERVER['REQUEST_URI'])['path']) {
    case '/':
        print "Hello, World!\n";
        break;
    case '/task_handler':
        // Taskname and Queuename are two of several useful Cloud Tasks headers available on the request.
        $taskName = $_SERVER['HTTP_X_APPENGINE_TASKNAME'] ?? '';
        $queueName = $_SERVER['HTTP_X_APPENGINE_QUEUENAME'] ?? '';

        try {
            handle_task(
                $queueName,
                $taskName,
                file_get_contents('php://input')
            );
        } catch (Exception $e) {
            http_response_code(400);
            exit($e->getMessage());
        }
        break;
    default:
        http_response_code(404);
        exit('Not Found');
}

/**
 * Process a Cloud Tasks HTTP Request.
 *
 * @param string $queueName provides the name of the queue which dispatched the task.
 * @param string $taskName provides the identifier of the task.
 * @param string $body The task details from the HTTP request.
 */
function handle_task($queueName, $taskName, $body = '')
{
    global $logger;

    if (empty($taskName)) {
        // You may use the presence of the X-Appengine-Taskname header to validate
        // the request comes from Cloud Tasks.
        $logger->warning('Invalid Task: No X-Appengine-Taskname request header found');
        throw new Exception('Bad Request - Invalid Task');
    }

    $output = sprintf('Completed task: task queue(%s), task name(%s), payload(%s)', $queueName, $taskName, $body);
    $logger->info($output);

    // Set a non-2xx status code to indicate a failure in task processing that should be retried.
    // For example, http_response_code(500) to indicate a server error.
    print $output;
}

Python

from flask import Flask, render_template, request

app = Flask(__name__)


@app.route("/example_task_handler", methods=["POST"])
def example_task_handler():
    """Log the request payload."""
    payload = request.get_data(as_text=True) or "(empty payload)"
    print(f"Received task with payload: {payload}")
    return render_template("index.html", payload=payload)

Ruby

require "sinatra"
require "json"

get "/" do
  # Basic index to verify app is serving
  "Hello World!"
end

post "/log_payload" do
  data = request.body.read
  # Log the request payload
  puts "Received task with payload: #{data}"
  "Printed task payload: #{data}"
end

Délais avant expiration

Les tâches App Engine ont des délais avant expiration spécifiques qui dépendent type de scaling du service qui les exécute.

Pour les services de nœud de calcul s'exécutant dans l'environnement standard :

  • Scaling automatique : le traitement de la tâche doit s'achever en 10 minutes.
  • Scaling manuel et de base : les requêtes peuvent durer jusqu'à 24 heures.

Pour les services de nœuds de calcul s'exécutant dans l'environnement flexible : tous les types ont un délai avant expiration de 60 minutes.

Si votre gestionnaire ne respecte pas le délai, la file d'attente suppose que la tâche a échoué et effectue une nouvelle tentative.

Lire les en-têtes de requête de tâche App Engine

Les requêtes envoyées à votre gestionnaire App Engine par une file d'attente Cloud Tasks comportent des en-têtes spéciaux qui contiennent des informations spécifiques à la tâche que votre gestionnaire peut utiliser.

Ces en-têtes sont définis en interne. Si l'un de ces en-têtes est présent dans une requête d'utilisateur externe adressée à l'application, il est remplacé par un en-tête interne, à l'exception des requêtes des administrateurs connectés de l'application qui sont autorisés à définir des en-têtes à des fins de test.

Les requêtes de tâches App Engine contiennent toujours les en-têtes suivants :

Header Description
X-AppEngine-QueueName Nom de la file d'attente.
X-AppEngine-TaskName Nom "court" de la tâche ou, si aucun nom n'a été spécifié lors de la création, identifiant unique généré par le système. Il s'agit de la valeur my-task-id dans le nom complet de la tâche, par exemple task_name = projects/my-project-id/locations/my-location/queues/my-queue-id/tasks/my-task-id.
X-AppEngine-TaskRetryCount Nombre de fois où la tâche a fait l'objet de nouvelles tentatives d'exécution. Pour la première tentative, cette valeur est définie sur 0. Ce chiffre inclut les tentatives lorsque la tâche a échoué en raison d'un manque d'instances disponibles et n'a jamais atteint la phase d'exécution.
X-AppEngine-TaskExecutionCount Nombre de fois où la tâche a été exécutée et a reçu une réponse du gestionnaire. Étant donné que Cloud Tasks supprime la tâche après réception d'une réponse positive, toutes les réponses précédentes du gestionnaire sont des échecs. Ce chiffre n'inclut pas les échecs dus à un manque d'instances disponibles. Notez que X-AppEngine-TaskExecutionCount peut être égal à X-AppEngine-TaskRetryCount s'il est mis à jour avant une tentative d'exécution.
X-AppEngine-TaskETA Heure de planification de la tâche spécifiée en secondes depuis le 1er janvier 1970.

Si votre gestionnaire de requêtes trouve l'un des en-têtes répertoriés précédemment, il peut partent du principe qu'il s'agit d'une requête Cloud Tasks.

En outre, les requêtes Cloud Tasks peuvent contenir les en-têtes suivants :

En-tête Description
X-AppEngine-TaskPreviousResponse Code de réponse HTTP de la tentative précédente.
X-AppEngine-TaskRetryReason Raison de la nouvelle tentative d'exécution de la tâche.
X-AppEngine-FailFast Indique qu'une tâche échoue immédiatement si une instance existante n'est pas disponible.

Routage cible

Dans les tâches App Engine, la file d'attente et le gestionnaire de tâches s'exécutent dans le même projet Google Cloud. Le trafic est chiffré pendant le transport et n'est jamais quitte les centres de données Google. Vous ne pouvez pas définir explicitement le protocole (par (par exemple, HTTP ou HTTPS). En revanche, la requête envoyée au gestionnaire apparaîtra dans ont utilisé le protocole HTTP.

Les tâches peuvent être distribuées à des gestionnaires de tâches sécurisés, des gestionnaires de tâches non sécurisés d'environnements d'exécution compatibles, login: admin Étant donné que les tâches ne sont pas exécutées en tant qu'utilisateur, elles ne peuvent pas être distribuées aux URI limité avec login: required. Les envois de tâches ne suivent pas non plus les redirections.

Étape suivante