Crea gestori di attività App Engine

Mantieni tutto organizzato con le raccolte Salva e classifica i contenuti in base alle tue preferenze.

Questa pagina mostra come creare un gestore delle attività App Engine, il codice worker che gestisce un'attività App Engine. La coda Cloud Tasks invia richieste HTTP al tuo gestore delle attività. Una volta completata l'elaborazione, il gestore deve inviare un codice di stato HTTP tra 200 e 299 alla coda. Qualsiasi altro valore indica che l'attività non è riuscita e la coda tenta l'attività.

Le richieste in coda di attività di App Engine vengono inviate dall'indirizzo IP 0.1.0.2. Fai riferimento anche all'intervallo IP per le richieste inviate all'ambiente App Engine.

C#

    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddLogging(builder => builder.AddDebug());
            services.AddRouting();
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
        {
            var logger = loggerFactory.CreateLogger("testStackdriverLogging");

            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            else
            {
                // Configure error reporting service.
                app.UseExceptionHandler("/Home/Error");
            }

            var routeBuilder = new RouteBuilder(app);

            routeBuilder.MapPost("log_payload", context =>
            {
                // Log the request payload
                var reader = new StreamReader(context.Request.Body);
                var task = reader.ReadToEnd();

                logger.LogInformation($"Received task with payload: {task}");
                return context.Response.WriteAsync($"Printed task payload: {task}");
            });

            routeBuilder.MapGet("hello", context =>
            {
                // Basic index to verify app is serving
                return context.Response.WriteAsync("Hello, world!");
            });

            routeBuilder.MapGet("_ah/health", context =>
            {
                // Respond to GAE health-checks
                return context.Response.WriteAsync("OK");
            });

            routeBuilder.MapGet("/", context =>
            {
                return context.Response.WriteAsync("Hello, world!");
            });

            var routes = routeBuilder.Build();
            app.UseRouter(routes);
        }
    }

Python

from flask import Flask, render_template, request

app = Flask(__name__)

@app.route('/example_task_handler', methods=['POST'])
def example_task_handler():
    """Log the request payload."""
    payload = request.get_data(as_text=True) or '(empty payload)'
    print('Received task with payload: {}'.format(payload))
    return render_template("index.html", payload=payload)

Java

@WebServlet(
    name = "Tasks",
    description = "Create Cloud Task",
    urlPatterns = "/tasks/create"
)
public class TaskServlet extends HttpServlet {
  private static Logger log = Logger.getLogger(TaskServlet.class.getName());

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
    log.info("Received task request: " + req.getServletPath());
    String body = req.getReader()
        .lines()
        .reduce("", (accumulator, actual) -> accumulator + actual);

    if (!body.isEmpty()) {
      log.info("Request payload: " + body);
      String output = String.format("Received task with payload %s", body);
      resp.getOutputStream().write(output.getBytes());
      log.info("Sending response: " + output);
      resp.setStatus(HttpServletResponse.SC_OK);
    } else {
      log.warning("Null payload received in request to " + req.getServletPath());
    }
  }
}

PHP


require __DIR__ . '/vendor/autoload.php';

use Google\Cloud\Logging\LoggingClient;

// Create the logging client.
$logging = new LoggingClient();
// Create a PSR-3-compatible logger.
$logger = $logging->psrLogger('app', ['batchEnabled' => true]);

// Front-controller to route requests.
switch (@parse_url($_SERVER['REQUEST_URI'])['path']) {
    case '/':
        print "Hello, World!\n";
        break;
    case '/task_handler':
        // Taskname and Queuename are two of several useful Cloud Tasks headers available on the request.
        $taskName = $_SERVER['HTTP_X_APPENGINE_TASKNAME'] ?? '';
        $queueName = $_SERVER['HTTP_X_APPENGINE_QUEUENAME'] ?? '';

        try {
            handle_task(
                $queueName,
                $taskName,
                file_get_contents('php://input')
            );
        } catch (Exception $e) {
            http_response_code(400);
            exit($e->getMessage());
        }
        break;
    default:
        http_response_code(404);
        exit('Not Found');
}

/**
 * Process a Cloud Tasks HTTP Request.
 *
 * @param string $queueName provides the name of the queue which dispatched the task.
 * @param string $taskName provides the identifier of the task.
 * @param string $body The task details from the HTTP request.
 */
function handle_task($queueName, $taskName, $body = '')
{
    global $logger;

    if (empty($taskName)) {
        // You may use the presence of the X-Appengine-Taskname header to validate
        // the request comes from Cloud Tasks.
        $logger->warning('Invalid Task: No X-Appengine-Taskname request header found');
        throw new Exception('Bad Request - Invalid Task');
    }

    $output = sprintf('Completed task: task queue(%s), task name(%s), payload(%s)', $queueName, $taskName, $body);
    $logger->info($output);

    // Set a non-2xx status code to indicate a failure in task processing that should be retried.
    // For example, http_response_code(500) to indicate a server error.
    print $output;
}

Go


// Sample task_handler is an App Engine app demonstrating Cloud Tasks handling.
package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
)

func main() {
	// Allow confirmation the task handling service is running.
	http.HandleFunc("/", indexHandler)

	// Handle all tasks.
	http.HandleFunc("/task_handler", taskHandler)

	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
		log.Printf("Defaulting to port %s", port)
	}

	log.Printf("Listening on port %s", port)
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		log.Fatal(err)
	}
}

// indexHandler responds to requests with our greeting.
func indexHandler(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/" {
		http.NotFound(w, r)
		return
	}
	fmt.Fprint(w, "Hello, World!")
}

// taskHandler processes task requests.
func taskHandler(w http.ResponseWriter, r *http.Request) {
	taskName := r.Header.Get("X-Appengine-Taskname")
	if taskName == "" {
		// You may use the presence of the X-Appengine-Taskname header to validate
		// the request comes from Cloud Tasks.
		log.Println("Invalid Task: No X-Appengine-Taskname request header found")
		http.Error(w, "Bad Request - Invalid Task", http.StatusBadRequest)
		return
	}

	// Pull useful headers from Task request.
	queueName := r.Header.Get("X-Appengine-Queuename")

	// Extract the request body for further task details.
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Printf("ReadAll: %v", err)
		http.Error(w, "Internal Error", http.StatusInternalServerError)
		return
	}

	// Log & output details of the task.
	output := fmt.Sprintf("Completed task: task queue(%s), task name(%s), payload(%s)",
		queueName,
		taskName,
		string(body),
	)
	log.Println(output)

	// Set a non-2xx status code to indicate a failure in task processing that should be retried.
	// For example, http.Error(w, "Internal Server Error: Task Processing", http.StatusInternalServerError)
	fmt.Fprintln(w, output)
}

Node.js

const bodyParser = require('body-parser');
const express = require('express');

const app = express();
app.enable('trust proxy');

// By default, the Content-Type header of the Task request is set to "application/octet-stream"
// see https://cloud.google.com/tasks/docs/reference/rest/v2beta3/projects.locations.queues.tasks#AppEngineHttpRequest
app.use(bodyParser.raw({type: 'application/octet-stream'}));

app.get('/', (req, res) => {
  // Basic index to verify app is serving
  res.send('Hello, World!').end();
});

app.post('/log_payload', (req, res) => {
  // Log the request payload
  console.log('Received task with payload: %s', req.body);
  res.send(`Printed task payload: ${req.body}`).end();
});

app.get('*', (req, res) => {
  res.send('OK').end();
});

const PORT = process.env.PORT || 8080;
app.listen(process.env.PORT || 8080, () => {
  console.log(`App listening on port ${PORT}`);
  console.log('Press Ctrl+C to quit.');
});

Ruby

require "sinatra"
require "json"

get "/" do
  # Basic index to verify app is serving
  "Hello World!"
end

post "/log_payload" do
  data = request.body.read
  # Log the request payload
  puts "Received task with payload: #{data}"
  "Printed task payload: #{data}"
end

Timeout

Le attività di App Engine hanno timeout specifici che dipendono dal tipo di scalabilità del servizio che le esegue.

Per i servizi worker in esecuzione nell'ambiente standard:

  • Scalabilità automatica: l'elaborazione delle attività deve terminare entro 10 minuti.
  • Scalabilità manuale e di base: le richieste possono essere eseguite fino a 24 ore.

Per i servizi worker in esecuzione nell'ambiente flessibile: tutti i tipi hanno un timeout di 60 minuti.

Se il gestore non rispetta la scadenza, la coda presume che l'attività non sia riuscita e riprova.

Lettura delle intestazioni delle richieste di attività App Engine

Le richieste inviate al gestore di App Engine da una coda di Cloud Tasks hanno intestazioni speciali che contengono informazioni specifiche per le attività che il gestore potrebbe utilizzare.

Queste intestazioni sono impostate internamente. Se una di queste intestazioni è presente in una richiesta utente esterna alla tua applicazione, vengono sostituite da quelle interne, ad eccezione delle richieste degli amministratori dell'applicazione che hanno eseguito l'accesso, e possono impostare le intestazioni per scopi di test.

Le richieste di attività di App Engine contengono sempre le seguenti intestazioni:

Intestazione Descrizione
X-AppEngine-QueueName Il nome della coda.
X-AppEngine-TaskName Il nome "breve" dell'attività o, se non è stato specificato alcun nome, un ID univoco generato dal sistema. Questo è il valore my-task-id nel nome completo dell'attività, ad esempio task_name = projects/my-project-id/locations/my-location/queues/my-queue-id/tasks/my-task-id.
X-AppEngine-TaskRetryCount Il numero di nuovi tentativi dell'attività. Per il primo tentativo, il valore è 0. Questo numero include i tentativi in cui l'attività non è riuscita per mancanza di istanze disponibili e non ha mai raggiunto la fase di esecuzione.
X-AppEngine-TaskExecutionCount Il numero di volte in cui l'attività è stata eseguita e ha ricevuto una risposta dal gestore. Poiché Cloud Tasks elimina l'attività una volta ricevuta una risposta riuscita, tutte le risposte precedenti del gestore sono errori. Questo numero non include errori dovuti alla mancanza di istanze disponibili. Tieni presente che X-AppEngine-TaskExecutionCount può essere uguale a X-AppEngine-TaskRetryCount se viene aggiornato prima di tentare un'esecuzione.
X-AppEngine-TaskETA L'ora di pianificazione dell'attività, specificata in secondi dal 1° gennaio 1970.

Se il gestore delle richieste rileva una delle intestazioni elencate in precedenza, può considerare attendibile la richiesta come attività di Cloud Tasks.

Inoltre, le richieste provenienti da Cloud Tasks possono contenere le seguenti intestazioni:

Intestazione Descrizione
X-AppEngine-TaskPreviousResponse Il codice di risposta HTTP del precedente tentativo.
X-AppEngine-TaskRetryReason Il motivo per riprovare l'attività.
X-AppEngine-FailFast Indica che un'attività non riesce immediatamente se un'istanza esistente non è disponibile.

Routing target

Nelle attività di App Engine, sia la coda che il gestore delle attività vengono eseguiti all'interno dello stesso progetto Cloud. Il traffico è criptato durante il trasporto e non lascia mai i data center di Google. Poiché questo traffico viene trasferito su un meccanismo di comunicazione interno a Google, non puoi impostare esplicitamente il protocollo (ad esempio HTTP o HTTPS). La richiesta al gestore, tuttavia, sembra di aver utilizzato il protocollo HTTP.

Le attività possono essere inviate a gestori di attività sicuri, gestori di attività non sicuri e in runtime supportati, URI limitati con login: admin. Poiché le attività non vengono eseguite come utenti, non possono essere inviate a URI limitati con login: required. Anche gli invii delle attività non seguono i reindirizzamenti.

Passaggi successivi