Class Subscription (3.7.0)

A Subscription object will give you access to your Cloud Pub/Sub subscription.

Subscriptions are sometimes retrieved when using various methods:

Subscription objects may be created directly with:

All Subscription objects are instances of an [EventEmitter](http://nodejs.org/api/events.html). The subscription will pull for messages automatically as long as there is at least one listener assigned for the message event. Available events:

Upon receipt of a message: on(event: 'message', listener: (message: Message) => void): this;

Upon receipt of an error: on(event: 'error', listener: (error: Error) => void): this;

Upon receipt of a (non-fatal) debug warning: on(event: 'debug', listener: (msg: DebugMessage) => void): this;

Upon the closing of the subscriber: on(event: 'close', listener: Function): this;

By default Subscription objects allow you to process 100 messages at the same time. You can fine tune this value by adjusting the options.flowControl.maxMessages option.

If your subscription is seeing more re-deliveries than preferable, you might try increasing your options.ackDeadline value or decreasing the options.streamingOptions.maxStreams value.

Subscription objects handle ack management, by automatically extending the ack deadline while the message is being processed, to then issue the ack or nack of such message when the processing is done. **Note:** message redelivery is still possible.

By default each PubSub instance can handle 100 open streams, with default options this translates to less than 20 Subscriptions per PubSub instance. If you wish to create more Subscriptions than that, you can either create multiple PubSub instances or lower the options.streamingOptions.maxStreams value on each Subscription object.

Inheritance

EventEmitter > Subscription

Package

@google-cloud/pubsub

Examples

From


const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

pubsub.getSubscriptions((err, subscriptions) => {
  // `subscriptions` is an array of Subscription objects.
});

From


const topic = pubsub.topic('my-topic');
topic.getSubscriptions((err, subscriptions) => {
  // `subscriptions` is an array of Subscription objects.
});


const topic = pubsub.topic('my-topic');
topic.createSubscription('new-subscription', (err, subscription) => {
  // `subscription` is a Subscription object.
});


const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');
// `subscription` is a Subscription object.

Once you have obtained a subscription object, you may begin to register listeners. This will automatically trigger pulling for messages.


// Register an error handler.
subscription.on('error', (err) => {});

// Register a debug handler, to catch non-fatal errors and other messages.
subscription.on('debug', msg => { console.log(msg.message); });

// Register a close handler in case the subscriber closes unexpectedly
subscription.on('close', () => {});

// Register a listener for `message` events.
function onMessage(message) {
  // Called every time a message is received.

  // message.id = ID of the message.
  // message.ackId = ID used to acknowledge the message receival.
  // message.data = Contents of the message.
  // message.attributes = Attributes of the message.
  // message.publishTime = Date when Pub/Sub received the message.

  // Ack the message:
  // message.ack();

  // This doesn't ack the message, but allows more messages to be retrieved
  // if your limit was hit or if you don't want to ack the message.
  // message.nack();
}
subscription.on('message', onMessage);

// Remove the listener from receiving `message` events.
subscription.removeListener('message', onMessage);

To apply a fine level of flow control, consider the following configuration


const subscription = topic.subscription('my-sub', {
  flowControl: {
    maxMessages: 1,
    // this tells the client to manage and lock any excess messages
    allowExcessMessages: false
  }
});

Constructors

(constructor)(pubsub, name, options)

constructor(pubsub: PubSub, name: string, options?: SubscriptionOptions);

Constructs a new instance of the Subscription class

Parameters
NameDescription
pubsub PubSub
name string
options SubscriptionOptions

Properties

iam

iam: IAM;

isOpen

get isOpen(): boolean;

Indicates if the Subscription is open and receiving messages.

{boolean}

metadata

metadata?: google.pubsub.v1.ISubscription;

name

name: string;

projectId

get projectId(): string;

{string}

pubsub

pubsub: PubSub;

request

request: typeof PubSub.prototype.request;

topic

topic?: Topic | string;

Methods

close()

close(): Promise<void>;

Closes the Subscription, once this is called you will no longer receive message events unless you call {Subscription#open} or add new message listeners.

Returns
TypeDescription
Promise<void>
Example

subscription.close(err => {
  if (err) {
    // Error handling omitted.
  }
});

// If the callback is omitted a Promise will be returned.
subscription.close().then(() => {});

close(callback)

close(callback: SubscriptionCloseCallback): void;
Parameter
NameDescription
callback SubscriptionCloseCallback
Returns
TypeDescription
void

create(options)

create(options?: CreateSubscriptionOptions): Promise<CreateSubscriptionResponse>;

Create a subscription.

Parameter
NameDescription
options CreateSubscriptionOptions

See a [Subscription resource](https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions).

Returns
TypeDescription
Promise<CreateSubscriptionResponse>

{Promise

Examples

const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('newMessages');
const callback = function(err, subscription, apiResponse) {};

subscription.create(callback);

With options


subscription.create({
  ackDeadlineSeconds: 90
}, callback);

If the callback is omitted, we'll return a Promise.


const [sub, apiResponse] = await subscription.create();

create(callback)

create(callback: CreateSubscriptionCallback): void;
Parameter
NameDescription
callback CreateSubscriptionCallback
Returns
TypeDescription
void

create(options, callback)

create(options: CreateSubscriptionOptions, callback: CreateSubscriptionCallback): void;
Parameters
NameDescription
options CreateSubscriptionOptions
callback CreateSubscriptionCallback
Returns
TypeDescription
void

createSnapshot(name, gaxOpts)

createSnapshot(name: string, gaxOpts?: CallOptions): Promise<CreateSnapshotResponse>;

Create a snapshot with the given name.

Parameters
NameDescription
name string

Name of the snapshot.

gaxOpts CallOptions

Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.

Returns
TypeDescription
Promise<CreateSnapshotResponse>

{Promise

Example

const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');

const callback = (err, snapshot, apiResponse) => {
  if (!err) {
    // The snapshot was created successfully.
  }
};

subscription.createSnapshot('my-snapshot', callback);

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.createSnapshot('my-snapshot').then((data) => {
  const snapshot = data[0];
  const apiResponse = data[1];
});

createSnapshot(name, callback)

createSnapshot(name: string, callback: CreateSnapshotCallback): void;
Parameters
NameDescription
name string
callback CreateSnapshotCallback
Returns
TypeDescription
void

createSnapshot(name, gaxOpts, callback)

createSnapshot(name: string, gaxOpts: CallOptions, callback: CreateSnapshotCallback): void;
Parameters
NameDescription
name string
gaxOpts CallOptions
callback CreateSnapshotCallback
Returns
TypeDescription
void

delete(gaxOpts)

delete(gaxOpts?: CallOptions): Promise<EmptyResponse>;

Delete the subscription. Pull requests from the current subscription will be errored once unsubscription is complete.

Parameter
NameDescription
gaxOpts CallOptions

Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.

Returns
TypeDescription
Promise<EmptyResponse>
Example

const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');

subscription.delete((err, apiResponse) => {});

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.delete().then((data) => {
  const apiResponse = data[0];
});

delete(callback)

delete(callback: EmptyCallback): void;
Parameter
NameDescription
callback EmptyCallback
Returns
TypeDescription
void

delete(gaxOpts, callback)

delete(gaxOpts: CallOptions, callback: EmptyCallback): void;
Parameters
NameDescription
gaxOpts CallOptions
callback EmptyCallback
Returns
TypeDescription
void

detached()

detached(): Promise<DetachedResponse>;

Check if a subscription is detached.

Returns
TypeDescription
Promise<DetachedResponse>

{Promise

Example

const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');

subscription.detached((err, exists) => {});

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.detached().then((data) => {
  const detached = data[0];
});

detached(callback)

detached(callback: DetachedCallback): void;
Parameter
NameDescription
callback DetachedCallback
Returns
TypeDescription
void

exists()

exists(): Promise<ExistsResponse>;

Check if a subscription exists.

Returns
TypeDescription
Promise<ExistsResponse>

{Promise

Example

const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');

subscription.exists((err, exists) => {});

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.exists().then((data) => {
  const exists = data[0];
});

exists(callback)

exists(callback: ExistsCallback): void;
Parameter
NameDescription
callback ExistsCallback
Returns
TypeDescription
void

formatMetadata_(metadata)

static formatMetadata_(metadata: SubscriptionMetadata): google.pubsub.v1.ISubscription;
Parameter
NameDescription
metadata SubscriptionMetadata
Returns
TypeDescription
ISubscription

formatName_(projectId, name)

static formatName_(projectId: string, name: string): string;
Parameters
NameDescription
projectId string
name string
Returns
TypeDescription
string

get(gaxOpts)

get(gaxOpts?: GetSubscriptionOptions): Promise<GetSubscriptionResponse>;

Get a subscription if it exists.

Parameter
NameDescription
gaxOpts GetSubscriptionOptions

Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.

Returns
TypeDescription
Promise<GetSubscriptionResponse>

{Promise

Example

const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');

subscription.get((err, subscription, apiResponse) => {
  // The `subscription` data has been populated.
});

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.get().then((data) => {
  const subscription = data[0];
  const apiResponse = data[1];
});

get(callback)

get(callback: GetSubscriptionCallback): void;
Parameter
NameDescription
callback GetSubscriptionCallback
Returns
TypeDescription
void

get(gaxOpts, callback)

get(gaxOpts: GetSubscriptionOptions, callback: GetSubscriptionCallback): void;
Parameters
NameDescription
gaxOpts GetSubscriptionOptions
callback GetSubscriptionCallback
Returns
TypeDescription
void

getMetadata(gaxOpts)

getMetadata(gaxOpts?: CallOptions): Promise<GetSubscriptionMetadataResponse>;

Fetches the subscriptions metadata.

Parameter
NameDescription
gaxOpts CallOptions

Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.

Returns
TypeDescription
Promise<GetSubscriptionMetadataResponse>

{Promise

Example

const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');

subscription.getMetadata((err, apiResponse) => {
  if (err) {
    // Error handling omitted.
  }
});

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.getMetadata().then((data) => {
  const apiResponse = data[0];
});

getMetadata(callback)

getMetadata(callback: GetSubscriptionMetadataCallback): void;
Parameter
NameDescription
callback GetSubscriptionMetadataCallback
Returns
TypeDescription
void

getMetadata(gaxOpts, callback)

getMetadata(gaxOpts: CallOptions, callback: GetSubscriptionMetadataCallback): void;
Parameters
NameDescription
gaxOpts CallOptions
callback GetSubscriptionMetadataCallback
Returns
TypeDescription
void

modifyPushConfig(config, gaxOpts)

modifyPushConfig(config: PushConfig, gaxOpts?: CallOptions): Promise<EmptyResponse>;

Modify the push config for the subscription.

Parameters
NameDescription
config PushConfig

The push config.

gaxOpts CallOptions

Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.

Returns
TypeDescription
Promise<EmptyResponse>

{Promise

Example

const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();

const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');

const pushConfig = {
  pushEndpoint: 'https://mydomain.com/push',
  attributes: {
    key: 'value'
  },
  oidcToken: {
    serviceAccountEmail: 'myproject@appspot.gserviceaccount.com',
    audience: 'myaudience'
  }
};

subscription.modifyPushConfig(pushConfig, (err, apiResponse) => {
  if (err) {
    // Error handling omitted.
  }
});

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.modifyPushConfig(pushConfig).then((data) => {
  const apiResponse = data[0];
});

modifyPushConfig(config, callback)

modifyPushConfig(config: PushConfig, callback: EmptyCallback): void;
Parameters
NameDescription
config PushConfig
callback EmptyCallback
Returns
TypeDescription
void

modifyPushConfig(config, gaxOpts, callback)

modifyPushConfig(config: PushConfig, gaxOpts: CallOptions, callback: EmptyCallback): void;
Parameters
NameDescription
config PushConfig
gaxOpts CallOptions
callback EmptyCallback
Returns
TypeDescription
void

open()

open(): void;

Opens the Subscription to receive messages. In general this method shouldn't need to be called, unless you wish to receive messages after calling . Alternatively one could just assign a new message event listener which will also re-open the Subscription.

Returns
TypeDescription
void
Example

subscription.on('message', message => message.ack());

// Close the subscription.
subscription.close(err => {
  if (err) {
    // Error handling omitted.
  }

  The subscription has been closed and messages will no longer be received.
});

// Resume receiving messages.
subscription.open();

seek(snapshot, gaxOpts)

seek(snapshot: string | Date, gaxOpts?: CallOptions): Promise<SeekResponse>;

Seeks an existing subscription to a point in time or a given snapshot.

Parameters
NameDescription
snapshot string | Date

The point to seek to. This will accept the name of the snapshot or a Date object.

gaxOpts CallOptions

Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.

Returns
TypeDescription
Promise<SeekResponse>

{Promise

Example

const callback = (err, resp) => {
  if (!err) {
    // Seek was successful.
  }
};

subscription.seek('my-snapshot', callback);

//-
// Alternatively, to specify a certain point in time, you can provide a
Date
// object.
//-
const date = new Date('October 21 2015');

subscription.seek(date, callback);

seek(snapshot, callback)

seek(snapshot: string | Date, callback: SeekCallback): void;
Parameters
NameDescription
snapshot string | Date
callback SeekCallback
Returns
TypeDescription
void

seek(snapshot, gaxOpts, callback)

seek(snapshot: string | Date, gaxOpts: CallOptions, callback: SeekCallback): void;
Parameters
NameDescription
snapshot string | Date
gaxOpts CallOptions
callback SeekCallback
Returns
TypeDescription
void

setMetadata(metadata, gaxOpts)

setMetadata(metadata: SubscriptionMetadata, gaxOpts?: CallOptions): Promise<SetSubscriptionMetadataResponse>;

Update the subscription object.

Parameters
NameDescription
metadata SubscriptionMetadata

The subscription metadata.

gaxOpts CallOptions

Request configuration options, outlined here: https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html.

Returns
TypeDescription
Promise<SetSubscriptionMetadataResponse>

{Promise

Example

const metadata = {
  key: 'value'
};

subscription.setMetadata(metadata, (err, apiResponse) => {
  if (err) {
    // Error handling omitted.
  }
});

//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.setMetadata(metadata).then((data) => {
  const apiResponse = data[0];
});

setMetadata(metadata, callback)

setMetadata(metadata: SubscriptionMetadata, callback: SetSubscriptionMetadataCallback): void;
Parameters
NameDescription
metadata SubscriptionMetadata
callback SetSubscriptionMetadataCallback
Returns
TypeDescription
void

setMetadata(metadata, gaxOpts, callback)

setMetadata(metadata: SubscriptionMetadata, gaxOpts: CallOptions, callback: SetSubscriptionMetadataCallback): void;
Parameters
NameDescription
metadata SubscriptionMetadata
gaxOpts CallOptions
callback SetSubscriptionMetadataCallback
Returns
TypeDescription
void

setOptions(options)

setOptions(options: SubscriberOptions): void;

Sets the Subscription options.

Parameter
NameDescription
options SubscriberOptions

The options.

Returns
TypeDescription
void

snapshot(name)

snapshot(name: string): Snapshot;

Create a Snapshot object. See to create a snapshot.

Parameter
NameDescription
name string

The name of the snapshot.

Returns
TypeDescription
Snapshot

{Snapshot}

Example

const snapshot = subscription.snapshot('my-snapshot');