舍弃超过 10 秒前发生的所有事件。
深入探索
如需查看包含此代码示例的详细文档,请参阅以下内容:
代码示例
C#
如需向 Cloud Functions 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
using CloudNative.CloudEvents;
using Google.Cloud.Functions.Framework;
using Google.Events.Protobuf.Cloud.PubSub.V1;
using Microsoft.Extensions.Logging;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace TimeBoundedRetries;
public class Function : ICloudEventFunction<MessagePublishedData>
{
private static readonly TimeSpan MaxEventAge = TimeSpan.FromSeconds(10);
private readonly ILogger _logger;
// Note: for additional testability, use an injectable clock abstraction.
public Function(ILogger<Function> logger) =>
_logger = logger;
public Task HandleAsync(CloudEvent cloudEvent, MessagePublishedData data, CancellationToken cancellationToken)
{
string textData = data.Message.TextData;
DateTimeOffset utcNow = DateTimeOffset.UtcNow;
// Every PubSub CloudEvent will contain a timestamp.
DateTimeOffset timestamp = cloudEvent.Time.Value;
DateTimeOffset expiry = timestamp + MaxEventAge;
// Ignore events that are too old.
if (utcNow > expiry)
{
_logger.LogInformation("Dropping PubSub message '{text}'", textData);
return Task.CompletedTask;
}
// Process events that are recent enough.
// If this processing throws an exception, the message will be retried until either
// processing succeeds or the event becomes too old and is dropped by the code above.
_logger.LogInformation("Processing PubSub message '{text}'", textData);
return Task.CompletedTask;
}
}
Go
如需向 Cloud Functions 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
// Package tips contains tips for writing Cloud Functions in Go.
package tips
import (
"context"
"fmt"
"log"
"time"
"cloud.google.com/go/functions/metadata"
)
// PubSubMessage is the payload of a Pub/Sub event.
// See the documentation for more details:
// https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
type PubSubMessage struct {
Data []byte `json:"data"`
}
// FiniteRetryPubSub demonstrates how to avoid inifinite retries.
func FiniteRetryPubSub(ctx context.Context, m PubSubMessage) error {
meta, err := metadata.FromContext(ctx)
if err != nil {
// Assume an error on the function invoker and try again.
return fmt.Errorf("metadata.FromContext: %w", err)
}
// Ignore events that are too old.
expiration := meta.Timestamp.Add(10 * time.Second)
if time.Now().After(expiration) {
log.Printf("event timeout: halting retries for expired event '%q'", meta.EventID)
return nil
}
// Add your message processing logic.
return processTheMessage(m)
}
Java
如需向 Cloud Functions 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import com.google.gson.Gson;
import functions.eventpojos.PubsubMessage;
import java.time.Duration;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.logging.Logger;
public class RetryTimeout implements BackgroundFunction<PubsubMessage> {
private static final Logger logger = Logger.getLogger(RetryTimeout.class.getName());
private static final long MAX_EVENT_AGE = 10_000;
// Use Gson (https://github.com/google/gson) to parse JSON content.
private static final Gson gson = new Gson();
/**
* Background Cloud Function that only executes within
* a certain time period after the triggering event
*/
@Override
public void accept(PubsubMessage message, Context context) {
ZonedDateTime utcNow = ZonedDateTime.now(ZoneOffset.UTC);
ZonedDateTime timestamp = ZonedDateTime.parse(context.timestamp());
long eventAge = Duration.between(timestamp, utcNow).toMillis();
// Ignore events that are too old
if (eventAge > MAX_EVENT_AGE) {
logger.info(String.format("Dropping event with timestamp %s.", timestamp));
return;
}
// Process events that are recent enough
// To retry this invocation, throw an exception here
logger.info(String.format("Processing event with timestamp %s.", timestamp));
}
}
Node.js
如需向 Cloud Functions 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
/**
* Background Cloud Function that only executes within
* a certain time period after the triggering event
*
* @param {object} event The Cloud Functions event.
* @param {function} callback The callback function.
*/
exports.avoidInfiniteRetries = (event, callback) => {
const eventAge = Date.now() - Date.parse(event.timestamp);
const eventMaxAge = 10000;
// Ignore events that are too old
if (eventAge > eventMaxAge) {
console.log(`Dropping event ${event} with age ${eventAge} ms.`);
callback();
return;
}
// Do what the function is supposed to do
console.log(`Processing event ${event} with age ${eventAge} ms.`);
// Retry failed function executions
const failed = false;
if (failed) {
callback('some error');
} else {
callback();
}
};
PHP
如需向 Cloud Functions 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
/**
* This function shows an example method for avoiding infinite retries in
* Google Cloud Functions. By default, functions configured to automatically
* retry execution on failure will be retried indefinitely - causing an
* infinite loop. To avoid this, we stop retrying executions (by not throwing
* exceptions) for any events that are older than a predefined threshold.
*/
use Google\CloudFunctions\CloudEvent;
function avoidInfiniteRetries(CloudEvent $event): void
{
$log = fopen(getenv('LOGGER_OUTPUT') ?: 'php://stderr', 'wb');
$eventId = $event->getId();
// The maximum age of events to process.
$maxAge = 10; // 10 seconds
// The age of the event being processed.
$eventAge = time() - strtotime($event->getTime());
// Ignore events that are too old
if ($eventAge > $maxAge) {
fwrite($log, 'Dropping event ' . $eventId . ' with age ' . $eventAge . ' seconds' . PHP_EOL);
return;
}
// Do what the function is supposed to do
fwrite($log, 'Processing event: ' . $eventId . ' with age ' . $eventAge . ' seconds' . PHP_EOL);
// infinite_retries failed function executions
$failed = true;
if ($failed) {
throw new Exception('Event ' . $eventId . ' failed; retrying...');
}
}
Python
如需向 Cloud Functions 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
from datetime import datetime, timezone
# The 'python-dateutil' package must be included in requirements.txt.
from dateutil import parser
def avoid_infinite_retries(data, context):
"""Background Cloud Function that only executes within a certain
time period after the triggering event.
Args:
data (dict): The event payload.
context (google.cloud.functions.Context): The event metadata.
Returns:
None; output is written to Stackdriver Logging
"""
timestamp = context.timestamp
event_time = parser.parse(timestamp)
event_age = (datetime.now(timezone.utc) - event_time).total_seconds()
event_age_ms = event_age * 1000
# Ignore events that are too old
max_age_ms = 10000
if event_age_ms > max_age_ms:
print(f"Dropped {context.event_id} (age {event_age_ms}ms)")
return "Timeout"
# Do what the function is supposed to do
print(f"Processed {context.event_id} (age {event_age_ms}ms)")
return # To retry the execution, raise an exception here
Ruby
如需向 Cloud Functions 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
require "functions_framework"
FunctionsFramework.cloud_event "avoid_infinite_retries" do |event|
# Use the event timestamp to determine the event age.
event_age_secs = Time.now - event.time.to_time
event_age_ms = (event_age_secs * 1000).to_i
max_age_ms = 10_000
if event_age_ms > max_age_ms
# Ignore events that are too old.
logger.info "Dropped #{event.id} (age #{event_age_ms}ms)"
else
# Do what the function is supposed to do.
logger.info "Handling #{event.id} (age #{event_age_ms}ms)..."
failed = true
# Raise an exception to signal failure and trigger a retry.
raise "I failed!" if failed
end
end
后续步骤
如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器。