Membuat pengendali tugas App Engine

Halaman ini menunjukkan cara membuat pengendali tugas App Engine, kode pekerja yang menangani tugas App Engine. Antrean Cloud Tasks mengirimkan permintaan HTTP ke pengendali tugas Anda. Setelah berhasil menyelesaikan pemrosesan, pengendali harus mengirim kode status HTTP antara 200 dan 299 kembali ke antrean. Jika nilai berupa nilai lainnya, berarti tugas gagal dan antrean akan mencoba lagi tugas tersebut.

Permintaan Task Queue App Engine dikirim dari alamat IP 0.1.0.2. Lihat juga rentang IP untuk permintaan yang dikirim ke lingkungan App Engine.

C#

    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddLogging(builder => builder.AddDebug());
            services.AddRouting();
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
        {
            var logger = loggerFactory.CreateLogger("testStackdriverLogging");

            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            else
            {
                // Configure error reporting service.
                app.UseExceptionHandler("/Home/Error");
            }

            var routeBuilder = new RouteBuilder(app);

            routeBuilder.MapPost("log_payload", context =>
            {
                // Log the request payload
                var reader = new StreamReader(context.Request.Body);
                var task = reader.ReadToEnd();

                logger.LogInformation($"Received task with payload: {task}");
                return context.Response.WriteAsync($"Printed task payload: {task}");
            });

            routeBuilder.MapGet("hello", context =>
            {
                // Basic index to verify app is serving
                return context.Response.WriteAsync("Hello, world!");
            });

            routeBuilder.MapGet("_ah/health", context =>
            {
                // Respond to GAE health-checks
                return context.Response.WriteAsync("OK");
            });

            routeBuilder.MapGet("/", context =>
            {
                return context.Response.WriteAsync("Hello, world!");
            });

            var routes = routeBuilder.Build();
            app.UseRouter(routes);
        }
    }

Python

from flask import Flask, render_template, request

app = Flask(__name__)

@app.route("/example_task_handler", methods=["POST"])
def example_task_handler():
    """Log the request payload."""
    payload = request.get_data(as_text=True) or "(empty payload)"
    print(f"Received task with payload: {payload}")
    return render_template("index.html", payload=payload)

Java

@WebServlet(
    name = "Tasks",
    description = "Create Cloud Task",
    urlPatterns = "/tasks/create"
)
public class TaskServlet extends HttpServlet {
  private static Logger log = Logger.getLogger(TaskServlet.class.getName());

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
    log.info("Received task request: " + req.getServletPath());
    String body = req.getReader()
        .lines()
        .reduce("", (accumulator, actual) -> accumulator + actual);

    if (!body.isEmpty()) {
      log.info("Request payload: " + body);
      String output = String.format("Received task with payload %s", body);
      resp.getOutputStream().write(output.getBytes());
      log.info("Sending response: " + output);
      resp.setStatus(HttpServletResponse.SC_OK);
    } else {
      log.warning("Null payload received in request to " + req.getServletPath());
    }
  }
}

PHP


require __DIR__ . '/vendor/autoload.php';

use Google\Cloud\Logging\LoggingClient;

// Create the logging client.
$logging = new LoggingClient();
// Create a PSR-3-compatible logger.
$logger = $logging->psrLogger('app', ['batchEnabled' => true]);

// Front-controller to route requests.
switch (@parse_url($_SERVER['REQUEST_URI'])['path']) {
    case '/':
        print "Hello, World!\n";
        break;
    case '/task_handler':
        // Taskname and Queuename are two of several useful Cloud Tasks headers available on the request.
        $taskName = $_SERVER['HTTP_X_APPENGINE_TASKNAME'] ?? '';
        $queueName = $_SERVER['HTTP_X_APPENGINE_QUEUENAME'] ?? '';

        try {
            handle_task(
                $queueName,
                $taskName,
                file_get_contents('php://input')
            );
        } catch (Exception $e) {
            http_response_code(400);
            exit($e->getMessage());
        }
        break;
    default:
        http_response_code(404);
        exit('Not Found');
}

/**
 * Process a Cloud Tasks HTTP Request.
 *
 * @param string $queueName provides the name of the queue which dispatched the task.
 * @param string $taskName provides the identifier of the task.
 * @param string $body The task details from the HTTP request.
 */
function handle_task($queueName, $taskName, $body = '')
{
    global $logger;

    if (empty($taskName)) {
        // You may use the presence of the X-Appengine-Taskname header to validate
        // the request comes from Cloud Tasks.
        $logger->warning('Invalid Task: No X-Appengine-Taskname request header found');
        throw new Exception('Bad Request - Invalid Task');
    }

    $output = sprintf('Completed task: task queue(%s), task name(%s), payload(%s)', $queueName, $taskName, $body);
    $logger->info($output);

    // Set a non-2xx status code to indicate a failure in task processing that should be retried.
    // For example, http_response_code(500) to indicate a server error.
    print $output;
}

Go


// Sample task_handler is an App Engine app demonstrating Cloud Tasks handling.
package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
)

func main() {
	// Allow confirmation the task handling service is running.
	http.HandleFunc("/", indexHandler)

	// Handle all tasks.
	http.HandleFunc("/task_handler", taskHandler)

	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
		log.Printf("Defaulting to port %s", port)
	}

	log.Printf("Listening on port %s", port)
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		log.Fatal(err)
	}
}

// indexHandler responds to requests with our greeting.
func indexHandler(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/" {
		http.NotFound(w, r)
		return
	}
	fmt.Fprint(w, "Hello, World!")
}

// taskHandler processes task requests.
func taskHandler(w http.ResponseWriter, r *http.Request) {
	taskName := r.Header.Get("X-Appengine-Taskname")
	if taskName == "" {
		// You may use the presence of the X-Appengine-Taskname header to validate
		// the request comes from Cloud Tasks.
		log.Println("Invalid Task: No X-Appengine-Taskname request header found")
		http.Error(w, "Bad Request - Invalid Task", http.StatusBadRequest)
		return
	}

	// Pull useful headers from Task request.
	queueName := r.Header.Get("X-Appengine-Queuename")

	// Extract the request body for further task details.
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Printf("ReadAll: %v", err)
		http.Error(w, "Internal Error", http.StatusInternalServerError)
		return
	}

	// Log & output details of the task.
	output := fmt.Sprintf("Completed task: task queue(%s), task name(%s), payload(%s)",
		queueName,
		taskName,
		string(body),
	)
	log.Println(output)

	// Set a non-2xx status code to indicate a failure in task processing that should be retried.
	// For example, http.Error(w, "Internal Server Error: Task Processing", http.StatusInternalServerError)
	fmt.Fprintln(w, output)
}

Node.js

const express = require('express');

const app = express();
app.enable('trust proxy');

// Set the Content-Type of the Cloud Task to ensure compatibility
// By default, the Content-Type header of the Task request is set to "application/octet-stream"
// see https://cloud.google.com/tasks/docs/reference/rest/v2beta3/projects.locations.queues.tasks#AppEngineHttpRequest
app.use(express.text());

app.get('/', (req, res) => {
  // Basic index to verify app is serving
  res.send('Hello, World!').end();
});

app.post('/log_payload', (req, res) => {
  // Log the request payload
  console.log(`Received task with payload: ${req.body}`);
  res.send(`Printed task payload: ${req.body}`).end();
});

app.get('*', (req, res) => {
  res.send('OK').end();
});

const PORT = process.env.PORT || 8080;
app.listen(PORT, () => {
  console.log(`App listening on port ${PORT}`);
  console.log('Press Ctrl+C to quit.');
});

Ruby

require "sinatra"
require "json"

get "/" do
  # Basic index to verify app is serving
  "Hello World!"
end

post "/log_payload" do
  data = request.body.read
  # Log the request payload
  puts "Received task with payload: #{data}"
  "Printed task payload: #{data}"
end

Waktu tunggu

Tugas App Engine memiliki waktu tunggu tertentu yang bergantung pada jenis penskalaan layanan yang menjalankannya.

Untuk layanan pekerja yang berjalan di lingkungan standar:

  • Penskalaan otomatis: pemrosesan tugas harus selesai dalam 10 menit.
  • Penskalaan manual dan dasar: permintaan dapat berjalan hingga 24 jam.

Untuk layanan pekerja yang berjalan di lingkungan fleksibel: semua jenis memiliki waktu tunggu 60 menit.

Jika pengendali Anda melewati batas waktu, antrean akan menganggap tugas gagal dan mencobanya lagi.

Membaca header permintaan tugas App Engine

Permintaan yang dikirim ke pengendali App Engine oleh antrean Cloud Tasks memiliki header khusus, yang berisi informasi khusus tugas yang mungkin ingin digunakan pengendali Anda.

Header ini ditetapkan secara internal. Jika salah satu header ini ada dalam permintaan pengguna eksternal ke aplikasi Anda, header tersebut akan diganti dengan header internal — kecuali untuk permintaan dari administrator aplikasi yang login, yang diizinkan untuk menetapkan header untuk tujuan pengujian.

Permintaan tugas App Engine selalu berisi header berikut:

Header Deskripsi
X-AppEngine-QueueName Nama antrean.
X-AppEngine-TaskName Nama "singkat" tugas, atau, jika tidak ada nama yang ditentukan saat pembuatan, ID unik yang dihasilkan sistem. Ini adalah nilai my-task-id dalam nama tugas lengkap; misalnya, task_name = projects/my-project-id/locations/my-location/queues/my-queue-id/tasks/my-task-id.
X-AppEngine-TaskRetryCount Frekuensi tugas dicoba lagi. Untuk upaya pertama, nilainya adalah 0. Jumlah ini mencakup upaya saat tugas gagal karena kurangnya instance yang tersedia dan tidak pernah mencapai fase eksekusi.
X-AppEngine-TaskExecutionCount Frekuensi eksekusi tugas dan menerima respons dari pengendali. Karena Cloud Tasks menghapus tugas setelah respons yang berhasil diterima, semua respons pengendali sebelumnya adalah kegagalan. Jumlah ini tidak mencakup kegagalan karena kurangnya instance yang tersedia. Perhatikan bahwa X-AppEngine-TaskExecutionCount bisa sama dengan X-AppEngine-TaskRetryCount jika diupdate sebelum eksekusi dicoba.
X-AppEngine-TaskETA Waktu jadwal tugas, yang ditentukan dalam detik sejak 1 Januari 1970.

Jika pengendali permintaan Anda menemukan salah satu header yang tercantum di atas, pengendali dapat memercayai bahwa permintaan tersebut adalah permintaan Cloud Tasks.

Selain itu, permintaan dari Cloud Tasks mungkin berisi header berikut:

Header Deskripsi
X-AppEngine-TaskPreviousResponse Kode respons HTTP dari percobaan ulang sebelumnya.
X-AppEngine-TaskRetryReason Alasan percobaan ulang tugas.
X-AppEngine-FailFast Menunjukkan bahwa tugas akan langsung gagal jika instance yang ada tidak tersedia.

Pemilihan rute target

Dalam tugas App Engine, antrean dan pengendali tugas berjalan dalam project Google Cloud yang sama. Lalu lintas dienkripsi selama transpor dan tidak pernah meninggalkan pusat data Google. Anda tidak dapat menetapkan protokol secara eksplisit (misalnya, HTTP atau HTTPS). Namun, permintaan ke pengendali akan tampak telah menggunakan protokol HTTP.

Tugas dapat dikirim ke pengendali tugas yang aman, pengendali tugas yang tidak aman, dan dalam runtime yang didukung, URI yang dibatasi dengan login: admin. Karena tugas tidak dijalankan sebagai pengguna, tugas tersebut tidak dapat dikirim ke URI yang dibatasi dengan login: required. Pengiriman tugas juga tidak mengikuti pengalihan.

Langkah selanjutnya