Avoid infinite retries in Cloud Functions

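This sample shows how to avoid infinite retries in Cloud Functions by executing the function only within a certain time period after the triggering event. Events older than that window are acknowledged without processing, so they are not retried again.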
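The age check at the heart of this technique can be isolated into a small, testable helper. The following is a minimal sketch, not part of the official samples: the names `parse_event_time` and `is_expired` are hypothetical, and it assumes the event timestamp is an RFC 3339 string such as `2024-01-01T00:00:00Z`.

```python
from datetime import datetime, timedelta, timezone


def parse_event_time(timestamp):
    """Parse an RFC 3339 timestamp string into an aware datetime.

    Hypothetical helper. A trailing "Z" is rewritten as "+00:00" because
    datetime.fromisoformat() only accepts "Z" on Python 3.11 and later.
    """
    if timestamp.endswith("Z"):
        timestamp = timestamp[:-1] + "+00:00"
    return datetime.fromisoformat(timestamp)


def is_expired(event_time, max_age, now=None):
    """Return True if the event is older than max_age.

    Hypothetical helper. `now` can be injected so the check is
    deterministic in tests; it defaults to the current UTC time.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    return now - event_time > max_age
```

A function handler could call `is_expired(parse_event_time(cloud_event["time"]), timedelta(seconds=10))` and return early when the result is true, which is the same decision each sample below makes inline.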

Explore further

For detailed documentation that includes this code sample, see the following:

Code sample

Go

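To authenticate to Cloud Run functions, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.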


// Package tips contains tips for writing Cloud Functions in Go.
package tips

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/GoogleCloudPlatform/functions-framework-go/functions"
	"github.com/cloudevents/sdk-go/v2/event"
)

func init() {
	functions.CloudEvent("FiniteRetryPubSub", FiniteRetryPubSub)
}

// MessagePublishedData contains the full Pub/Sub message
// See the documentation for more details:
// https://cloud.google.com/eventarc/docs/cloudevents#pubsub
type MessagePublishedData struct {
	Message PubSubMessage
}

// PubSubMessage is the payload of a Pub/Sub event.
// See the documentation for more details:
// https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
type PubSubMessage struct {
	Data []byte `json:"data"`
}

// FiniteRetryPubSub demonstrates how to avoid infinite retries.
func FiniteRetryPubSub(ctx context.Context, e event.Event) error {
	var msg MessagePublishedData
	if err := e.DataAs(&msg); err != nil {
		return fmt.Errorf("event.DataAs: %w", err)
	}

	// Ignore events that are too old.
	expiration := e.Time().Add(10 * time.Second)
	if time.Now().After(expiration) {
		log.Printf("event timeout: halting retries for expired event %q", e.ID())
		return nil
	}

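	// Add your message processing logic. processTheMessage is a placeholder that
	// this snippet does not define; supply your own implementation.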
	return processTheMessage(msg)
}

Java

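To authenticate to Cloud Run functions, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.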


import com.google.cloud.functions.CloudEventsFunction;
import io.cloudevents.CloudEvent;
import java.time.Duration;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.logging.Logger;

public class RetryTimeout implements CloudEventsFunction {
  private static final Logger logger = Logger.getLogger(RetryTimeout.class.getName());
  // Maximum event age, in milliseconds.
  private static final long MAX_EVENT_AGE = 10_000;

  /**
   * Cloud Event Function that only executes within
   * a certain time period after the triggering event
   */
  @Override
  public void accept(CloudEvent event) throws Exception {
    ZonedDateTime utcNow = ZonedDateTime.now(ZoneOffset.UTC);
    ZonedDateTime timestamp = event.getTime().atZoneSameInstant(ZoneOffset.UTC);

    long eventAge = Duration.between(timestamp, utcNow).toMillis();

    // Ignore events that are too old
    if (eventAge > MAX_EVENT_AGE) {
      logger.info(String.format("Dropping event with timestamp %s.", timestamp));
      return;
    }

    // Process events that are recent enough
    // To retry this invocation, throw an exception here
    logger.info(String.format("Processing event with timestamp %s.", timestamp));
  }
}

Node.js

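To authenticate to Cloud Run functions, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.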

const functions = require('@google-cloud/functions-framework');

/**
 * Cloud Event Function that only executes within
 * a certain time period after the triggering event
 *
 * @param {object} event The Cloud Functions event.
 * @param {function} callback The callback function.
 */
functions.cloudEvent('avoidInfiniteRetries', (event, callback) => {
  const eventAge = Date.now() - Date.parse(event.time);
  const eventMaxAge = 10000;

  // Ignore events that are too old
  if (eventAge > eventMaxAge) {
    console.log(`Dropping event ${event.id} with age ${eventAge} ms.`);
    callback();
    return;
  }

  // Do what the function is supposed to do
  console.log(`Processing event ${event.id} with age ${eventAge} ms.`);

  // Retry failed function executions
  const failed = false;
  if (failed) {
    callback('some error');
  } else {
    callback();
  }
});

Python

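To authenticate to Cloud Run functions, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.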

from datetime import datetime, timezone

# The 'python-dateutil' package must be included in requirements.txt.
from dateutil import parser

import functions_framework


@functions_framework.cloud_event
def avoid_infinite_retries(cloud_event):
    """Cloud Event Function that only executes within a certain
    time period after the triggering event.

    Args:
        cloud_event: The cloud event associated with the current trigger
    Returns:
        None; output is written to Cloud Logging
    """
    timestamp = cloud_event["time"]

    event_time = parser.parse(timestamp)
    event_age = (datetime.now(timezone.utc) - event_time).total_seconds()
    event_age_ms = event_age * 1000

    # Ignore events that are too old
    max_age_ms = 10000
    if event_age_ms > max_age_ms:
        print("Dropped {} (age {}ms)".format(cloud_event["id"], event_age_ms))
        return "Timeout"

    # Do what the function is supposed to do
    print("Processed {} (age {}ms)".format(cloud_event["id"], event_age_ms))
    return  # To retry the execution, raise an exception here

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.