Crea controladores de tareas de App Engine

En esta página, se muestra cómo crear un controlador de tareas de App Engine, el código del trabajador que controla una tarea de App Engine. La cola de Cloud Tasks envía solicitudes HTTP a tu controlador de tareas. Cuando el procesamiento se completa con éxito, el controlador debe enviar un código de estado HTTP entre 200 y 299 de nuevo a la cola. Si se envía cualquier otro valor, significa que la tarea tuvo un error, por lo que la cola la reintenta.

Las solicitudes de lista de tareas en cola de App Engine se envían desde la dirección IP 0.1.0.2. Consulta también el rango de IP para solicitudes enviadas al entorno de App Engine.

C#

    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddLogging(builder => builder.AddDebug());
            services.AddRouting();
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
        {
            var logger = loggerFactory.CreateLogger("testStackdriverLogging");

            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            else
            {
                // Configure error reporting service.
                app.UseExceptionHandler("/Home/Error");
            }

            var routeBuilder = new RouteBuilder(app);

            routeBuilder.MapPost("log_payload", context =>
            {
                // Log the request payload
                var reader = new StreamReader(context.Request.Body);
                var task = reader.ReadToEnd();

                logger.LogInformation($"Received task with payload: {task}");
                return context.Response.WriteAsync($"Printed task payload: {task}");
            });

            routeBuilder.MapGet("hello", context =>
            {
                // Basic index to verify app is serving
                return context.Response.WriteAsync("Hello, world!");
            });

            routeBuilder.MapGet("_ah/health", context =>
            {
                // Respond to GAE health-checks
                return context.Response.WriteAsync("OK");
            });

            routeBuilder.MapGet("/", context =>
            {
                return context.Response.WriteAsync("Hello, world!");
            });

            var routes = routeBuilder.Build();
            app.UseRouter(routes);
        }
    }

Python

from flask import Flask, render_template, request

app = Flask(__name__)

@app.route("/example_task_handler", methods=["POST"])
def example_task_handler():
    """Log the request payload."""
    payload = request.get_data(as_text=True) or "(empty payload)"
    print(f"Received task with payload: {payload}")
    return render_template("index.html", payload=payload)

Java

@WebServlet(
    name = "Tasks",
    description = "Create Cloud Task",
    urlPatterns = "/tasks/create"
)
public class TaskServlet extends HttpServlet {
  private static Logger log = Logger.getLogger(TaskServlet.class.getName());

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
    log.info("Received task request: " + req.getServletPath());
    String body = req.getReader()
        .lines()
        .reduce("", (accumulator, actual) -> accumulator + actual);

    if (!body.isEmpty()) {
      log.info("Request payload: " + body);
      String output = String.format("Received task with payload %s", body);
      resp.getOutputStream().write(output.getBytes());
      log.info("Sending response: " + output);
      resp.setStatus(HttpServletResponse.SC_OK);
    } else {
      log.warning("Null payload received in request to " + req.getServletPath());
    }
  }
}

PHP


require __DIR__ . '/vendor/autoload.php';

use Google\Cloud\Logging\LoggingClient;

// Create the logging client.
$logging = new LoggingClient();
// Create a PSR-3-compatible logger.
$logger = $logging->psrLogger('app', ['batchEnabled' => true]);

// Front-controller to route requests.
switch (@parse_url($_SERVER['REQUEST_URI'])['path']) {
    case '/':
        print "Hello, World!\n";
        break;
    case '/task_handler':
        // Taskname and Queuename are two of several useful Cloud Tasks headers available on the request.
        $taskName = $_SERVER['HTTP_X_APPENGINE_TASKNAME'] ?? '';
        $queueName = $_SERVER['HTTP_X_APPENGINE_QUEUENAME'] ?? '';

        try {
            handle_task(
                $queueName,
                $taskName,
                file_get_contents('php://input')
            );
        } catch (Exception $e) {
            http_response_code(400);
            exit($e->getMessage());
        }
        break;
    default:
        http_response_code(404);
        exit('Not Found');
}

/**
 * Process a Cloud Tasks HTTP Request.
 *
 * @param string $queueName provides the name of the queue which dispatched the task.
 * @param string $taskName provides the identifier of the task.
 * @param string $body The task details from the HTTP request.
 */
function handle_task($queueName, $taskName, $body = '')
{
    global $logger;

    if (empty($taskName)) {
        // You may use the presence of the X-Appengine-Taskname header to validate
        // the request comes from Cloud Tasks.
        $logger->warning('Invalid Task: No X-Appengine-Taskname request header found');
        throw new Exception('Bad Request - Invalid Task');
    }

    $output = sprintf('Completed task: task queue(%s), task name(%s), payload(%s)', $queueName, $taskName, $body);
    $logger->info($output);

    // Set a non-2xx status code to indicate a failure in task processing that should be retried.
    // For example, http_response_code(500) to indicate a server error.
    print $output;
}

Go


// Sample task_handler is an App Engine app demonstrating Cloud Tasks handling.
package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
)

func main() {
	// Allow confirmation the task handling service is running.
	http.HandleFunc("/", indexHandler)

	// Handle all tasks.
	http.HandleFunc("/task_handler", taskHandler)

	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
		log.Printf("Defaulting to port %s", port)
	}

	log.Printf("Listening on port %s", port)
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		log.Fatal(err)
	}
}

// indexHandler responds to requests with our greeting.
func indexHandler(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/" {
		http.NotFound(w, r)
		return
	}
	fmt.Fprint(w, "Hello, World!")
}

// taskHandler processes task requests.
func taskHandler(w http.ResponseWriter, r *http.Request) {
	taskName := r.Header.Get("X-Appengine-Taskname")
	if taskName == "" {
		// You may use the presence of the X-Appengine-Taskname header to validate
		// the request comes from Cloud Tasks.
		log.Println("Invalid Task: No X-Appengine-Taskname request header found")
		http.Error(w, "Bad Request - Invalid Task", http.StatusBadRequest)
		return
	}

	// Pull useful headers from Task request.
	queueName := r.Header.Get("X-Appengine-Queuename")

	// Extract the request body for further task details.
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Printf("ReadAll: %v", err)
		http.Error(w, "Internal Error", http.StatusInternalServerError)
		return
	}

	// Log & output details of the task.
	output := fmt.Sprintf("Completed task: task queue(%s), task name(%s), payload(%s)",
		queueName,
		taskName,
		string(body),
	)
	log.Println(output)

	// Set a non-2xx status code to indicate a failure in task processing that should be retried.
	// For example, http.Error(w, "Internal Server Error: Task Processing", http.StatusInternalServerError)
	fmt.Fprintln(w, output)
}

Node.js

const express = require('express');

const app = express();
app.enable('trust proxy');

// Set the Content-Type of the Cloud Task to ensure compatibility
// By default, the Content-Type header of the Task request is set to "application/octet-stream"
// see https://cloud.google.com/tasks/docs/reference/rest/v2beta3/projects.locations.queues.tasks#AppEngineHttpRequest
app.use(express.text());

app.get('/', (req, res) => {
  // Basic index to verify app is serving
  res.send('Hello, World!').end();
});

app.post('/log_payload', (req, res) => {
  // Log the request payload
  console.log(`Received task with payload: ${req.body}`);
  res.send(`Printed task payload: ${req.body}`).end();
});

app.get('*', (req, res) => {
  res.send('OK').end();
});

const PORT = process.env.PORT || 8080;
app.listen(PORT, () => {
  console.log(`App listening on port ${PORT}`);
  console.log('Press Ctrl+C to quit.');
});

Rita

require "sinatra"
require "json"

get "/" do
  # Basic index to verify app is serving
  "Hello World!"
end

post "/log_payload" do
  data = request.body.read
  # Log the request payload
  puts "Received task with payload: #{data}"
  "Printed task payload: #{data}"
end

Tiempos de espera

Las tareas de App Engine tienen tiempos de espera específicos que dependen del tipo de escalamiento del servicio que las ejecuta.

Estas son las opciones en el caso de los servicios trabajadores que se ejecutan en el entorno estándar:

  • Escalamiento automático: Las tareas deben terminar de procesarse en 10 minutos.
  • Escalamiento manual y básico: Las solicitudes pueden ejecutarse durante un máximo de 24 horas.

En el caso de los servicios trabajadores que se ejecutan en el entorno flexible, todos los tipos tienen un tiempo de espera de 60 minutos.

Si tu controlador no cumple con el plazo, la cola asume que la tarea tuvo un error y la vuelve a intentar.

Leer encabezados de solicitud de tareas de App Engine

Las solicitudes que una cola de Cloud Tasks envía a tu controlador de App Engine tienen encabezados especiales que contienen información específica de la tarea que tu controlador podría usar.

Estos encabezados se configuran internamente. Si alguno de estos encabezados está presente en una solicitud de usuario externo a tu app, se reemplazará por los internos, excepto en el caso de las solicitudes de los administradores que hayan accedido a la aplicación, quienes tienen permitido establecer encabezados con fines de prueba.

Las solicitudes de tareas de App Engine siempre contienen los siguientes encabezados:

Header Descripción
X-AppEngine-QueueName El nombre de la cola.
X-AppEngine-TaskName El nombre "breve" de la tarea o (si no se especificó un nombre durante su creación) un ID único generado por el sistema. Este es el valor my-task-id en el nombre completo de la tarea; por ejemplo, task_name = projects/my-project-id/locations/my-location/queues/my-queue-id/tasks/my-task-id.
X-AppEngine-TaskRetryCount Es la cantidad de veces que se reintentó la tarea. Si es el primer intento, el valor es 0. Este número incluye los intentos en los que la tarea falló debido a la falta de instancias disponibles y nunca llegó a la fase de ejecución.
X-AppEngine-TaskExecutionCount La cantidad de veces que la tarea se ejecutó y recibió una respuesta del controlador. Dado que Cloud Tasks borra la tarea una vez que se recibe una respuesta correcta, todas las respuestas anteriores del controlador son fallidas. Este número no incluye las fallas ocasionadas por la falta de instancias disponibles. Ten en cuenta que X-AppEngine-TaskExecutionCount puede ser igual a X-AppEngine-TaskRetryCount si se actualiza antes de intentar una ejecución.
X-AppEngine-TaskETA La fecha y hora programadas para la tarea, que se especifica en la cantidad de segundos transcurridos desde el 1 de enero de 1970.

Si tu controlador de solicitudes encuentra alguno de los encabezados antes mencionados, podrá confiar en que la solicitud proviene de Cloud Tasks.

Además, las solicitudes de Cloud Tasks pueden contener los siguientes encabezados:

Header Descripción
X-AppEngine-TaskPreviousResponse El código de respuesta HTTP del reintento anterior.
X-AppEngine-TaskRetryReason El motivo por el que se volvió a intentar la tarea.
X-AppEngine-FailFast Indica que una tarea falla inmediatamente si no hay una instancia existente disponible.

Enrutamiento al objetivo

En las tareas de App Engine, la cola y el controlador de tareas se ejecutan dentro del mismo proyecto de Google Cloud. El tráfico se encripta durante el transporte y nunca sale de los centros de datos de Google. No puedes configurar el protocolo de forma explícita (por ejemplo, HTTP o HTTPS). Sin embargo, parecerá que la solicitud al controlador usó el protocolo HTTP.

Las tareas se pueden enviar a controladores de tareas seguros, controladores de tareas no seguros y, en los entornos de ejecución compatibles, URI restringidos con login: admin. Debido a que las tareas no se ejecutan como ningún usuario, no se pueden enviar a URI restringidos con login: required. Los envíos de tareas tampoco siguen los redireccionamientos.

¿Qué sigue?