Create a Cloud Storage subscription

Create a Cloud Storage subscription where messages published to a topic populates a Cloud Storage bucket

Code sample

C++

Before trying this sample, follow the C++ setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C++ API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SubscriptionAdminClient client, std::string const& project_id,
   std::string const& topic_id, std::string const& subscription_id,
   std::string const& bucket) {
  auto sub = client.CreateSubscription(
      pubsub::Topic(project_id, topic_id),
      pubsub::Subscription(project_id, subscription_id),
      pubsub::SubscriptionBuilder{}.set_cloud_storage_config(
          pubsub::CloudStorageConfigBuilder{}.set_bucket(bucket)));
  if (!sub) {
    if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
      std::cout << "The subscription already exists\n";
      return;
    }
    throw std::move(sub).status();
  }

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

Go

Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Go API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

// createCloudStorageSubscription creates a Pub/Sub subscription that exports messages to Cloud Storage.
func createCloudStorageSubscription(w io.Writer, projectID, subID string, topic *pubsub.Topic, bucket string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	// note bucket should not have the gs:// prefix
	// bucket := "my-bucket"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic: topic,
		CloudStorageConfig: pubsub.CloudStorageConfig{
			Bucket:         bucket,
			FilenamePrefix: "log_events_",
			FilenameSuffix: ".avro",
			OutputFormat:   &pubsub.CloudStorageOutputFormatAvroConfig{WriteMetadata: true},
			MaxDuration:    1 * time.Minute,
			MaxBytes:       1e8,
		},
	})
	if err != nil {
		return fmt.Errorf("client.CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created Cloud Storage subscription: %v\n", sub)

	return nil
}

Java

Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Java API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.protobuf.Duration;
import com.google.pubsub.v1.CloudStorageConfig;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateCloudStorageSubscriptionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    String bucket = "your-bucket";
    String filenamePrefix = "log_events_";
    String filenameSuffix = ".text";
    Duration maxDuration = Duration.newBuilder().setSeconds(300).build();

    createCloudStorageSubscription(
        projectId, topicId, subscriptionId, bucket, filenamePrefix, filenameSuffix, maxDuration);
  }

  public static void createCloudStorageSubscription(
      String projectId,
      String topicId,
      String subscriptionId,
      String bucket,
      String filenamePrefix,
      String filenameSuffix,
      Duration maxDuration)
      throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      CloudStorageConfig cloudStorageConfig =
          CloudStorageConfig.newBuilder()
              .setBucket(bucket)
              .setFilenamePrefix(filenamePrefix)
              .setFilenameSuffix(filenameSuffix)
              .setMaxDuration(maxDuration)
              .build();

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  .setCloudStorageConfig(cloudStorageConfig)
                  .build());

      System.out.println("Created a CloudStorage subscription: " + subscription.getAllFields());
    }
  }
}

PHP

Before trying this sample, follow the PHP setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub PHP API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub GCS subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 * @param string $bucket The Cloud Storage bucket name without any prefix like "gs://".
 */
function create_cloud_storage_subscription($projectId, $topicName, $subscriptionName, $bucket)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);
    $config = ['bucket' => $bucket];
    $subscription->create([
        'cloudStorageConfig' => $config
    ]);

    printf('Subscription created: %s' . PHP_EOL, $subscription->name());
}

Python

Before trying this sample, follow the Python setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Python API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

from google.cloud import pubsub_v1
from google.protobuf import duration_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# bucket = "my-bucket"

filename_prefix = "log_events_"
filename_suffix = ".avro"
# Either CloudStorageConfig.AvroConfig or CloudStorageConfig.TextConfig
# defaults to TextConfig
avro_config = pubsub_v1.types.CloudStorageConfig.AvroConfig(write_metadata=True)

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)
max_duration = duration_pb2.Duration()
max_duration.FromSeconds(300)

cloudstorage_config = pubsub_v1.types.CloudStorageConfig(
    bucket=bucket,
    filename_prefix=filename_prefix,
    filename_suffix=filename_suffix,
    avro_config=avro_config,
    # Min 1 minutes, max 10 minutes
    max_duration=max_duration,
    # Min 1 KB, max 10 GiB
    max_bytes=2000,
)

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "cloud_storage_config": cloudstorage_config,
        }
    )

print(f"CloudStorage subscription created: {subscription}.")
print(f"Bucket for subscription is: {bucket}")
print(f"Prefix is: {filename_prefix}")
print(f"Suffix is: {filename_suffix}")

What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.