Google.Cloud.PubSub.V1

Google.Cloud.PubSub.V1 is a .NET client library for the Cloud Pub/Sub API.

Note: This documentation is for version 3.1.0 of the library. Some samples may not work with other versions.

Installation

Install the Google.Cloud.PubSub.V1 package from NuGet. Add it to your project in the normal way (for example by right-clicking on the project in Visual Studio and choosing "Manage NuGet Packages...").

Authentication

When running on Google Cloud Platform, no action needs to be taken to authenticate.

Otherwise, the simplest way of authenticating your API calls is to download a service account JSON file, then set the GOOGLE_APPLICATION_CREDENTIALS environment variable to refer to it. The credentials will automatically be used to authenticate. See the Getting Started With Authentication guide for more details.

Getting started

PublisherServiceApiClient and SubscriberServiceApiClient provide a general-purpose abstraction over the raw RPC API, providing features such as page streaming to make client code cleaner and simpler.

PublisherClient and SubscriberClient provide simpler APIs for message publishing and subscribing. These classes offer considerably higher performance and simplicity, especially when working with higher message throughput.

Note that both PublisherClient and SubscriberClient expect to execute in an environment with continuous processing and continuous network access to the Pub/Sub API. In environments such as Cloud Run or Cloud Functions, where servers do not use any CPU between requests, the PublisherServiceApiClient and SubscriberServiceApiClient classes should be used instead.

Sample code

Using PublisherClient and SubscriberClient for message publishing and subscribing:

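// Assumes using directives for Google.Cloud.PubSub.V1, System, System.Collections.Generic
// and System.Threading.Tasks, and that projectId, topicId and subscriptionId are already defined.
// First create a topic.
PublisherServiceApiClient publisherService = await PublisherServiceApiClient.CreateAsync();
TopicName topicName = new TopicName(projectId, topicId);
publisherService.CreateTopic(topicName);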

// Subscribe to the topic.
SubscriberServiceApiClient subscriberService = await SubscriberServiceApiClient.CreateAsync();
SubscriptionName subscriptionName = new SubscriptionName(projectId, subscriptionId);
subscriberService.CreateSubscription(subscriptionName, topicName, pushConfig: null, ackDeadlineSeconds: 60);

// Publish a message to the topic using PublisherClient.
PublisherClient publisher = await PublisherClient.CreateAsync(topicName);
// PublishAsync() has various overloads. Here we're using the string overload.
string messageId = await publisher.PublishAsync("Hello, Pubsub");
// A PublisherClient instance should be shut down after use.
// The TimeSpan specifies for how long to attempt to publish locally queued messages.
await publisher.ShutdownAsync(TimeSpan.FromSeconds(15));

// Pull messages from the subscription using SubscriberClient.
SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
List<PubsubMessage> receivedMessages = new List<PubsubMessage>();
// Start the subscriber listening for messages.
await subscriber.StartAsync((msg, cancellationToken) =>
{
    receivedMessages.Add(msg);
    Console.WriteLine($"Received message {msg.MessageId} published at {msg.PublishTime.ToDateTime()}");
    Console.WriteLine($"Text: '{msg.Data.ToStringUtf8()}'");
    // Stop this subscriber after one message is received.
    // This is non-blocking, and the returned Task may be awaited.
    subscriber.StopAsync(TimeSpan.FromSeconds(15));
    // Return Reply.Ack to indicate this message has been handled.
    return Task.FromResult(SubscriberClient.Reply.Ack);
});

// Tidy up by deleting the subscription and the topic.
subscriberService.DeleteSubscription(subscriptionName);
publisherService.DeleteTopic(topicName);

Using PublisherServiceApiClient and SubscriberServiceApiClient only:

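// Assumes using directives for Google.Cloud.PubSub.V1, Google.Protobuf, System and System.Linq,
// and that projectId, topicId and subscriptionId are already defined.
// First create a topic.
PublisherServiceApiClient publisher = PublisherServiceApiClient.Create();
TopicName topicName = new TopicName(projectId, topicId);
publisher.CreateTopic(topicName);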

// Subscribe to the topic.
SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
SubscriptionName subscriptionName = new SubscriptionName(projectId, subscriptionId);
subscriber.CreateSubscription(subscriptionName, topicName, pushConfig: null, ackDeadlineSeconds: 60);

// Publish a message to the topic.
PubsubMessage message = new PubsubMessage
{
    // The data is any arbitrary ByteString. Here, we're using text.
    Data = ByteString.CopyFromUtf8("Hello, Pubsub"),
    // The attributes provide metadata in a string-to-string dictionary.
    Attributes =
    {
        { "description", "Simple text message" }
    }
};
publisher.Publish(topicName, new[] { message });

// Pull messages from the subscription. This will wait for some time if no new messages have been
// published yet.
PullResponse response = subscriber.Pull(subscriptionName, maxMessages: 10);
foreach (ReceivedMessage received in response.ReceivedMessages)
{
    PubsubMessage msg = received.Message;
    Console.WriteLine($"Received message {msg.MessageId} published at {msg.PublishTime.ToDateTime()}");
    Console.WriteLine($"Text: '{msg.Data.ToStringUtf8()}'");
}

// Acknowledge that we've received the messages. If we don't do this within 60 seconds (as specified
// when we created the subscription) we'll receive the messages again when we next pull.
subscriber.Acknowledge(subscriptionName, response.ReceivedMessages.Select(m => m.AckId));

// Tidy up by deleting the subscription and the topic.
subscriber.DeleteSubscription(subscriptionName);
publisher.DeleteTopic(topicName);

Performance considerations and default settings

PublisherClient and SubscriberClient are optimized for high-throughput, high-performance scenarios, and the default settings have been chosen with this in mind. These classes are nonetheless well suited to use cases where performance is not a major consideration.

By default, multiple gRPC channels are created on client startup, with the channel count defaulting to the processor count as returned by Environment.ProcessorCount. This allows greater bandwidth than a single gRPC channel can support; the processor count is a pragmatic choice that scales maximum throughput roughly in line with the potential workload of the machine.

When using multiple clients on a machine with a high processor count, this may cause problems with TCP connection exhaustion. Set the relevant builder ClientCount property to a low value (1 is suitable for low or moderate throughput requirements) to mitigate this.
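
As a minimal sketch of that mitigation, the builders can be given a low ClientCount. The LowClientCountSketch class and its method names are hypothetical and exist only for illustration; the builder types and their TopicName, SubscriptionName and ClientCount properties are the ones described in this document.

```csharp
using Google.Cloud.PubSub.V1;

// Hypothetical helper class, not part of the library.
public static class LowClientCountSketch
{
    // Builds a PublisherClient with ClientCount = 1 instead of the
    // Environment.ProcessorCount default.
    public static PublisherClient CreatePublisher(TopicName topicName) =>
        new PublisherClientBuilder
        {
            TopicName = topicName,
            ClientCount = 1
        }.Build();

    // The same idea for a SubscriberClient.
    public static SubscriberClient CreateSubscriber(SubscriptionName subscriptionName) =>
        new SubscriberClientBuilder
        {
            SubscriptionName = subscriptionName,
            ClientCount = 1
        }.Build();
}
```

A value of 1 suits low or moderate throughput; a larger value may be appropriate if a single client turns out to limit throughput.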

Coding considerations

PublisherClient and SubscriberClient are expensive to create. When regularly publishing or subscribing to the same topic or subscription, create a singleton client instance and use it for the lifetime of the application.

Both synchronous Create(...) and asynchronous CreateAsync(...) methods are provided. Note that when using default credentials on Google Compute Engine (GCE), a network request may need to be made to retrieve credentials from the GCE Metadata Server, in which case the synchronous method blocks until that request completes.

The overloads for Create and CreateAsync accepting just a topic or subscription name use default settings for everything else, and are the most convenient approach for creating clients when the defaults are acceptable. For further customization (e.g. to set different credentials, or a different client count) we recommend using PublisherClientBuilder and SubscriberClientBuilder for consistency with other APIs, and for maximum flexibility. There are overloads of Create and CreateAsync accepting publisher/subscriber-specific ClientCreationSettings, but these are legacy methods from versions where the builders did not exist. They are likely to be removed in future major versions.

When publishing, the Task returned by the various PublishAsync(...) methods completes only when the message has been sent to the Pub/Sub server, so it should generally not be awaited directly; otherwise performance will suffer. The returned Task may be ignored if the publisher does not need to know whether the message was successfully published. The Task completes successfully when the message has been published to the server, or faults if there was an error publishing.
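
One way to follow this advice while still observing failures is to start all publishes first and await the results together. The sketch below assumes an already-created PublisherClient; the BatchPublishSketch class and PublishAllAsync method are hypothetical names used only for illustration.

```csharp
using Google.Cloud.PubSub.V1;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper class, not part of the library.
public static class BatchPublishSketch
{
    public static async Task<string[]> PublishAllAsync(
        PublisherClient publisher, IEnumerable<string> texts)
    {
        // Start every publish without awaiting each one individually,
        // so the client is free to send the messages together.
        List<Task<string>> publishTasks =
            texts.Select(text => publisher.PublishAsync(text)).ToList();

        // Await all the tasks at once. This throws if any publish faulted;
        // otherwise it yields the message IDs in the order the texts were supplied.
        return await Task.WhenAll(publishTasks);
    }
}
```

If the application does not need the message IDs or the success status, the tasks can simply be left unawaited, as described above.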

When subscribing, an instance of SubscriberClient can only have StartAsync(...) called on it once. Once StopAsync(...) has been called to shut down the client, a new client must be created to restart listening for messages with StartAsync(...). Due to the expense of creating a client instance, it is recommended that a singleton client per subscription is used for the lifetime of the application.
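
The lifetime described above might look like the following sketch, which starts one SubscriberClient and stops it only when the application shuts down. The SubscriberLifetimeSketch class, the RunUntilCancelledAsync method and the shutdownToken parameter are hypothetical; how an application signals shutdown is an assumption here.

```csharp
using Google.Cloud.PubSub.V1;
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper class, not part of the library.
public static class SubscriberLifetimeSketch
{
    // Listens for messages until shutdownToken is cancelled, then stops the client.
    public static async Task RunUntilCancelledAsync(
        SubscriptionName subscriptionName, CancellationToken shutdownToken)
    {
        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);

        // StartAsync may be called only once on this instance. It is not awaited here,
        // because the returned Task does not complete until the client is stopped.
        Task runTask = subscriber.StartAsync((msg, cancellationToken) =>
        {
            Console.WriteLine($"Received message {msg.MessageId}");
            return Task.FromResult(SubscriberClient.Reply.Ack);
        });

        try
        {
            // Wait for the application's shutdown signal.
            await Task.Delay(Timeout.Infinite, shutdownToken);
        }
        catch (OperationCanceledException)
        {
            // Expected when shutdownToken is cancelled.
        }

        // Stop the client, then observe the outcome of the listening task.
        await subscriber.StopAsync(TimeSpan.FromSeconds(15));
        await runTask;
    }
}
```

After StopAsync has been called, this instance cannot be restarted; listening again requires a new SubscriberClient.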

Using the emulator

To connect to a Pub/Sub Emulator, set the EmulatorDetection property on the appropriate builder class, depending on which client type you are constructing:

  • PublisherClientBuilder (for PublisherClient)
  • SubscriberClientBuilder (for SubscriberClient)
  • PublisherServiceApiClientBuilder (for PublisherServiceApiClient)
  • SubscriberServiceApiClientBuilder (for SubscriberServiceApiClient)

SubscriberClient example:

SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
SubscriberClient subscriber = new SubscriberClientBuilder
{
    SubscriptionName = subscriptionName,
    EmulatorDetection = EmulatorDetection.EmulatorOrProduction
}.Build();
// Use subscriber.StartAsync etc as normal

SubscriberServiceApiClientBuilder example:

SubscriberServiceApiClient subscriber = new SubscriberServiceApiClientBuilder
{
    EmulatorDetection = EmulatorDetection.EmulatorOrProduction
}.Build();
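
For completeness, a PublisherClient can be configured in the same way. This is a minimal sketch: the EmulatorPublisherSketch class and CreatePublisher method are hypothetical names, and it assumes the emulator address is supplied through the PUBSUB_EMULATOR_HOST environment variable, falling back to production when that variable is not set.

```csharp
using Google.Api.Gax;
using Google.Cloud.PubSub.V1;

// Hypothetical helper class, not part of the library.
public static class EmulatorPublisherSketch
{
    public static PublisherClient CreatePublisher(string projectId, string topicId)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        return new PublisherClientBuilder
        {
            TopicName = topicName,
            // Use the emulator if it is configured in the environment,
            // otherwise connect to the production service.
            EmulatorDetection = EmulatorDetection.EmulatorOrProduction
        }.Build();
    }
}
```

The returned client is then used with PublishAsync and ShutdownAsync as normal.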

See the help article for more details about emulator support in the .NET Google Cloud client libraries.