Crie controladores de tarefas do App Engine

Esta página demonstra como criar um processador de tarefas do App Engine, o código de trabalho que processa uma tarefa do App Engine. A fila do Cloud Tasks envia pedidos HTTP para o seu controlador de tarefas. Após a conclusão bem-sucedida do processamento, o controlador tem de enviar um código de estado HTTP entre 200 e 299 de volta para a fila. Qualquer outro valor indica que a tarefa falhou e a fila tenta novamente a tarefa.

Os pedidos da fila de tarefas do App Engine são enviados a partir do endereço IP 0.1.0.2. Consulte também o intervalo de IP para pedidos enviados para o ambiente do App Engine.

C#

    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddLogging(builder => builder.AddDebug());
            services.AddRouting();
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
        {
            var logger = loggerFactory.CreateLogger("testStackdriverLogging");

            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            else
            {
                // Configure error reporting service.
                app.UseExceptionHandler("/Home/Error");
            }

            var routeBuilder = new RouteBuilder(app);

            routeBuilder.MapPost("log_payload", context =>
            {
                // Log the request payload
                var reader = new StreamReader(context.Request.Body);
                var task = reader.ReadToEnd();

                logger.LogInformation($"Received task with payload: {task}");
                return context.Response.WriteAsync($"Printed task payload: {task}");
            });

            routeBuilder.MapGet("hello", context =>
            {
                // Basic index to verify app is serving
                return context.Response.WriteAsync("Hello, world!");
            });

            routeBuilder.MapGet("_ah/health", context =>
            {
                // Respond to GAE health-checks
                return context.Response.WriteAsync("OK");
            });

            routeBuilder.MapGet("/", context =>
            {
                return context.Response.WriteAsync("Hello, world!");
            });

            var routes = routeBuilder.Build();
            app.UseRouter(routes);
        }
    }

Ir


// Sample task_handler is an App Engine app demonstrating Cloud Tasks handling.
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
)

func main() {
	// Allow confirmation the task handling service is running.
	http.HandleFunc("/", indexHandler)

	// Handle all tasks.
	http.HandleFunc("/task_handler", taskHandler)

	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
		log.Printf("Defaulting to port %s", port)
	}

	log.Printf("Listening on port %s", port)
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		log.Fatal(err)
	}
}

// indexHandler responds to requests with our greeting.
func indexHandler(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/" {
		http.NotFound(w, r)
		return
	}
	fmt.Fprint(w, "Hello, World!")
}

// taskHandler processes task requests.
func taskHandler(w http.ResponseWriter, r *http.Request) {
	taskName := r.Header.Get("X-Appengine-Taskname")
	if taskName == "" {
		// You may use the presence of the X-Appengine-Taskname header to validate
		// the request comes from Cloud Tasks.
		log.Println("Invalid Task: No X-Appengine-Taskname request header found")
		http.Error(w, "Bad Request - Invalid Task", http.StatusBadRequest)
		return
	}

	// Pull useful headers from Task request.
	queueName := r.Header.Get("X-Appengine-Queuename")

	// Extract the request body for further task details.
	body, err := io.ReadAll(r.Body)
	if err != nil {
		log.Printf("ReadAll: %v", err)
		http.Error(w, "Internal Error", http.StatusInternalServerError)
		return
	}

	// Log & output details of the task.
	output := fmt.Sprintf("Completed task: task queue(%s), task name(%s), payload(%s)",
		queueName,
		taskName,
		string(body),
	)
	log.Println(output)

	// Set a non-2xx status code to indicate a failure in task processing that should be retried.
	// For example, http.Error(w, "Internal Server Error: Task Processing", http.StatusInternalServerError)
	fmt.Fprintln(w, output)
}

Java

@WebServlet(
    name = "Tasks",
    description = "Create Cloud Task",
    urlPatterns = "/tasks/create"
)
public class TaskServlet extends HttpServlet {
  private static Logger log = Logger.getLogger(TaskServlet.class.getName());

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
    log.info("Received task request: " + req.getServletPath());
    String body = req.getReader()
        .lines()
        .reduce("", (accumulator, actual) -> accumulator + actual);

    if (!body.isEmpty()) {
      log.info("Request payload: " + body);
      String output = String.format("Received task with payload %s", body);
      resp.getOutputStream().write(output.getBytes());
      log.info("Sending response: " + output);
      resp.setStatus(HttpServletResponse.SC_OK);
    } else {
      log.warning("Null payload received in request to " + req.getServletPath());
    }
  }
}

Node.js

const express = require('express');

const app = express();
app.enable('trust proxy');

// Set the Content-Type of the Cloud Task to ensure compatibility
// By default, the Content-Type header of the Task request is set to "application/octet-stream"
// see https://cloud.google.com/tasks/docs/reference/rest/v2beta3/projects.locations.queues.tasks#AppEngineHttpRequest
app.use(express.text());

app.get('/', (req, res) => {
  // Basic index to verify app is serving
  res.send('Hello, World!').end();
});

app.post('/log_payload', (req, res) => {
  // Log the request payload
  console.log(`Received task with payload: ${req.body}`);
  res.send(`Printed task payload: ${req.body}`).end();
});

app.get('*', (req, res) => {
  res.send('OK').end();
});

const PORT = process.env.PORT || 8080;
app.listen(PORT, () => {
  console.log(`App listening on port ${PORT}`);
  console.log('Press Ctrl+C to quit.');
});

PHP


require __DIR__ . '/vendor/autoload.php';

use Google\Cloud\Logging\LoggingClient;

// Create the logging client.
$logging = new LoggingClient();
// Create a PSR-3-compatible logger.
$logger = $logging->psrLogger('app', ['batchEnabled' => true]);

// Front-controller to route requests.
switch (@parse_url($_SERVER['REQUEST_URI'])['path']) {
    case '/':
        print "Hello, World!\n";
        break;
    case '/task_handler':
        // Taskname and Queuename are two of several useful Cloud Tasks headers available on the request.
        $taskName = $_SERVER['HTTP_X_APPENGINE_TASKNAME'] ?? '';
        $queueName = $_SERVER['HTTP_X_APPENGINE_QUEUENAME'] ?? '';

        try {
            handle_task(
                $queueName,
                $taskName,
                file_get_contents('php://input')
            );
        } catch (Exception $e) {
            http_response_code(400);
            exit($e->getMessage());
        }
        break;
    default:
        http_response_code(404);
        exit('Not Found');
}

/**
 * Process a Cloud Tasks HTTP Request.
 *
 * @param string $queueName provides the name of the queue which dispatched the task.
 * @param string $taskName provides the identifier of the task.
 * @param string $body The task details from the HTTP request.
 */
function handle_task($queueName, $taskName, $body = '')
{
    global $logger;

    if (empty($taskName)) {
        // You may use the presence of the X-Appengine-Taskname header to validate
        // the request comes from Cloud Tasks.
        $logger->warning('Invalid Task: No X-Appengine-Taskname request header found');
        throw new Exception('Bad Request - Invalid Task');
    }

    $output = sprintf('Completed task: task queue(%s), task name(%s), payload(%s)', $queueName, $taskName, $body);
    $logger->info($output);

    // Set a non-2xx status code to indicate a failure in task processing that should be retried.
    // For example, http_response_code(500) to indicate a server error.
    print $output;
}

Python

from flask import Flask, render_template, request

app = Flask(__name__)


@app.route("/example_task_handler", methods=["POST"])
def example_task_handler():
    """Log the request payload."""
    payload = request.get_data(as_text=True) or "(empty payload)"
    print(f"Received task with payload: {payload}")
    return render_template("index.html", payload=payload)

Ruby

require "sinatra"
require "json"

get "/" do
  # Basic index to verify app is serving
  "Hello World!"
end

post "/log_payload" do
  data = request.body.read
  # Log the request payload
  puts "Received task with payload: #{data}"
  "Printed task payload: #{data}"
end

Limites de tempo

As tarefas do App Engine têm limites de tempo específicos que dependem do tipo de escalabilidade do serviço que as está a executar.

Para serviços de worker em execução no ambiente padrão:

  • Escalamento automático: o processamento de tarefas tem de terminar em 10 minutos.
  • Ajuste de escala manual e básico: os pedidos podem ser executados durante um máximo de 24 horas.

Para serviços de trabalhadores executados no ambiente flexível: todos os tipos têm um limite de tempo de 60 minutos.

Se o seu controlador não cumprir o prazo, a fila assume que a tarefa falhou e tenta novamente.

Ler cabeçalhos de pedidos de tarefas do App Engine

Os pedidos enviados para o seu controlador do App Engine por uma fila do Cloud Tasks têm cabeçalhos especiais, que contêm informações específicas da tarefa que o seu controlador pode querer usar.

Estes cabeçalhos são definidos internamente. Se algum destes cabeçalhos estiver presente num pedido de utilizador externo à sua app, é substituído pelos internos, exceto no caso de pedidos de administradores com sessão iniciada na aplicação, aos quais é permitido definir cabeçalhos para fins de teste.

Os pedidos de tarefas do App Engine contêm sempre os seguintes cabeçalhos:

Cabeçalho Descrição
X-AppEngine-QueueName O nome da fila.
X-AppEngine-TaskName O nome "curto" da tarefa ou, se não tiver sido especificado nenhum nome na criação, um ID exclusivo gerado pelo sistema. Este é o valor my-task-id no nome da tarefa completo; por exemplo, task_name = projects/my-project-id/locations/my-location/queues/my-queue-id/tasks/my-task-id.
X-AppEngine-TaskRetryCount O número de vezes que a tarefa foi repetida. Para a primeira tentativa, este valor é 0. Este número inclui tentativas em que a tarefa falhou devido à falta de instâncias disponíveis e nunca atingiu a fase de execução.
X-AppEngine-TaskExecutionCount O número de vezes que a tarefa foi executada e recebeu uma resposta do controlador. Uma vez que o Cloud Tasks elimina a tarefa assim que recebe uma resposta bem-sucedida, todas as respostas anteriores do controlador são falhas. Este número não inclui falhas devido à falta de instâncias disponíveis. Tenha em atenção que X-AppEngine-TaskExecutionCount pode ser igual a X-AppEngine-TaskRetryCount se for atualizado antes de se tentar uma execução.
X-AppEngine-TaskETA A hora agendada da tarefa, especificada em segundos desde 1 de janeiro de 1970.

Se o seu controlador de pedidos encontrar algum dos cabeçalhos indicados anteriormente, pode presumir que o pedido é um pedido do Cloud Tasks.

Além disso, os pedidos do Cloud Tasks podem conter os seguintes cabeçalhos:

Cabeçalho Descrição
X-AppEngine-TaskPreviousResponse O código de resposta HTTP da nova tentativa anterior.
X-AppEngine-TaskRetryReason O motivo da repetição da tarefa.
X-AppEngine-FailFast Indica que uma tarefa falha imediatamente se uma instância existente não estiver disponível.

Encaminhamento de destino

Nas tarefas do App Engine, a fila e o controlador de tarefas são executados no mesmo Google Cloud projeto. O tráfego é encriptado durante o transporte e nunca sai dos centros de dados da Google. Não pode definir explicitamente o protocolo (por exemplo, HTTP ou HTTPS). No entanto, o pedido ao controlador parece ter usado o protocolo HTTP.

As tarefas podem ser enviadas para processadores de tarefas seguros, processadores de tarefas não seguros e, em tempos de execução suportados, URIs restritos com login: admin. Uma vez que as tarefas não são executadas como nenhum utilizador, não podem ser enviadas para URIs restritos com login: required. Os envios de tarefas também não seguem redirecionamentos.

O que se segue?