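Retrying event-driven functions
This document describes how to enable retries for event-driven functions. Automatic retries are not available for HTTP functions.
Semantics of retry
For each event emitted by an event source, Cloud Functions guarantees at-least-once execution of an event-driven function. By default, however, if a function invocation terminates with an error, the function is not invoked again and the event is dropped. When you enable retries on an event-driven function, Cloud Functions retries a failed invocation until it completes successfully or the retry period (7 days by default) expires.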
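The retry semantics above can be illustrated with a small local simulation. This is a sketch under stated assumptions, not how Cloud Functions is implemented: `deliver` and `make_flaky_handler` are hypothetical helpers, time is simulated in whole seconds, and a fixed `backoff_s` between attempts stands in for whatever retry schedule the service actually uses.

```python
from typing import Callable


def deliver(handler: Callable[[dict], None], event: dict, *, retry: bool,
            retry_window_s: int = 7 * 24 * 3600, backoff_s: int = 60) -> str:
    """Simulate delivering one event to an event-driven function.

    Hypothetical model: the handler signals failure by raising an exception,
    and time is simulated, so no real waiting happens.
    """
    elapsed_s = 0
    while True:
        try:
            handler(event)
            return "succeeded"
        except Exception:
            if not retry:
                # Retries disabled (the default): drop the event after one failure.
                return "dropped"
            elapsed_s += backoff_s
            if elapsed_s > retry_window_s:
                # Retry period expired: give up on this event.
                return "expired"


def make_flaky_handler(failures: int) -> Callable[[dict], None]:
    """Return a handler that raises on its first `failures` calls, then succeeds."""
    state = {"calls": 0}

    def handler(event: dict) -> None:
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionError("transient failure")

    return handler


print(deliver(make_flaky_handler(2), {}, retry=False))  # dropped
print(deliver(make_flaky_handler(2), {}, retry=True))   # succeeded
```

With `retry=False` the first failure drops the event; with `retry=True` the same handler eventually succeeds on its third attempt, while a handler that never recovers ends with `"expired"` once the simulated retry period runs out.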
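When retries are not enabled for a function (the default), the function always reports that it executed successfully, and 200 OK response codes might appear in its logs. This happens even if the function encountered an error. To make it clear when your function encounters an error, be sure to report errors appropriately.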
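Why event-driven functions fail to complete
In rare cases, a function might exit prematurely because of an internal error, and by default the function might or might not be retried automatically.
More commonly, an event-driven function fails to complete successfully because of errors thrown in the function code itself. Reasons this might happen include:
- The function contains a bug and the runtime throws an exception.
- The function cannot reach a service endpoint, or times out while trying to reach it.
- The function intentionally throws an exception (for example, when a parameter fails validation).
- A function written in Node.js returns a rejected promise or passes a non-null value to a callback.
In any of these cases, the function stops executing by default and the event is discarded. To retry the function when an error occurs, you can change the default retry policy by setting the "retry on failure" property. This causes the event to be retried repeatedly, for up to several days, until the function completes successfully.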
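Enabling and disabling retries
To enable or disable retries, you can use either the gcloud command-line tool or the Google Cloud console. Retries are disabled by default.
Using the gcloud command-line tool
To enable retries with the gcloud command-line tool, include the --retry flag when deploying your function:
gcloud functions deploy FUNCTION_NAME --retry FLAGS...
To disable retries, redeploy the function without the --retry flag:
gcloud functions deploy FUNCTION_NAME FLAGS...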
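Using the Google Cloud console
You can enable or disable retries in the Google Cloud console as follows:
1. In the Google Cloud console, go to the Cloud Functions Overview page.
2. Click Create function. Alternatively, click an existing function to go to its details page, then click Edit.
3. Fill in the required fields for your function.
4. Make sure the Trigger field is set to an event-driven trigger type, such as Cloud Pub/Sub or Cloud Storage.
5. Click More to expand the advanced settings.
6. Select or clear the checkbox labeled Retry on failure.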
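Best practices
This section describes best practices for using retries.
Use retries to handle transient errors
Because your function is retried continuously until it executes successfully, permanent errors such as bugs should be eliminated from your code through thorough testing before you enable retries. Retries work best for intermittent or transient failures that a retry is likely to resolve, such as a flaky service endpoint or a timeout.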
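To apply this advice, a function can decide per exception type whether a failure is worth retrying. The sketch below assumes that timeouts and connection errors are transient and that everything else (for example a `ValueError` caused by a bug or invalid input) is permanent. `TRANSIENT_ERRORS` and `is_transient` are illustrative names, and the type list should be adapted to the client libraries your function actually uses.

```python
# Exception types assumed to be transient for this sketch. Client libraries
# often define their own timeout/unavailable errors that belong here too.
TRANSIENT_ERRORS = (TimeoutError, ConnectionError)


def is_transient(exc: BaseException) -> bool:
    """Return True if retrying might fix the failure that raised `exc`."""
    return isinstance(exc, TRANSIENT_ERRORS)


print(is_transient(TimeoutError()))           # True
print(is_transient(ConnectionResetError()))   # True (a ConnectionError subclass)
print(is_transient(ValueError("bad input")))  # False
```

Keeping the classification in one place makes it easy to audit which failures will cause retries once the function is deployed with retries enabled.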
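Set an end condition to avoid infinite retry loops
When you use retries, protect your function against continuous looping by including a well-defined end condition that is checked before the function begins processing. Note that this technique only works if your function starts successfully and is able to evaluate the end condition.
A simple yet effective approach is to discard events whose timestamps are older than a certain age. This prevents excessive executions when failures are permanent or last longer than expected.
For example, the following snippets discard all events that occurred more than 10 seconds ago: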
Node.js
/**
* Background Cloud Function that only executes within
* a certain time period after the triggering event
*
* @param {object} event The Cloud Functions event.
* @param {function} callback The callback function.
*/
exports.avoidInfiniteRetries = (event, callback) => {
const eventAge = Date.now() - Date.parse(event.timestamp);
const eventMaxAge = 10000;
// Ignore events that are too old
if (eventAge > eventMaxAge) {
console.log(`Dropping event ${event.eventId} with age ${eventAge} ms.`);
callback();
return;
}
// Do what the function is supposed to do
console.log(`Processing event ${event.eventId} with age ${eventAge} ms.`);
// Retry failed function executions
const failed = false;
if (failed) {
callback('some error');
} else {
callback();
}
};
Python
from datetime import datetime, timezone
# The 'python-dateutil' package must be included in requirements.txt.
from dateutil import parser
def avoid_infinite_retries(data, context):
    """Background Cloud Function that only executes within a certain
    time period after the triggering event.
    Args:
        data (dict): The event payload.
        context (google.cloud.functions.Context): The event metadata.
    Returns:
        None; output is written to Cloud Logging
    """
    timestamp = context.timestamp
    event_time = parser.parse(timestamp)
    event_age = (datetime.now(timezone.utc) - event_time).total_seconds()
    event_age_ms = event_age * 1000
    # Ignore events that are too old
    max_age_ms = 10000
    if event_age_ms > max_age_ms:
        print('Dropped {} (age {}ms)'.format(context.event_id, event_age_ms))
        # Returning normally ends processing without triggering a retry
        return
    # Do what the function is supposed to do
    print('Processed {} (age {}ms)'.format(context.event_id, event_age_ms))
    return  # To retry the execution, raise an exception here
Go
// Package tips contains tips for writing Cloud Functions in Go.
package tips
import (
"context"
"fmt"
"log"
"time"
"cloud.google.com/go/functions/metadata"
)
// PubSubMessage is the payload of a Pub/Sub event.
// See the documentation for more details:
// https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
type PubSubMessage struct {
Data []byte `json:"data"`
}
// FiniteRetryPubSub demonstrates how to avoid infinite retries.
func FiniteRetryPubSub(ctx context.Context, m PubSubMessage) error {
meta, err := metadata.FromContext(ctx)
if err != nil {
// Assume an error on the function invoker and try again.
return fmt.Errorf("metadata.FromContext: %v", err)
}
// Ignore events that are too old.
expiration := meta.Timestamp.Add(10 * time.Second)
if time.Now().After(expiration) {
log.Printf("event timeout: halting retries for expired event %q", meta.EventID)
return nil
}
// Add your message processing logic.
return processTheMessage(m)
}
Java
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import com.google.gson.Gson;
import functions.eventpojos.PubsubMessage;
import java.time.Duration;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.logging.Logger;
public class RetryTimeout implements BackgroundFunction<PubsubMessage> {
private static final Logger logger = Logger.getLogger(RetryTimeout.class.getName());
private static final long MAX_EVENT_AGE = 10_000;
// Use Gson (https://github.com/google/gson) to parse JSON content.
private static final Gson gson = new Gson();
/**
* Background Cloud Function that only executes within
* a certain time period after the triggering event
*/
@Override
public void accept(PubsubMessage message, Context context) {
ZonedDateTime utcNow = ZonedDateTime.now(ZoneOffset.UTC);
ZonedDateTime timestamp = ZonedDateTime.parse(context.timestamp());
long eventAge = Duration.between(timestamp, utcNow).toMillis();
// Ignore events that are too old
if (eventAge > MAX_EVENT_AGE) {
logger.info(String.format("Dropping event with timestamp %s.", timestamp));
return;
}
// Process events that are recent enough
// To retry this invocation, throw an exception here
logger.info(String.format("Processing event with timestamp %s.", timestamp));
}
}
C#
using CloudNative.CloudEvents;
using Google.Cloud.Functions.Framework;
using Google.Events.Protobuf.Cloud.PubSub.V1;
using Microsoft.Extensions.Logging;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace TimeBoundedRetries;
public class Function : ICloudEventFunction<MessagePublishedData>
{
private static readonly TimeSpan MaxEventAge = TimeSpan.FromSeconds(10);
private readonly ILogger _logger;
// Note: for additional testability, use an injectable clock abstraction.
public Function(ILogger<Function> logger) =>
_logger = logger;
public Task HandleAsync(CloudEvent cloudEvent, MessagePublishedData data, CancellationToken cancellationToken)
{
string textData = data.Message.TextData;
DateTimeOffset utcNow = DateTimeOffset.UtcNow;
// Every PubSub CloudEvent will contain a timestamp.
DateTimeOffset timestamp = cloudEvent.Time.Value;
DateTimeOffset expiry = timestamp + MaxEventAge;
// Ignore events that are too old.
if (utcNow > expiry)
{
_logger.LogInformation("Dropping PubSub message '{text}'", textData);
return Task.CompletedTask;
}
// Process events that are recent enough.
// If this processing throws an exception, the message will be retried until either
// processing succeeds or the event becomes too old and is dropped by the code above.
_logger.LogInformation("Processing PubSub message '{text}'", textData);
return Task.CompletedTask;
}
}
Ruby
require "functions_framework"
FunctionsFramework.cloud_event "avoid_infinite_retries" do |event|
# Use the event timestamp to determine the event age.
event_age_secs = Time.now - event.time.to_time
event_age_ms = (event_age_secs * 1000).to_i
max_age_ms = 10_000
if event_age_ms > max_age_ms
# Ignore events that are too old.
logger.info "Dropped #{event.id} (age #{event_age_ms}ms)"
else
# Do what the function is supposed to do.
logger.info "Handling #{event.id} (age #{event_age_ms}ms)..."
failed = true
# Raise an exception to signal failure and trigger a retry.
raise "I failed!" if failed
end
end
PHP
/**
* This function shows an example method for avoiding infinite retries in
* Google Cloud Functions. Functions configured to automatically retry
* execution on failure are retried until they succeed or the retry period
* expires, which can cause a long-running retry loop. To avoid this, we stop
* retrying executions (by not throwing exceptions) for any events that are
* older than a predefined threshold.
*/
use Google\CloudFunctions\CloudEvent;
function avoidInfiniteRetries(CloudEvent $event): void
{
$log = fopen(getenv('LOGGER_OUTPUT') ?: 'php://stderr', 'wb');
$eventId = $event->getId();
// The maximum age of events to process.
$maxAge = 10; // 10 seconds
// The age of the event being processed.
$eventAge = time() - strtotime($event->getTime());
// Ignore events that are too old
if ($eventAge > $maxAge) {
fwrite($log, 'Dropping event ' . $eventId . ' with age ' . $eventAge . ' seconds' . PHP_EOL);
return;
}
// Do what the function is supposed to do
fwrite($log, 'Processing event: ' . $eventId . ' with age ' . $eventAge . ' seconds' . PHP_EOL);
// Retry failed function executions by throwing an exception
$failed = true;
if ($failed) {
throw new Exception('Event ' . $eventId . ' failed; retrying...');
}
}
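The age check in these samples can also be factored into a small pure function, which makes it testable with a fixed clock (the C# sample hints at the same idea with an injectable clock). In this Python sketch, `is_event_too_old` and `MAX_EVENT_AGE` are hypothetical names, and the timestamp is assumed to be an RFC 3339 string with at most microsecond precision; if your timestamps carry more precision, parse them with `dateutil.parser` as the Python sample does.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Hypothetical threshold matching the 10-second limit used in the samples above.
MAX_EVENT_AGE = timedelta(seconds=10)


def is_event_too_old(event_timestamp: str,
                     now: Optional[datetime] = None,
                     max_age: timedelta = MAX_EVENT_AGE) -> bool:
    """Return True if the event is older than `max_age`.

    `event_timestamp` is an RFC 3339 string such as "2024-01-01T00:00:00.000Z".
    Pass `now` explicitly in tests; it defaults to the current UTC time.
    """
    # Before Python 3.11, datetime.fromisoformat() rejects a trailing "Z",
    # so rewrite it as an explicit UTC offset.
    event_time = datetime.fromisoformat(event_timestamp.replace("Z", "+00:00"))
    if now is None:
        now = datetime.now(timezone.utc)
    return now - event_time > max_age


# Usage with a fixed clock:
fixed_now = datetime(2024, 1, 1, 0, 0, 30, tzinfo=timezone.utc)
print(is_event_too_old("2024-01-01T00:00:00.000Z", now=fixed_now))  # True (30 s old)
print(is_event_too_old("2024-01-01T00:00:25.000Z", now=fixed_now))  # False (5 s old)
```

In a handler, the function would return early (without raising) whenever `is_event_too_old(...)` is True, which ends the retry loop for that event.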
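Distinguish between retriable and fatal errors
If your function has retries enabled, any unhandled error triggers a retry. Make sure that your code catches any errors that should not result in a retry.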
Node.js
/**
* Background Cloud Function that demonstrates
* how to toggle retries using a promise
*
* @param {object} event The Cloud Functions event.
* @param {object} event.data Data included with the event.
* @param {object} event.data.retry User-supplied parameter that tells the function whether to retry.
*/
exports.retryPromise = event => {
const tryAgain = !!event.data.retry;
if (tryAgain) {
throw new Error('Retrying...');
} else {
console.error('Not retrying...');
return Promise.resolve();
}
};
/**
* Background Cloud Function that demonstrates
* how to toggle retries using a callback
*
* @param {object} event The Cloud Functions event.
* @param {object} event.data Data included with the event.
* @param {object} event.data.retry User-supplied parameter that tells the function whether to retry.
* @param {function} callback The callback function.
*/
exports.retryCallback = (event, callback) => {
const tryAgain = !!event.data.retry;
const err = new Error('Error!');
if (tryAgain) {
console.error('Retrying:', err);
callback(err);
} else {
console.error('Not retrying:', err);
callback();
}
};
Python
from google.cloud import error_reporting
error_client = error_reporting.Client()
def retry_or_not(data, context):
    """Background Cloud Function that demonstrates how to toggle retries.
    Args:
        data (dict): The event payload.
        context (google.cloud.functions.Context): The event metadata.
    Returns:
        None; output is written to Cloud Logging
    """
    # Retry based on a user-defined parameter in the event payload
    try_again = data.get('retry') is not None
    try:
        raise RuntimeError('I failed you')
    except RuntimeError:
        error_client.report_exception()
        if try_again:
            raise  # Raise the exception and try again
        else:
            pass  # Swallow the exception and don't retry
Go
// Package tips contains tips for writing Cloud Functions in Go.
package tips
import (
"context"
"errors"
"log"
)
// PubSubMessage is the payload of a Pub/Sub event.
// See the documentation for more details:
// https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
type PubSubMessage struct {
Data []byte `json:"data"`
}
// RetryPubSub demonstrates how to toggle using retries.
func RetryPubSub(ctx context.Context, m PubSubMessage) error {
name := string(m.Data)
if name == "" {
name = "World"
}
// A misconfigured client will stay broken until the function is redeployed.
client, err := MisconfiguredDataClient()
if err != nil {
log.Printf("MisconfiguredDataClient (retry denied): %v", err)
// A nil return indicates that the function does not need a retry.
return nil
}
// Runtime error might be resolved with a new attempt.
if err = FailedWriteOperation(client, name); err != nil {
log.Printf("FailedWriteOperation (retry expected): %v", err)
// A non-nil return indicates that a retry is needed.
return err
}
return nil
}
Java
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import functions.eventpojos.PubsubMessage;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.logging.Logger;
public class RetryPubSub implements BackgroundFunction<PubsubMessage> {
private static final Logger logger = Logger.getLogger(RetryPubSub.class.getName());
// Use Gson (https://github.com/google/gson) to parse JSON content.
private static final Gson gson = new Gson();
@Override
public void accept(PubsubMessage message, Context context) {
String bodyJson = new String(
Base64.getDecoder().decode(message.getData()), StandardCharsets.UTF_8);
JsonElement bodyElement = gson.fromJson(bodyJson, JsonElement.class);
// Get the value of the "retry" JSON parameter, if one exists
boolean retry = false;
if (bodyElement != null && bodyElement.isJsonObject()) {
JsonObject body = bodyElement.getAsJsonObject();
if (body.has("retry") && body.get("retry").getAsBoolean()) {
retry = true;
}
}
// Retry if appropriate
if (retry) {
// Throwing an exception causes the execution to be retried
throw new RuntimeException("Retrying...");
} else {
logger.info("Not retrying...");
}
}
}
C#
using CloudNative.CloudEvents;
using Google.Cloud.Functions.Framework;
using Google.Events.Protobuf.Cloud.PubSub.V1;
using Microsoft.Extensions.Logging;
using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
namespace Retry;
public class Function : ICloudEventFunction<MessagePublishedData>
{
private readonly ILogger _logger;
public Function(ILogger<Function> logger) =>
_logger = logger;
public Task HandleAsync(CloudEvent cloudEvent, MessagePublishedData data, CancellationToken cancellationToken)
{
bool retry = false;
string text = data.Message?.TextData;
// Get the value of the "retry" JSON parameter, if one exists.
if (!string.IsNullOrEmpty(text))
{
JsonElement element = JsonSerializer.Deserialize<JsonElement>(data.Message.TextData);
retry = element.TryGetProperty("retry", out var property) &&
property.ValueKind == JsonValueKind.True;
}
// Throwing an exception causes the execution to be retried.
if (retry)
{
throw new InvalidOperationException("Retrying...");
}
else
{
_logger.LogInformation("Not retrying...");
}
return Task.CompletedTask;
}
}
Ruby
require "functions_framework"
FunctionsFramework.cloud_event "retry_or_not" do |event|
try_again = event.data["retry"]
begin
# Simulate a failure
raise "I failed!"
rescue RuntimeError => e
logger.warn "Caught an error: #{e}"
if try_again
# Raise an exception to return a 500 and trigger a retry.
logger.info "Trying again..."
raise e
else
# Return normally to end processing of this event.
logger.info "Giving up."
end
end
end
PHP
use Google\CloudFunctions\CloudEvent;
function tipsRetry(CloudEvent $event): void
{
$cloudEventData = $event->getData();
$pubSubData = $cloudEventData['message']['data'];
$json = json_decode(base64_decode($pubSubData), true);
// Determine whether to retry the invocation based on a parameter
$tryAgain = $json['some_parameter'] ?? false;
if ($tryAgain) {
/**
* Functions with automatic retries enabled should throw exceptions to
* indicate intermittent failures that a retry might fix. In this
* case, a thrown exception will cause the original function
* invocation to be re-sent.
*/
throw new Exception('Intermittent failure occurred; retrying...');
}
/**
* If a function with retries enabled encounters a non-retriable
* failure, it should return *without* throwing an exception.
*/
$log = fopen(getenv('LOGGER_OUTPUT') ?: 'php://stderr', 'wb');
fwrite($log, 'Not retrying' . PHP_EOL);
}
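The same pattern can be written in Python with an explicit exception type for failures that a retry cannot fix. In this sketch, `PermanentError` and `handle_event` are hypothetical names: permanent errors are logged and swallowed so the invocation completes normally, while any other exception propagates and, with retries enabled, causes the event to be retried.

```python
import logging
from typing import Callable


class PermanentError(Exception):
    """Hypothetical marker for failures that retrying cannot fix."""


def handle_event(data: dict, process: Callable[[dict], None]) -> None:
    """Run `process(data)`, treating only PermanentError as non-retriable."""
    try:
        process(data)
    except PermanentError:
        # Log the failure for visibility, then return normally so that the
        # invocation completes and the event is not retried.
        logging.exception("Permanent failure; not retrying")
    # Any other exception (for example ConnectionError) is not caught here,
    # so it propagates to the runtime and triggers a retry.
```

Raising `PermanentError` for conditions such as malformed payloads keeps the retry decision explicit at the point where the failure is detected.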
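Make retryable event-driven functions idempotent
Event-driven functions that can be retried must be idempotent. Here are some general guidelines for making such functions idempotent:
- Many external APIs (such as Stripe) let you supply an idempotency key as a parameter. If you are using such an API, use the event ID as the idempotency key.
- Idempotency works well with at-least-once delivery, because it makes retries safe. In general, idempotency is essential whenever retries are enabled.
- Make sure that your code is internally idempotent. For example:
  - Make sure that mutations can happen more than once without changing the outcome.
  - Query the database state in a transaction before mutating it.
  - Make sure that all side effects are themselves idempotent.
- Impose a transactional check outside the function, independent of the code. For example, persist state somewhere that records which event IDs have already been processed.
- Deal with duplicate function calls out-of-band. For example, have a separate cleanup process that cleans up after duplicate function calls.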
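The guideline about recording processed event IDs can be sketched as follows. `ProcessedEventStore` and `handle_once` are hypothetical names, and the in-memory set only stands in for durable storage shared by all function instances (for example, a database table keyed by event ID); an in-memory set would not survive across instances or restarts. The ID is recorded only after processing succeeds, so a failed attempt can still be retried. Because a crash between the side effect and recording the ID would still cause a repeat, the side effects themselves should also be idempotent.

```python
from typing import Callable, Set


class ProcessedEventStore:
    """Hypothetical in-memory stand-in for a durable store of event IDs."""

    def __init__(self) -> None:
        self._ids: Set[str] = set()

    def seen(self, event_id: str) -> bool:
        return event_id in self._ids

    def mark(self, event_id: str) -> None:
        self._ids.add(event_id)


def handle_once(event_id: str, data: dict, store: ProcessedEventStore,
                process: Callable[[dict], None]) -> bool:
    """Process an event unless its ID was already recorded.

    Returns True if the event was processed by this call.
    """
    if store.seen(event_id):
        # Duplicate or retried delivery of an event that already succeeded.
        return False
    process(data)  # If this raises, the ID is not recorded, so a retry can run.
    store.mark(event_id)
    return True
```

In a deployed function, `event_id` would come from the event metadata (for example `context.event_id` in the Python samples above).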
Next steps
- Deploy Cloud Functions.
- Call Cloud Pub/Sub trigger functions.
- Call Cloud Storage trigger functions.
- Tutorial on using Cloud Pub/Sub to trigger Cloud Functions.
- Tutorial: Using Cloud Functions with Cloud Storage.