A Topic object allows you to interact with a Cloud Pub/Sub topic.
Package
@google-cloud/pubsub
Examples
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-topic');
To enable message ordering, set messageOrdering
to true. Please note that this does not persist to an actual topic.
const topic = pubsub.topic('ordered-topic', {messageOrdering: true});
Constructors
(constructor)(pubsub, name, options)
constructor(pubsub: PubSub, name: string, options?: PublishOptions);
Constructs a new instance of the Topic class.
Parameters
Properties
getSubscriptionsStream
getSubscriptionsStream: () => ObjectStream<Subscription>;
Property Value
iam
Property Value
metadata?: TopicMetadata;
Property Value
name
Property Value
parent
Property Value
publisher
Property Value
pubsub
Property Value
request
request: typeof PubSub.prototype.request;
Property Value
Type | Description |
typeof PubSub#request | |
Methods
create(gaxOpts)
create(gaxOpts?: CallOptions): Promise<CreateTopicResponse>;
Parameter
Returns
Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-topic');
topic.create((err, topic, apiResponse) => {
if (!err) {
// The topic was created successfully.
}
});
//-
// If the callback is omitted, we'll return a Promise.
//-
topic.create().then((data) => {
const topic = data[0];
const apiResponse = data[1];
});
create(callback)
create(callback: CreateTopicCallback): void;
Parameter
Returns
create(gaxOpts, callback)
create(gaxOpts: CallOptions, callback: CreateTopicCallback): void;
Parameters
Returns
createSubscription(name, callback)
createSubscription(name: string, callback: CreateSubscriptionCallback): void;
Create a subscription to this topic.
Parameters
Returns
Type | Description |
void | {Promise<CreateSubscriptionResponse>}
|
Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-topic');
const callback = function(err, subscription, apiResponse) {};
// Without specifying any options.
topic.createSubscription('newMessages', callback);
// With options.
topic.createSubscription('newMessages', {
ackDeadlineSeconds: 90
}, callback);
//-
// If the callback is omitted, we'll return a Promise.
//-
topic.createSubscription('newMessages').then((data) => {
const subscription = data[0];
const apiResponse = data[1];
});
createSubscription(name, options)
createSubscription(name: string, options?: CreateSubscriptionOptions): Promise<CreateSubscriptionResponse>;
Parameters
Returns
createSubscription(name, options, callback)
createSubscription(name: string, options: CreateSubscriptionOptions, callback: CreateSubscriptionCallback): void;
Parameters
Returns
delete(callback)
delete(callback: EmptyCallback): void;
Delete the topic. This will not delete subscriptions to this topic.
Parameter
Returns
Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-topic');
topic.delete((err, apiResponse) => {});
//-
// If the callback is omitted, we'll return a Promise.
//-
topic.delete().then((data) => {
const apiResponse = data[0];
});
delete(gaxOpts)
delete(gaxOpts?: CallOptions): Promise<EmptyResponse>;
Parameter
Name | Description |
gaxOpts |
CallOptions
|
Returns
delete(gaxOpts, callback)
delete(gaxOpts: CallOptions, callback: EmptyCallback): void;
Parameters
Returns
exists()
exists(): Promise<ExistsResponse>;
Returns
Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-topic');
topic.exists((err, exists) => {});
//-
// If the callback is omitted, we'll return a Promise.
//-
topic.exists().then((data) => {
const exists = data[0];
});
exists(callback)
exists(callback: ExistsCallback): void;
Parameter
Returns
flowControlled()
flowControlled(): FlowControlledPublisher;
Creates a FlowControlledPublisher for this Topic.
FlowControlledPublisher is a helper that lets you control how many messages are simultaneously queued to send, to avoid ballooning memory usage on a low bandwidth connection to Pub/Sub.
Note that it's perfectly fine to create more than one on the same Topic. The actual flow control settings on the Topic will apply across all FlowControlledPublisher objects on that Topic.
Returns
Type | Description |
FlowControlledPublisher | {FlowControlledPublisher} The flow control helper.
|
flush()
Immediately sends all remaining queued data. This is mostly useful if you are planning to call close() on the PubSub object that holds the server connections.
Returns
Type | Description |
Promise<void> | {Promise<void>}
|
flush(callback)
flush(callback: EmptyCallback): void;
Parameter
Returns
static formatName_(projectId: string, name: string): string;
Format the name of a topic. A Topic's full name is in the format of 'projects/{projectId}/topics/{topicName}'.
Returns the formatted name as a {string}.
Parameters
Name | Description |
projectId |
string
|
name |
string
|
Returns
get(callback)
get(callback: GetTopicCallback): void;
Get a topic if it exists.
Parameter
Returns
Type | Description |
void | {Promise<GetTopicResponse>}
|
Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-topic');
topic.get((err, topic, apiResponse) => {
// The `topic` data has been populated.
});
//-
// If the callback is omitted, we'll return a Promise.
//-
topic.get().then((data) => {
const topic = data[0];
const apiResponse = data[1];
});
get(gaxOpts)
get(gaxOpts?: GetTopicOptions): Promise<GetTopicResponse>;
Parameter
Returns
get(gaxOpts, callback)
get(gaxOpts: GetTopicOptions, callback: GetTopicCallback): void;
Parameters
Returns
getMetadata(callback: GetTopicMetadataCallback): void;
Get the official representation of this topic from the API.
Parameter
Returns
Type | Description |
void | {Promise<GetTopicMetadataResponse>}
|
Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-topic');
topic.getMetadata((err, apiResponse) => {});
//-
// If the callback is omitted, we'll return a Promise.
//-
topic.getMetadata().then((data) => {
const apiResponse = data[0];
});
getMetadata(gaxOpts: CallOptions, callback: GetTopicMetadataCallback): void;
Parameters
Returns
getMetadata(gaxOpts?: CallOptions): Promise<GetTopicMetadataResponse>;
Parameter
Name | Description |
gaxOpts |
CallOptions
|
Returns
getPublishOptionDefaults()
getPublishOptionDefaults(): PublishOptions;
Get the default publisher options. These may be modified and passed back into setPublishOptions().
Returns
Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-topic');
const defaults = topic.getPublishOptionDefaults();
defaults.batching.maxMilliseconds = 10;
topic.setPublishOptions(defaults);
getSubscriptions(callback)
getSubscriptions(callback: GetTopicSubscriptionsCallback): void;
Get a list of the subscriptions registered to this topic. You may optionally provide a query object as the first argument to customize the response.
Your provided callback will be invoked with an error object if an API error occurred or an array of Subscription objects.
Parameter
Returns
Type | Description |
void | {Promise<GetTopicSubscriptionsResponse>}
|
Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-topic');
topic.getSubscriptions((err, subscriptions) => {
// subscriptions is an array of `Subscription` objects.
});
// Customize the query.
topic.getSubscriptions({
pageSize: 3
}, callback);
//-
// If the callback is omitted, we'll return a Promise.
//-
topic.getSubscriptions().then((data) => {
const subscriptions = data[0];
});
getSubscriptions(options, callback)
getSubscriptions(options: PageOptions, callback: GetTopicSubscriptionsCallback): void;
Parameters
Returns
getSubscriptions(options)
getSubscriptions(options?: PageOptions): Promise<GetTopicSubscriptionsResponse>;
Parameter
Returns
publish(data, attributes)
publish(data: Buffer, attributes?: Attributes): Promise<string>;
Publish the provided message.
Parameters
Name | Description |
data |
Buffer
The message data. This must come in the form of a Buffer object.
|
attributes |
Attributes
Attributes for this message.
|
Returns
Type | Description |
Promise<string> | {Promise<string>}
|
Examples
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-topic');
const data = Buffer.from('Hello, world!');
const callback = (err, messageId) => {
if (err) {
// Error handling omitted.
}
};
topic.publish(data, callback);
Optionally you can provide an object containing attributes for the message. Note that all values in the object must be strings.
const attributes = {
key: 'value'
};
topic.publish(data, attributes, callback);
If the callback is omitted, we'll return a Promise.
topic.publish(data).then((messageId) => {});
publish(data, callback)
publish(data: Buffer, callback: PublishCallback): void;
Parameters
Returns
publish(data, attributes, callback)
publish(data: Buffer, attributes: Attributes, callback: PublishCallback): void;
Parameters
Returns
publishJSON(json, attributes)
publishJSON(json: object, attributes?: Attributes): Promise<string>;
Publish the provided JSON. It should be noted that all messages published are done so in the form of a Buffer. This is simply a convenience method that will transform JSON into a Buffer before publishing. Subscription objects will always return message data in the form of a Buffer, so any JSON published will require manual deserialization.
Parameters
Name | Description |
json |
object
The JSON data to publish.
|
attributes |
Attributes
Attributes for this message.
|
Returns
Type | Description |
Promise<string> | {Promise<string>}
|
Examples
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-topic');
const data = {
foo: 'bar'
};
const callback = (err, messageId) => {
if (err) {
// Error handling omitted.
}
};
topic.publishJSON(data, callback);
Optionally you can provide an object containing attributes for the message. Note that all values in the object must be strings.
const attributes = {
key: 'value'
};
topic.publishJSON(data, attributes, callback);
If the callback is omitted, we'll return a Promise.
topic.publishJSON(data).then((messageId) => {});
publishJSON(json, callback)
publishJSON(json: object, callback: PublishCallback): void;
Parameters
Returns
publishJSON(json, attributes, callback)
publishJSON(json: object, attributes: Attributes, callback: PublishCallback): void;
Parameters
Returns
publishMessage(message)
publishMessage(message: MessageOptions): Promise<[string]>;
Publish the provided message.
Parameter
Name | Description |
message |
MessageOptions
Message object.
|
Returns
Type | Description |
Promise<[string]> | {Promise<[string]>}
|
Examples
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-topic');
const data = Buffer.from('Hello, world!');
const callback = (err, messageId) => {
if (err) {
// Error handling omitted.
}
};
topic.publishMessage({data}, callback);
Publish JSON message data.
const json = {foo: 'bar'};
topic.publishMessage({json}, callback);
To publish messages in order (this is still experimental), make sure message ordering is enabled and provide an ordering key:
const topic = pubsub.topic('ordered-topic', {messageOrdering: true});
const orderingKey = 'my-key';
topic.publishMessage({data, orderingKey}, callback);
If the callback is omitted, we'll return a Promise.
const [messageId] = await topic.publishMessage({data});
publishMessage(message, callback)
publishMessage(message: MessageOptions, callback: PublishCallback): void;
Parameters
Returns
resumePublishing(orderingKey)
resumePublishing(orderingKey: string): void;
In the event that the client fails to publish an ordered message, all subsequent publish calls using the same ordering key will fail. Calling this method will disregard the publish failure, allowing the supplied ordering key to be used again in the future.
Parameter
Name | Description |
orderingKey |
string
The ordering key in question.
|
Returns
Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-topic', {messageOrdering: true});
const orderingKey = 'foo';
const data = Buffer.from('Hello, order!');
topic.publishMessage({data, orderingKey}, err => {
if (err) {
topic.resumePublishing(orderingKey);
}
});
setMetadata(options: TopicMetadata, gaxOpts?: CallOptions): Promise<SetTopicMetadataResponse>;
Parameters
Returns
Examples
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-topic');
const metadata = {
labels: {foo: 'bar'}
};
topic.setMetadata(metadata, err => {
if (err) {
// Error handling omitted.
}
});
If the callback is omitted, we'll return a Promise.
topic.setMetadata(metadata).then((data) => {
const apiResponse = data[0];
});
setMetadata(options: TopicMetadata, callback: SetTopicMetadataCallback): void;
Parameters
Returns
setMetadata(options: TopicMetadata, gaxOpts: CallOptions, callback: SetTopicMetadataCallback): void;
Parameters
Returns
setPublishOptions(options)
setPublishOptions(options: PublishOptions): void;
Set the publisher options.
Parameter
Returns
Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-topic');
topic.setPublishOptions({
batching: {
maxMilliseconds: 10
}
});
subscription(name, options)
subscription(name: string, options?: SubscriptionOptions): Subscription;
Create a Subscription object. This command by itself will not run any API requests. You will receive a Subscription object, which will allow you to interact with a subscription.
Parameters
Name | Description |
name |
string
Name of the subscription.
|
options |
SubscriptionOptions
Configuration object. The method returns a {Subscription}.
|
Returns
Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');
// Register a listener for `message` events.
subscription.on('message', (message) => {
// Called every time a message is received.
// message.id = ID of the message.
// message.ackId = ID used to acknowledge receipt of the message.
// message.data = Contents of the message.
// message.attributes = Attributes of the message.
// message.publishTime = Timestamp when Pub/Sub received the message.
});