Criar gerenciadores de tarefas do App Engine

Nesta página, demonstramos como criar um gerenciador de tarefas do App Engine, o código do worker que processa uma tarefa do App Engine. A fila do Cloud Tasks envia solicitações HTTP para o gerenciador de tarefas. Após a conclusão do processamento, o gerenciador precisa enviar um código de status HTTP entre 200 e 299 de volta à fila. Qualquer outro valor indica que a tarefa falhou. Nesse caso, a fila tenta executar novamente a tarefa.

As solicitações da fila de tarefas do App Engine são enviadas do endereço IP 0.1.0.2. Consulte também o Intervalo de IP para solicitações enviadas ao ambiente do App Engine.

C#

    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddLogging(builder => builder.AddDebug());
            services.AddRouting();
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
        {
            var logger = loggerFactory.CreateLogger("testStackdriverLogging");

            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            else
            {
                // Configure error reporting service.
                app.UseExceptionHandler("/Home/Error");
            }

            var routeBuilder = new RouteBuilder(app);

            routeBuilder.MapPost("log_payload", context =>
            {
                // Log the request payload
                var reader = new StreamReader(context.Request.Body);
                var task = reader.ReadToEnd();

                logger.LogInformation($"Received task with payload: {task}");
                return context.Response.WriteAsync($"Printed task payload: {task}");
            });

            routeBuilder.MapGet("hello", context =>
            {
                // Basic index to verify app is serving
                return context.Response.WriteAsync("Hello, world!");
            });

            routeBuilder.MapGet("_ah/health", context =>
            {
                // Respond to GAE health-checks
                return context.Response.WriteAsync("OK");
            });

            routeBuilder.MapGet("/", context =>
            {
                return context.Response.WriteAsync("Hello, world!");
            });

            var routes = routeBuilder.Build();
            app.UseRouter(routes);
        }
    }

Python

from flask import Flask, render_template, request

app = Flask(__name__)

@app.route("/example_task_handler", methods=["POST"])
def example_task_handler():
    """Log the request payload."""
    payload = request.get_data(as_text=True) or "(empty payload)"
    print(f"Received task with payload: {payload}")
    return render_template("index.html", payload=payload)

Java

@WebServlet(
    name = "Tasks",
    description = "Create Cloud Task",
    urlPatterns = "/tasks/create"
)
public class TaskServlet extends HttpServlet {
  private static Logger log = Logger.getLogger(TaskServlet.class.getName());

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
    log.info("Received task request: " + req.getServletPath());
    String body = req.getReader()
        .lines()
        .reduce("", (accumulator, actual) -> accumulator + actual);

    if (!body.isEmpty()) {
      log.info("Request payload: " + body);
      String output = String.format("Received task with payload %s", body);
      resp.getOutputStream().write(output.getBytes());
      log.info("Sending response: " + output);
      resp.setStatus(HttpServletResponse.SC_OK);
    } else {
      log.warning("Null payload received in request to " + req.getServletPath());
    }
  }
}

PHP


require __DIR__ . '/vendor/autoload.php';

use Google\Cloud\Logging\LoggingClient;

// Create the logging client.
$logging = new LoggingClient();
// Create a PSR-3-compatible logger.
$logger = $logging->psrLogger('app', ['batchEnabled' => true]);

// Front-controller to route requests.
switch (@parse_url($_SERVER['REQUEST_URI'])['path']) {
    case '/':
        print "Hello, World!\n";
        break;
    case '/task_handler':
        // Taskname and Queuename are two of several useful Cloud Tasks headers available on the request.
        $taskName = $_SERVER['HTTP_X_APPENGINE_TASKNAME'] ?? '';
        $queueName = $_SERVER['HTTP_X_APPENGINE_QUEUENAME'] ?? '';

        try {
            handle_task(
                $queueName,
                $taskName,
                file_get_contents('php://input')
            );
        } catch (Exception $e) {
            http_response_code(400);
            exit($e->getMessage());
        }
        break;
    default:
        http_response_code(404);
        exit('Not Found');
}

/**
 * Process a Cloud Tasks HTTP Request.
 *
 * @param string $queueName provides the name of the queue which dispatched the task.
 * @param string $taskName provides the identifier of the task.
 * @param string $body The task details from the HTTP request.
 */
function handle_task($queueName, $taskName, $body = '')
{
    global $logger;

    if (empty($taskName)) {
        // You may use the presence of the X-Appengine-Taskname header to validate
        // the request comes from Cloud Tasks.
        $logger->warning('Invalid Task: No X-Appengine-Taskname request header found');
        throw new Exception('Bad Request - Invalid Task');
    }

    $output = sprintf('Completed task: task queue(%s), task name(%s), payload(%s)', $queueName, $taskName, $body);
    $logger->info($output);

    // Set a non-2xx status code to indicate a failure in task processing that should be retried.
    // For example, http_response_code(500) to indicate a server error.
    print $output;
}

Go


// Sample task_handler is an App Engine app demonstrating Cloud Tasks handling.
package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
)

func main() {
	// Allow confirmation the task handling service is running.
	http.HandleFunc("/", indexHandler)

	// Handle all tasks.
	http.HandleFunc("/task_handler", taskHandler)

	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
		log.Printf("Defaulting to port %s", port)
	}

	log.Printf("Listening on port %s", port)
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		log.Fatal(err)
	}
}

// indexHandler responds to requests with our greeting.
func indexHandler(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/" {
		http.NotFound(w, r)
		return
	}
	fmt.Fprint(w, "Hello, World!")
}

// taskHandler processes task requests.
func taskHandler(w http.ResponseWriter, r *http.Request) {
	taskName := r.Header.Get("X-Appengine-Taskname")
	if taskName == "" {
		// You may use the presence of the X-Appengine-Taskname header to validate
		// the request comes from Cloud Tasks.
		log.Println("Invalid Task: No X-Appengine-Taskname request header found")
		http.Error(w, "Bad Request - Invalid Task", http.StatusBadRequest)
		return
	}

	// Pull useful headers from Task request.
	queueName := r.Header.Get("X-Appengine-Queuename")

	// Extract the request body for further task details.
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Printf("ReadAll: %v", err)
		http.Error(w, "Internal Error", http.StatusInternalServerError)
		return
	}

	// Log & output details of the task.
	output := fmt.Sprintf("Completed task: task queue(%s), task name(%s), payload(%s)",
		queueName,
		taskName,
		string(body),
	)
	log.Println(output)

	// Set a non-2xx status code to indicate a failure in task processing that should be retried.
	// For example, http.Error(w, "Internal Server Error: Task Processing", http.StatusInternalServerError)
	fmt.Fprintln(w, output)
}

Node.js

const express = require('express');

const app = express();
app.enable('trust proxy');

// Set the Content-Type of the Cloud Task to ensure compatibility
// By default, the Content-Type header of the Task request is set to "application/octet-stream"
// see https://cloud.google.com/tasks/docs/reference/rest/v2beta3/projects.locations.queues.tasks#AppEngineHttpRequest
app.use(express.text());

app.get('/', (req, res) => {
  // Basic index to verify app is serving
  res.send('Hello, World!').end();
});

app.post('/log_payload', (req, res) => {
  // Log the request payload
  console.log(`Received task with payload: ${req.body}`);
  res.send(`Printed task payload: ${req.body}`).end();
});

app.get('*', (req, res) => {
  res.send('OK').end();
});

const PORT = process.env.PORT || 8080;
app.listen(PORT, () => {
  console.log(`App listening on port ${PORT}`);
  console.log('Press Ctrl+C to quit.');
});

Ruby

require "sinatra"
require "json"

get "/" do
  # Basic index to verify app is serving
  "Hello World!"
end

post "/log_payload" do
  data = request.body.read
  # Log the request payload
  puts "Received task with payload: #{data}"
  "Printed task payload: #{data}"
end

Tempo limite

As tarefas do App Engine têm tempos limite específicos que dependem do tipo de escalonamento do serviço que as executa.

Para serviços do worker em execução no ambiente padrão:

  • Escalonamento automático: o processamento de tarefas precisa terminar em 10 minutos.
  • Escalonamento manual e básico: as solicitações podem ser executadas em até 24 horas.

Para serviços do worker em execução no ambiente flexível: todos os tipos têm um tempo limite de 60 minutos.

Se o gerenciador perder o prazo, a fila presume que a tarefa falhou e tenta novamente.

Como ler cabeçalhos de solicitação de tarefas do App Engine

As solicitações enviadas ao gerenciador do App Engine por uma fila do Cloud Tasks têm cabeçalhos especiais, que contêm informações específicas da tarefa que o gerenciador pode querer usar.

Esses cabeçalhos são definidos internamente. Se algum desses cabeçalhos estiver presente em uma solicitação de usuário externo para o app, ele será substituído pelos internos, exceto no caso de solicitações de administradores conectados do aplicativo, que podem definir cabeçalhos para fins de teste.

As solicitações de tarefas do App Engine sempre contêm os seguintes cabeçalhos:

Cabeçalho Descrição
X-AppEngine-QueueName O nome da fila.
X-AppEngine-TaskName O nome abreviado da tarefa ou, se nenhum nome foi especificado na criação, um código exclusivo gerado pelo sistema. Esse é o valor my-task-id no nome completo da tarefa. Por exemplo, task_name = projects/my-project-id/locations/my-location/queues/my-queue-id/tasks/my-task-id.
X-AppEngine-TaskRetryCount O número de vezes que a tarefa foi repetida. Para a primeira tentativa, esse valor é 0. Esse número inclui tentativas em que a tarefa falhou por falta de instâncias disponíveis e nunca chegou à fase de execução.
X-AppEngine-TaskExecutionCount O número de vezes que a tarefa foi executada e recebeu uma resposta do gerenciador. Como o Cloud Tasks exclui a tarefa depois que uma resposta bem-sucedida é recebida, todas as respostas anteriores do gerenciador são falhas. Esse número não inclui falhas por causa de uma falta de instâncias disponíveis. Observe que X-AppEngine-TaskExecutionCount poderá ser igual a X-AppEngine-TaskRetryCount se for atualizado antes da tentativa de execução.
X-AppEngine-TaskETA O horário de agendamento da tarefa, especificado em segundos desde 1º de janeiro de 1970.

Se encontrar algum dos cabeçalhos listados acima, o gerenciador de solicitações saberá que se trata de uma solicitação do Cloud Tasks.

Além disso, as solicitações do Cloud Tasks podem conter os seguintes cabeçalhos:

Cabeçalho Descrição
X-AppEngine-TaskPreviousResponse O código de resposta HTTP da tentativa anterior.
X-AppEngine-TaskRetryReason O motivo para tentar novamente a tarefa.
X-AppEngine-FailFast Informa que uma tarefa falhará imediatamente se uma instância existente não estiver disponível.

Roteamento direcionado

Nas tarefas do App Engine, a fila e o gerenciador de tarefas são executados no mesmo projeto do Google Cloud. O tráfego é criptografado durante o transporte e nunca sai dos data centers do Google. Não é possível definir explicitamente o protocolo (por exemplo, HTTP ou HTTPS). No entanto, a solicitação para o gerenciador parecerá ter usado o protocolo HTTP.

As tarefas podem ser enviadas para gerenciadores de tarefas seguros ou não seguros e, em ambientes de execução com suporte, URIs restritos com login: admin. Como as tarefas não são executadas como um usuário, elas não podem ser enviadas para URIs restritos com login: required. Os envios de tarefas também não seguem redirecionamentos.

A seguir