App Engine 작업 핸들러 만들기

이 페이지에서는 App Engine 태스크를 처리하는 작업자 코드인 App Engine 태스크 핸들러를 만드는 방법을 보여줍니다. Cloud Tasks 큐가 HTTP 요청을 태스크 핸들러로 전달합니다. 처리가 완료되면 핸들러는 200~299 범위의 HTTP 상태 코드를 큐로 다시 보내야 합니다. 그 외의 모든 값은 태스크가 실패하여 큐가 태스크를 다시 시도한다는 것을 나타냅니다.

C#

    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddRouting();
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
        {
            loggerFactory.AddDebug();
            var logger = loggerFactory.CreateLogger("testStackdriverLogging");

            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            else
            {
                // Configure error reporting service.
                app.UseExceptionHandler("/Home/Error");
            }

            var routeBuilder = new RouteBuilder(app);

            routeBuilder.MapPost("log_payload", context =>
            {
                // Log the request payload
                var reader = new StreamReader(context.Request.Body);
                var task = reader.ReadToEnd();

                logger.LogInformation($"Received task with payload: {task}");
                return context.Response.WriteAsync($"Printed task payload: {task}");
            });

            routeBuilder.MapGet("hello", context =>
            {
                // Basic index to verify app is serving
                return context.Response.WriteAsync("Hello, world!");
            });

            routeBuilder.MapGet("_ah/health", context =>
            {
                // Respond to GAE health-checks
                return context.Response.WriteAsync("OK");
            });

            routeBuilder.MapGet("/", context =>
            {
                return context.Response.WriteAsync("Hello, world!");
            });

            var routes = routeBuilder.Build();
            app.UseRouter(routes);
        }
    }

Python

from flask import Flask, request

app = Flask(__name__)

@app.route('/example_task_handler', methods=['POST'])
def example_task_handler():
    """Log the request payload."""
    payload = request.get_data(as_text=True) or '(empty payload)'
    print('Received task with payload: {}'.format(payload))
    return 'Printed task payload: {}'.format(payload)

자바

@WebServlet(
    name = "Tasks",
    description = "Create Cloud Task",
    urlPatterns = "/tasks/create"
)
public class TaskServlet extends HttpServlet {
  private static Logger log = Logger.getLogger(TaskServlet.class.getName());

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
    log.info("Received task request: " + req.getServletPath());
    String body = req.getReader()
        .lines()
        .reduce("", (accumulator, actual) -> accumulator + actual);

    if (!body.isEmpty()) {
      log.info("Request payload: " + body);
      String output = String.format("Received task with payload %s", body);
      resp.getOutputStream().write(output.getBytes());
      log.info("Sending response: " + output);
      resp.setStatus(HttpServletResponse.SC_OK);
    } else {
      log.warning("Null payload received in request to " + req.getServletPath());
    }
  }
}

PHP


require __DIR__ . '/vendor/autoload.php';

use Google\Cloud\Logging\LoggingClient;

// Create the logging client.
$logging = new LoggingClient();
// Create a PSR-3-compatible logger.
$logger = $logging->psrLogger('app', ['batchEnabled' => true]);

// Front-controller to route requests.
switch (@parse_url($_SERVER['REQUEST_URI'])['path']) {
    case '/':
        print "Hello, World!\n";
        break;
    case '/task_handler':
        // Taskname and Queuename are two of several useful Cloud Tasks headers available on the request.
        $taskName = $_SERVER['HTTP_X_APPENGINE_TASKNAME'] ?? '';
        $queueName = $_SERVER['HTTP_X_APPENGINE_QUEUENAME'] ?? '';

        try {
            handle_task(
                $queueName,
                $taskName,
                file_get_contents('php://input')
            );
        } catch (Exception $e) {
            http_response_code(400);
            exit($e->getMessage());
        }
        break;
    default:
        http_response_code(404);
        exit('Not Found');
}

/**
 * Process a Cloud Tasks HTTP Request.
 *
 * @param string $queueName provides the name of the queue which dispatched the task.
 * @param string $taskName provides the identifier of the task.
 * @param string $body The task details from the HTTP request.
 */
function handle_task($queueName, $taskName, $body = '')
{
    global $logger;

    if (empty($taskName)) {
        // You may use the presence of the X-Appengine-Taskname header to validate
        // the request comes from Cloud Tasks.
        $logger->warning('Invalid Task: No X-Appengine-Taskname request header found');
        throw new Exception('Bad Request - Invalid Task');
    }

    $output = sprintf('Completed task: task queue(%s), task name(%s), payload(%s)', $queueName, $taskName, $body);
    $logger->info($output);

    // Set a non-2xx status code to indicate a failure in task processing that should be retried.
    // For example, http_response_code(500) to indicate a server error.
    print $output;
}

Go


// Sample task_handler is an App Engine app demonstrating Cloud Tasks handling.
package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
)

func main() {
	// Allow confirmation the task handling service is running.
	http.HandleFunc("/", indexHandler)

	// Handle all tasks.
	http.HandleFunc("/task_handler", taskHandler)

	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
		log.Printf("Defaulting to port %s", port)
	}

	log.Printf("Listening on port %s", port)
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		log.Fatal(err)
	}
}

// indexHandler responds to requests with our greeting.
func indexHandler(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/" {
		http.NotFound(w, r)
		return
	}
	fmt.Fprint(w, "Hello, World!")
}

// taskHandler processes task requests.
func taskHandler(w http.ResponseWriter, r *http.Request) {
	taskName := r.Header.Get("X-Appengine-Taskname")
	if taskName == "" {
		// You may use the presence of the X-Appengine-Taskname header to validate
		// the request comes from Cloud Tasks.
		log.Println("Invalid Task: No X-Appengine-Taskname request header found")
		http.Error(w, "Bad Request - Invalid Task", http.StatusBadRequest)
		return
	}

	// Pull useful headers from Task request.
	queueName := r.Header.Get("X-Appengine-Queuename")

	// Extract the request body for further task details.
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Printf("ReadAll: %v", err)
		http.Error(w, "Internal Error", http.StatusInternalServerError)
		return
	}

	// Log & output details of the task.
	output := fmt.Sprintf("Completed task: task queue(%s), task name(%s), payload(%s)",
		queueName,
		taskName,
		string(body),
	)
	log.Println(output)

	// Set a non-2xx status code to indicate a failure in task processing that should be retried.
	// For example, http.Error(w, "Internal Server Error: Task Processing", http.StatusInternalServerError)
	fmt.Fprintln(w, output)
}

Node.js

const bodyParser = require('body-parser');
const express = require('express');

const app = express();
app.enable('trust proxy');

// By default, the Content-Type header of the Task request is set to "application/octet-stream"
// see https://cloud.google.com/tasks/docs/reference/rest/v2beta3/projects.locations.queues.tasks#AppEngineHttpRequest
app.use(bodyParser.raw({type: 'application/octet-stream'}));

app.get('/', (req, res) => {
  // Basic index to verify app is serving
  res.send('Hello, World!').end();
});

app.post('/log_payload', (req, res) => {
  // Log the request payload
  console.log('Received task with payload: %s', req.body);
  res.send(`Printed task payload: ${req.body}`).end();
});

app.get('*', (req, res) => {
  res.send('OK').end();
});

const PORT = process.env.PORT || 8080;
app.listen(process.env.PORT || 8080, () => {
  console.log(`App listening on port ${PORT}`);
  console.log('Press Ctrl+C to quit.');
});

Ruby

require "sinatra"
require "json"

get "/" do
  # Basic index to verify app is serving
  "Hello World!"
end

post "/log_payload" do
  data = request.body.read
  # Log the request payload
  puts "Received task with payload: #{data}"
  "Printed task payload: #{data}"
end

시간 제한

App Engine 작업에는 해당 작업을 실행하는 서비스의 확장 유형에 따라 달라지는 구체적인 시간 제한이 있습니다.

표준 환경에서 실행되는 작업자 서비스의 경우 다음과 같습니다.

  • 자동 확장: 처리 중인 작업은 10분 내에 완료되어야 합니다.
  • 수동 및 기본 확장: 요청이 최대 24시간 실행될 수 있습니다.

가변형 환경에서 실행되는 작업자 서비스: 모든 유형에 60분의 시간 제한이 있습니다.

핸들러가 기한을 넘기면 큐는 태스크가 실패한 것으로 간주하고 태스크를 다시 시도합니다.

App Engine 태스크 요청 헤더 읽기

Cloud Tasks 큐에서 App Engine 핸들러로 전송한 요청에는 핸들러가 사용할 수 있는 태스크별 정보가 포함된 특수 헤더가 있습니다.

이러한 핸들러는 내부적으로 설정됩니다. 이러한 헤더 중 하나라도 앱에 대한 외부 사용자 요청에 있는 경우 내부 헤더로 대체됩니다. 단, 테스트용으로 헤더를 설정할 수 있는 로그인한 애플리케이션 관리자의 요청인 경우에는 해당되지 않습니다.

App Engine 태스크 요청에는 항상 다음 헤더가 포함됩니다.

헤더 설명
X-AppEngine-QueueName 대기열 이름입니다.
X-AppEngine-TaskName 작업의 '짧은' 이름입니다. 그러나 만들 때 이름이 지정되지 않은 경우에는 시스템에서 생성된 고유한 ID입니다. 다음 전체 작업 이름에서 'my-task-id' 값입니다. projects/my-project-id/locations/my-location/queues/my-queue-id/tasks/my-task-id
X-AppEngine-TaskRetryCount 이 작업이 다시 시도된 횟수입니다. 첫 시도의 경우, 이 값은 0입니다. 사용 가능한 인스턴스가 부족하여 태스크가 실패하고 실행 단계에 도달하지 못한 시도도 이 숫자에 포함됩니다.
X-AppEngine-TaskExecutionCount 작업이 핸들러로부터 응답을 받은 총 횟수입니다. Cloud Tasks가 성공적인 응답을 받으면 작업을 삭제하므로 이전의 모든 핸들러 응답은 실패였습니다. 사용할 수 있는 인스턴스가 부족하여 실패한 경우는 이 숫자에 포함되지 않습니다.
X-AppEngine-TaskETA 작업의 예약 시간으로, 1970년 1월 1일부터의 경과 시간(초)으로 지정됩니다.

요청 핸들러가 위에 나열된 헤더를 찾으면 요청이 Cloud Tasks 요청임을 신뢰할 수 있습니다.

또한 Cloud Tasks의 요청에는 다음 헤더가 포함될 수 있습니다.

헤더 설명
X-AppEngine-TaskPreviousResponse 지난번 재시도에서 나온 HTTP 응답 코드입니다.
X-AppEngine-TaskRetryReason 작업을 다시 시도하는 이유입니다.
X-AppEngine-FailFast 기존 인스턴스를 사용할 수 없으면 작업이 즉시 실패함을 나타냅니다.

대상 라우팅

App Engine 태스크에서 큐와 태스크 핸들러는 모두 동일한 Cloud 프로젝트에서 실행됩니다. 트래픽은 전송 동안 암호화되며 Google 데이터 센터 외부로 나가지 않습니다. 이 트래픽은 Google 내부의 통신 메커니즘을 통해 전달되므로 명시적으로 프로토콜을 설정할 수 없습니다(예: HTTP 또는 HTTPS). 그러나 핸들러에 대한 요청은 HTTP 프로토콜을 사용한 것으로 나타납니다.

태스크는 보안 태스크 핸들러, 비보안 태스크 핸들러, 그리고 지원되는 런타임에서 login: admin으로 제한된 URI로 전달될 수 있습니다. 태스크는 사용자로 실행되지 않으므로 login: required로 제한된 URI에는 전달될 수 없습니다. 또한 태스크 전달은 리디렉션 따르지도 않습니다.

다음 단계