Configure retry delay and retry timeout settings on a publisher client or on an individual publish request.
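Most of the samples on this page tune the same three knobs: an initial retry delay, a delay multiplier, and a maximum delay. As a rough illustration of how those knobs interact, the sketch below computes the nominal wait before each retry. The helper `backoff_schedule` is hypothetical and not part of any Pub/Sub client library, and it ignores the random jitter that real clients typically add.

```python
def backoff_schedule(initial, multiplier, maximum, attempts):
    """Return the nominal delay (in seconds) before each of `attempts` retries.

    Hypothetical helper for illustration only: the delay starts at `initial`,
    is multiplied by `multiplier` after every retry, and never exceeds
    `maximum`. Jitter is deliberately left out.
    """
    delays = []
    delay = initial
    for _ in range(attempts):
        delays.append(min(delay, maximum))
        delay *= multiplier
    return delays


if __name__ == "__main__":
    # Assumed example values: start at 10 s, double each time, cap at 45 s.
    for attempt, delay in enumerate(backoff_schedule(10.0, 2.0, 45.0, 5), start=1):
        print(f"retry {attempt}: wait {delay:.1f} s")
```

With these assumed values the waits double until they reach the cap and then stay there, which is the behavior the maximum delay setting is meant to enforce.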
Code sample
C++
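Before trying this sample, follow the C++ setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C++ API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.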
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // By default a publisher will retry for 60 seconds, with an initial backoff
  // of 100ms, a maximum backoff of 60 seconds, and the backoff will grow by
  // 30% after each attempt. This changes those defaults.
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      std::move(topic),
      Options{}
          .set<pubsub::RetryPolicyOption>(
              pubsub::LimitedTimeRetryPolicy(
                  /*maximum_duration=*/std::chrono::minutes(10))
                  .clone())
          .set<pubsub::BackoffPolicyOption>(
              pubsub::ExponentialBackoffPolicy(
                  /*initial_delay=*/std::chrono::milliseconds(200),
                  /*maximum_delay=*/std::chrono::seconds(45),
                  /*scaling=*/2.0)
                  .clone())));

  std::vector<future<bool>> done;
  for (char const* data : {"1", "2", "3", "go!"}) {
    done.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([](future<StatusOr<std::string>> f) {
              return f.get().ok();
            }));
  }
  publisher.Flush();
  int count = 0;
  for (auto& f : done) {
    if (f.get()) ++count;
  }
  std::cout << count << " messages sent successfully\n";
}
C#
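Before trying this sample, follow the C# setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C# API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.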
using Google.Api.Gax.Grpc;
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Threading.Tasks;

public class PublishMessageWithRetrySettingsAsyncSample
{
    public async Task PublishMessageWithRetrySettingsAsync(string projectId, string topicId, string messageText)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        // Retry settings control how the publisher handles retry-able failures
        var maxAttempts = 3;
        var initialBackoff = TimeSpan.FromMilliseconds(110); // default: 100 ms
        var maxBackoff = TimeSpan.FromSeconds(70); // default: 60 seconds
        var backoffMultiplier = 1.3; // default: 1.0
        var totalTimeout = TimeSpan.FromSeconds(100); // default: 600 seconds

        var publisher = await new PublisherClientBuilder
        {
            TopicName = topicName,
            ApiSettings = new PublisherServiceApiSettings
            {
                PublishSettings = CallSettings.FromRetry(RetrySettings.FromExponentialBackoff(
                        maxAttempts: maxAttempts,
                        initialBackoff: initialBackoff,
                        maxBackoff: maxBackoff,
                        backoffMultiplier: backoffMultiplier,
                        retryFilter: RetrySettings.FilterForStatusCodes(StatusCode.Unavailable)))
                    .WithTimeout(totalTimeout)
            }
        }.BuildAsync();
        string message = await publisher.PublishAsync(messageText);
        Console.WriteLine($"Published message {message}");
    }
}
Go
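Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Go API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.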
import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
	vkit "cloud.google.com/go/pubsub/apiv1"
	gax "github.com/googleapis/gax-go/v2"
	"google.golang.org/grpc/codes"
)

func publishWithRetrySettings(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()

	config := &pubsub.ClientConfig{
		PublisherCallOptions: &vkit.PublisherCallOptions{
			Publish: []gax.CallOption{
				gax.WithRetry(func() gax.Retryer {
					return gax.OnCodes([]codes.Code{
						codes.Aborted,
						codes.Canceled,
						codes.Internal,
						codes.ResourceExhausted,
						codes.Unknown,
						codes.Unavailable,
						codes.DeadlineExceeded,
					}, gax.Backoff{
						Initial:    250 * time.Millisecond, // default 100 milliseconds
						Max:        60 * time.Second,       // default 60 seconds
						Multiplier: 1.45,                   // default 1.3
					})
				}),
			},
		},
	}

	client, err := pubsub.NewClientWithConfig(ctx, projectID, config)
	if err != nil {
		return fmt.Errorf("pubsub.NewClientWithConfig: %w", err)
	}
	defer client.Close()

	t := client.Topic(topicID)
	result := t.Publish(ctx, &pubsub.Message{
		Data: []byte(msg),
	})
	// Block until the publish completes and the server-generated
	// ID of the published message is returned.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("pubsub: result.Get: %w", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}
Java
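Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Java API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.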
import com.google.api.core.ApiFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.threeten.bp.Duration;

public class PublishWithRetrySettingsExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithRetrySettingsExample(projectId, topicId);
  }

  public static void publishWithRetrySettingsExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;

    try {
      // Retry settings control how the publisher handles retry-able failures
      Duration initialRetryDelay = Duration.ofMillis(100); // default: 100 ms
      double retryDelayMultiplier = 2.0; // back off for repeated failures, default: 1.3
      Duration maxRetryDelay = Duration.ofSeconds(60); // default: 60 seconds
      Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 5 seconds
      double rpcTimeoutMultiplier = 1.0; // default: 1.0
      Duration maxRpcTimeout = Duration.ofSeconds(600); // default: 600 seconds
      Duration totalTimeout = Duration.ofSeconds(600); // default: 600 seconds

      RetrySettings retrySettings =
          RetrySettings.newBuilder()
              .setInitialRetryDelay(initialRetryDelay)
              .setRetryDelayMultiplier(retryDelayMultiplier)
              .setMaxRetryDelay(maxRetryDelay)
              .setInitialRpcTimeout(initialRpcTimeout)
              .setRpcTimeoutMultiplier(rpcTimeoutMultiplier)
              .setMaxRpcTimeout(maxRpcTimeout)
              .setTotalTimeout(totalTimeout)
              .build();

      // Create a publisher instance bound to the topic, using the custom retry settings
      publisher = Publisher.newBuilder(topicName).setRetrySettings(retrySettings).build();

      String message = "first message";
      ByteString data = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

      // Once published, returns a server-assigned message id (unique within the topic)
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published a message with retry settings: " + messageId);

    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}
Node.js
/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID'
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');

// Creates a publisher client.
const publisherClient = new v1.PublisherClient({
  // optional auth parameters
});

async function publishWithRetrySettings(projectId, topicNameOrId, data) {
  const formattedTopic = publisherClient.projectTopicPath(
    projectId,
    topicNameOrId
  );

  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);
  const messagesElement = {
    data: dataBuffer,
  };
  const messages = [messagesElement];

  // Build the request
  const request = {
    topic: formattedTopic,
    messages: messages,
  };

  // Retry settings control how the publisher handles retryable failures. Default values are shown.
  // The `retryCodes` array determines which grpc errors will trigger an automatic retry.
  // The `backoffSettings` object lets you specify the behaviour of retries over time.
  const retrySettings = {
    retryCodes: [
      10, // 'ABORTED'
      1, // 'CANCELLED'
      4, // 'DEADLINE_EXCEEDED'
      13, // 'INTERNAL'
      8, // 'RESOURCE_EXHAUSTED'
      14, // 'UNAVAILABLE'
      2, // 'UNKNOWN'
    ],
    backoffSettings: {
      // The initial delay time, in milliseconds, between the completion
      // of the first failed request and the initiation of the first retrying request.
      initialRetryDelayMillis: 100,
      // The multiplier by which to increase the delay time between the completion
      // of failed requests, and the initiation of the subsequent retrying request.
      retryDelayMultiplier: 1.3,
      // The maximum delay time, in milliseconds, between requests.
      // When this value is reached, retryDelayMultiplier will no longer be used to increase delay time.
      maxRetryDelayMillis: 60000,
      // The initial timeout parameter to the request.
      initialRpcTimeoutMillis: 5000,
      // The multiplier by which to increase the timeout parameter between failed requests.
      rpcTimeoutMultiplier: 1.0,
      // The maximum timeout parameter, in milliseconds, for a request. When this value is reached,
      // rpcTimeoutMultiplier will no longer be used to increase the timeout.
      maxRpcTimeoutMillis: 600000,
      // The total time, in milliseconds, starting from when the initial request is sent,
      // after which an error will be returned, regardless of the retrying attempts made meanwhile.
      totalTimeoutMillis: 600000,
    },
  };

  const [response] = await publisherClient.publish(request, {
    retry: retrySettings,
  });
  console.log(`Message ${response.messageIds} published.`);
}
Node.js (TypeScript)
/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID'
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
import {v1} from '@google-cloud/pubsub';

// Creates a publisher client.
const publisherClient = new v1.PublisherClient({
  // optional auth parameters
});

async function publishWithRetrySettings(
  projectId: string,
  topicNameOrId: string,
  data: string
) {
  const formattedTopic = publisherClient.projectTopicPath(
    projectId,
    topicNameOrId
  );

  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);
  const messagesElement = {
    data: dataBuffer,
  };
  const messages = [messagesElement];

  // Build the request
  const request = {
    topic: formattedTopic,
    messages: messages,
  };

  // Retry settings control how the publisher handles retryable failures. Default values are shown.
  // The `retryCodes` array determines which grpc errors will trigger an automatic retry.
  // The `backoffSettings` object lets you specify the behaviour of retries over time.
  const retrySettings = {
    retryCodes: [
      10, // 'ABORTED'
      1, // 'CANCELLED'
      4, // 'DEADLINE_EXCEEDED'
      13, // 'INTERNAL'
      8, // 'RESOURCE_EXHAUSTED'
      14, // 'UNAVAILABLE'
      2, // 'UNKNOWN'
    ],
    backoffSettings: {
      // The initial delay time, in milliseconds, between the completion
      // of the first failed request and the initiation of the first retrying request.
      initialRetryDelayMillis: 100,
      // The multiplier by which to increase the delay time between the completion
      // of failed requests, and the initiation of the subsequent retrying request.
      retryDelayMultiplier: 1.3,
      // The maximum delay time, in milliseconds, between requests.
      // When this value is reached, retryDelayMultiplier will no longer be used to increase delay time.
      maxRetryDelayMillis: 60000,
      // The initial timeout parameter to the request.
      initialRpcTimeoutMillis: 5000,
      // The multiplier by which to increase the timeout parameter between failed requests.
      rpcTimeoutMultiplier: 1.0,
      // The maximum timeout parameter, in milliseconds, for a request. When this value is reached,
      // rpcTimeoutMultiplier will no longer be used to increase the timeout.
      maxRpcTimeoutMillis: 600000,
      // The total time, in milliseconds, starting from when the initial request is sent,
      // after which an error will be returned, regardless of the retrying attempts made meanwhile.
      totalTimeoutMillis: 600000,
    },
  };

  const [response] = await publisherClient.publish(request, {
    retry: retrySettings,
  });
  console.log(`Message ${response.messageIds} published.`);
}
PHP
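Before trying this sample, follow the PHP setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub PHP API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.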
use Google\Cloud\PubSub\MessageBuilder;
use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic with Retry Settings.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message    The message to publish.
 */
function publish_with_retry_settings($projectId, $topicName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $topic = $pubsub->topic($topicName);

    $retrySettings = [
        'initialRetryDelayMillis' => 100,
        'retryDelayMultiplier' => 5,
        'maxRetryDelayMillis' => 60000,
        'initialRpcTimeoutMillis' => 1000,
        'rpcTimeoutMultiplier' => 1,
        'maxRpcTimeoutMillis' => 600000,
        'totalTimeoutMillis' => 600000
    ];

    $topic->publish((new MessageBuilder)->setData($message)->build(), [
        'retrySettings' => $retrySettings
    ]);

    print('Message published with retry settings' . PHP_EOL);
}
Python
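Before trying this sample, follow the Python setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Python API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.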
from google import api_core
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

# Configure the retry settings. The defaults noted in the comments are the
# values the Pub/Sub library applies, not the defaults of the Retry object.
custom_retry = api_core.retry.Retry(
    initial=0.250,  # seconds (default: 0.1)
    maximum=90.0,  # seconds (default: 60.0)
    multiplier=1.45,  # default: 1.3
    deadline=300.0,  # seconds (default: 60.0)
    predicate=api_core.retry.if_exception_type(
        api_core.exceptions.Aborted,
        api_core.exceptions.DeadlineExceeded,
        api_core.exceptions.InternalServerError,
        api_core.exceptions.ResourceExhausted,
        api_core.exceptions.ServiceUnavailable,
        api_core.exceptions.Unknown,
        api_core.exceptions.Cancelled,
    ),
)

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    future = publisher.publish(topic=topic_path, data=data, retry=custom_retry)
    print(future.result())

print(f"Published messages with retry settings to {topic_path}.")
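To get a feel for how the overall deadline bounds the retries in the sample above, the following sketch counts how many backoff sleeps fit inside a time budget. The helper `retries_within_deadline` is hypothetical and not part of `google-api-core`. It assumes no jitter and treats every RPC attempt as instantaneous, so it gives an upper bound rather than a prediction.

```python
def retries_within_deadline(initial, multiplier, maximum, deadline):
    """Count the backoff sleeps that fit inside `deadline` seconds.

    Hypothetical helper for illustration only. Assumes no jitter and that
    each attempt takes no time, so the result is an upper bound.
    """
    if initial <= 0 or multiplier < 1:
        raise ValueError("initial must be > 0 and multiplier must be >= 1")
    elapsed = 0.0
    delay = initial
    retries = 0
    # Keep sleeping while the next (capped) delay still fits in the budget.
    while elapsed + min(delay, maximum) <= deadline:
        elapsed += min(delay, maximum)
        delay *= multiplier
        retries += 1
    return retries


if __name__ == "__main__":
    # Values taken from the custom_retry object in the sample above.
    print(retries_within_deadline(0.250, 1.45, 90.0, 300.0))
```

With the sample's values the sketch counts 16 sleeps before the 300-second budget runs out. Real runs are likely to fit fewer, because each attempt also consumes time and the library randomizes the delays.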
What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.