Create BigQuery subscriptions

This document describes how to create a BigQuery subscription. You can use the Google Cloud console, the Google Cloud CLI, the client library, or the Pub/Sub API to create a BigQuery subscription.

Before you begin

Before reading this document, ensure that you're familiar with Pub/Sub and BigQuery.

In addition, ensure that you meet the following prerequisites before you create a BigQuery subscription:

  • A BigQuery table exists. Alternatively, you can create one when you create the BigQuery subscription as described in the later sections of this document.

  • Compatibility between the schema of the Pub/Sub topic and the BigQuery table. If you specify an incompatible BigQuery table, you get a compatibility-related error message. For more information, see Schema compatibility. For an example of a compatible pair, see the sketch that follows this list.
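
The following sketch pairs an Avro topic schema with a compatible BigQuery table. The schema, project, dataset, and table names are hypothetical; the Avro string and long fields map to BigQuery STRING and INTEGER columns:

    gcloud pubsub schemas create demo-schema \
        --type=AVRO \
        --definition='{"type":"record","name":"Demo","fields":[{"name":"name","type":"string"},{"name":"score","type":"long"}]}'

    bq mk --table my-project:my_dataset.demo_table name:STRING,score:INTEGER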

Required roles and permissions

The following is a list of guidelines regarding roles and permissions:

  • To create a subscription, you must configure access control at the project level.

  • You also need resource-level permissions if your subscriptions and topics are in different projects, as discussed later in this section.

  • To create a BigQuery subscription, the Pub/Sub service account must have permission to write to the specific BigQuery table. For more information about how to grant these permissions, see the next section of this document.

  • You can configure a BigQuery subscription in a project to write to a BigQuery table in a different project.

To get the permissions that you need to create BigQuery subscriptions, ask your administrator to grant you the Pub/Sub Editor (roles/pubsub.editor) IAM role on the project. For more information about granting roles, see Manage access.
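
For example, an administrator can grant this role from the command line. The following is a sketch with placeholder project and user values:

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="user:USER_EMAIL" \
        --role="roles/pubsub.editor"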

This predefined role contains the permissions required to create BigQuery subscriptions. The following section lists the exact permissions that are required:

Required permissions

The following permissions are required to create BigQuery subscriptions:

  • Pull from a subscription: pubsub.subscriptions.consume
  • Create a subscription: pubsub.subscriptions.create
  • Delete a subscription: pubsub.subscriptions.delete
  • Get a subscription: pubsub.subscriptions.get
  • List subscriptions: pubsub.subscriptions.list
  • Update a subscription: pubsub.subscriptions.update
  • Attach a subscription to a topic: pubsub.topics.attachSubscription
  • Get the IAM policy for a subscription: pubsub.subscriptions.getIamPolicy
  • Configure the IAM policy for a subscription: pubsub.subscriptions.setIamPolicy

You might also be able to get these permissions with custom roles or other predefined roles.

If you need to create BigQuery subscriptions in one project that are associated with a topic in another project, ask your topic administrator to also grant you the Pub/Sub Editor (roles/pubsub.editor) IAM role on the topic.
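
For example, the topic administrator can grant the role on the topic itself. The following is a sketch with placeholder values:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID \
        --member="user:USER_EMAIL" \
        --role="roles/pubsub.editor"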

Assign BigQuery roles to the Pub/Sub service account

Some Google Cloud services have Google Cloud-managed service accounts that let the services access your resources. These service accounts are known as service agents. Pub/Sub creates and maintains a service account for each project in the format service-project-number@gcp-sa-pubsub.iam.gserviceaccount.com.
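
If you need the project number to construct the service account name, you can look it up with the gcloud CLI:

    gcloud projects describe PROJECT_ID --format="value(projectNumber)"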

To create a BigQuery subscription, the Pub/Sub service account must have permission to write to the specific BigQuery table and to read the table metadata.

Grant the BigQuery Data Editor (roles/bigquery.dataEditor) role to the Pub/Sub service account.

  1. In the Google Cloud console, go to the IAM page.

    Go to IAM

  2. Click Grant access.

  3. In the Add principals section, enter the name of your Pub/Sub service account. The format of the service account is service-project-number@gcp-sa-pubsub.iam.gserviceaccount.com. For example, for a project with the project number 112233445566, the service account is service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. In the Assign Roles section, click Add another role.

  5. In the Select a role drop-down, enter BigQuery, and select the BigQuery Data Editor role.

  6. Click Save.

For more information about BigQuery IAM, see BigQuery roles and permissions.
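
Alternatively, you can grant the role from the command line. The following is a minimal sketch that assumes the project number 112233445566 from the earlier example:

    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member="serviceAccount:service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com" \
        --role="roles/bigquery.dataEditor"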

BigQuery subscription properties

When you configure a BigQuery subscription, you can specify the following properties.

Common properties

Learn about the common subscription properties that you can set across all subscriptions.

Use topic schema

This option lets Pub/Sub use the schema of the Pub/Sub topic to which the subscription is attached. In addition, Pub/Sub writes the fields in messages to the corresponding columns in the BigQuery table.

When you use this option, remember to check the following additional requirements:

  • The fields in the topic schema and the BigQuery schema must have the same names and their types must be compatible with each other.

  • Any optional field in the topic schema must also be optional in the BigQuery schema.

  • Required fields in the topic schema don't need to be required in the BigQuery schema.

  • If there are BigQuery fields that are not present in the topic schema, these BigQuery fields must be in mode NULLABLE.

  • If the topic schema has additional fields that are not present in the BigQuery schema and these fields can be dropped, select the option Drop unknown fields.

  • You can select only one of the subscription properties, Use topic schema or Use table schema.

If you don't select the Use topic schema or Use table schema option, ensure that the BigQuery table has a column called data of type BYTES, STRING, or JSON. Pub/Sub writes the message to this BigQuery column.
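
For example, a minimal table with only the required data column can be created with the bq CLI. The project, dataset, and table names here are hypothetical:

    bq mk --table my-project:my_dataset.my_table data:STRING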

You might not see changes to the Pub/Sub topic's schema or the BigQuery table schema take effect immediately in messages written to the BigQuery table. For example, if the Drop unknown fields option is enabled and a field is present in the Pub/Sub schema but not the BigQuery schema, messages written to the BigQuery table might still not contain the field after you add it to the BigQuery schema. Eventually, the schemas synchronize and subsequent messages include the field.

When you use the Use topic schema option for your BigQuery subscription, you can also take advantage of BigQuery change data capture (CDC). CDC updates your BigQuery tables by processing and applying changes to existing rows.

To learn more about this feature, see Stream table updates with change data capture.

To learn how to use this feature with BigQuery subscriptions, see BigQuery change data capture.
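
In the gcloud CLI, the Use topic schema and Drop unknown fields properties correspond to flags on the subscription create command described later in this document. As a sketch:

    gcloud pubsub subscriptions create SUBSCRIPTION_ID \
        --topic=TOPIC_ID \
        --bigquery-table=PROJECT_ID:DATASET_ID.TABLE_ID \
        --use-topic-schema \
        --drop-unknown-fields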

Use table schema

This option lets Pub/Sub use the schema of the BigQuery table to write the fields of a JSON message to the corresponding columns. When you use this option, remember to check the following additional requirements:

  • Published messages must be in JSON format.

  • If the subscription's topic has a schema associated with it, then the message encoding property must be set to JSON.

  • If there are BigQuery fields that are not present in the messages, these BigQuery fields must be in mode NULLABLE.

  • If the messages have additional fields that are not present in the BigQuery schema and these fields can be dropped, select the option Drop unknown fields.

  • In the JSON message, DATE, DATETIME, TIME, and TIMESTAMP values must be integers that adhere to the supported representations.

  • In the JSON message, NUMERIC and BIGNUMERIC values must be bytes encoded using the BigDecimalByteStringEncoder.

  • You can select only one of the subscription properties, Use topic schema or Use table schema.

If you don't select the Use topic schema or Use table schema option, ensure that the BigQuery table has a column called data of type BYTES, STRING, or JSON. Pub/Sub writes the message to this BigQuery column.

You might not see changes to the BigQuery table schema take effect immediately in messages written to the BigQuery table. For example, if the Drop unknown fields option is enabled and a field is present in the messages but not in the BigQuery schema, messages written to the BigQuery table might still not contain the field after you add it to the BigQuery schema. Eventually, the schema synchronizes and subsequent messages include the field.

When you use the Use table schema option for your BigQuery subscription, you can also take advantage of BigQuery change data capture (CDC). CDC updates your BigQuery tables by processing and applying changes to existing rows.

To learn more about this feature, see Stream table updates with change data capture.

To learn how to use this feature with BigQuery subscriptions, see BigQuery change data capture.
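
Similarly, in the gcloud CLI, the Use table schema property corresponds to the --use-table-schema flag. As a sketch:

    gcloud pubsub subscriptions create SUBSCRIPTION_ID \
        --topic=TOPIC_ID \
        --bigquery-table=PROJECT_ID:DATASET_ID.TABLE_ID \
        --use-table-schema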

Drop unknown fields

This option is used with the Use topic schema or Use table schema option. It lets Pub/Sub drop any field that is present in the topic schema or message but not in the BigQuery schema. If Drop unknown fields isn't set, messages with extra fields are not written to BigQuery and remain in the subscription backlog, and the subscription enters an error state.

Write metadata

This option lets Pub/Sub write the metadata of each message to additional columns in the BigQuery table. Otherwise, the metadata is not written to the BigQuery table.

If you select the Write metadata option, ensure that the BigQuery table has the fields described in the following table.

If you don't select the Write metadata option, then the destination BigQuery table only requires the data field, unless you select the Use topic schema or Use table schema option. If you select both the Write metadata and Use topic schema options, then the schema of the topic must not contain any fields with names that match those of the metadata parameters. This limitation includes camelCase versions of these snake_case parameters.

Parameters

  • subscription_name (STRING): The name of the subscription.

  • message_id (STRING): The ID of the message.

  • publish_time (TIMESTAMP): The time at which the message was published.

  • data (BYTES, STRING, or JSON): The message body. The data field is required for all destination BigQuery tables that don't select the Use topic schema or Use table schema option. If the field is of type JSON, then the message body must be valid JSON.

  • attributes (STRING or JSON): A JSON object that contains all message attributes. It also contains additional fields that are part of the Pub/Sub message, including the ordering key, if present.
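
If you select the Write metadata option, the destination table needs columns for these parameters. As a sketch, assuming hypothetical project, dataset, and table names and STRING columns for data and attributes, you can create such a table with the bq CLI and enable metadata with the --write-metadata flag of the gcloud command described later in this document:

    bq mk --table my-project:my_dataset.my_table \
        subscription_name:STRING,message_id:STRING,publish_time:TIMESTAMP,data:STRING,attributes:STRING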

Create a BigQuery subscription

The following samples demonstrate how to create a subscription with BigQuery delivery.

Console

  1. In the Google Cloud console, go to the Subscriptions page.

    Go to Subscriptions

  2. Click Create subscription.
  3. For the Subscription ID field, enter a name.

    For information on how to name a subscription, see Guidelines to name a topic or a subscription.

  4. Choose or create a topic from the drop-down menu. The subscription receives messages from the topic.
  5. Select Delivery type as Write to BigQuery.
  6. Select the project for the BigQuery table.
  7. Select an existing dataset or create a new one.

    For information on how to create a dataset, see Creating datasets.

  8. Select an existing table or create a new one.

    For information on how to create a table, see Creating tables.

  9. We strongly recommend that you enable Dead lettering to handle message failures.

    For more information, see Dead letter topic.

  10. Click Create.

You can also create a subscription from the Topics page. This shortcut is useful for associating topics with subscriptions.

  1. In the Google Cloud console, go to the Topics page.

    Go to Topics

  2. Click the more actions menu next to the topic for which you want to create a subscription.
  3. From the context menu, select Create subscription.
  4. Select Delivery type as Write to BigQuery.
  5. Select the project for the BigQuery table.
  6. Select an existing dataset or create a new one.

    For information on how to create a dataset, see Creating datasets.

  7. Select an existing table or create a new one.

    For information on how to create a table, see Creating tables.

  8. We strongly recommend that you enable Dead lettering to handle message failures.

    For more information, see Dead letter topic.

  9. Click Create.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. To create a Pub/Sub subscription, use the gcloud pubsub subscriptions create command:

    gcloud pubsub subscriptions create SUBSCRIPTION_ID \
        --topic=TOPIC_ID \
        --bigquery-table=PROJECT_ID:DATASET_ID.TABLE_ID

    Replace the following:

    • SUBSCRIPTION_ID: Specifies the ID of the subscription.
    • TOPIC_ID: Specifies the ID of the topic.
    • PROJECT_ID: Specifies the ID of the project.
    • DATASET_ID: Specifies the ID of an existing dataset. To create a dataset, see Create datasets.
    • TABLE_ID: Specifies the ID of an existing table. The table requires a data field if your topic doesn't have a schema. To create a table, see Create an empty table with a schema definition.
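
    For example, with sample values filled in, a complete command might look like the following:

    gcloud pubsub subscriptions create my-bq-subscription \
        --topic=my-topic \
        --bigquery-table=my-project:my_dataset.my_table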

C++

Before trying this sample, follow the C++ setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C++ API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id, std::string const& table_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.mutable_bigquery_config()->set_table(table_id);
  auto sub = client.CreateSubscription(request);
  if (!sub) {
    if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
      std::cout << "The subscription already exists\n";
      return;
    }
    throw std::move(sub).status();
  }

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

Before trying this sample, follow the C# setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C# API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for client libraries.


using Google.Cloud.PubSub.V1;

public class CreateBigQuerySubscriptionSample
{
    public Subscription CreateBigQuerySubscription(string projectId, string topicId, string subscriptionId, string bigqueryTableId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            BigqueryConfig = new BigQueryConfig
            {
                Table = bigqueryTableId
            }
        };
        var subscription = subscriber.CreateSubscription(subscriptionRequest);
        return subscription;
    }
}

Go

Before trying this sample, follow the Go setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Go API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

// createBigQuerySubscription creates a Pub/Sub subscription that exports messages to BigQuery.
func createBigQuerySubscription(w io.Writer, projectID, subID string, topic *pubsub.Topic, table string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	// table := "my-project-id.dataset_id.table_id"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic: topic,
		BigQueryConfig: pubsub.BigQueryConfig{
			Table:         table,
			WriteMetadata: true,
		},
	})
	if err != nil {
		return fmt.Errorf("client.CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created BigQuery subscription: %v\n", sub)

	return nil
}

Java

Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Java API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.BigQueryConfig;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateBigQuerySubscriptionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    String bigqueryTableId = "your-project.your-dataset.your-table";

    createBigQuerySubscription(projectId, topicId, subscriptionId, bigqueryTableId);
  }

  public static void createBigQuerySubscription(
      String projectId, String topicId, String subscriptionId, String bigqueryTableId)
      throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      BigQueryConfig bigqueryConfig =
          BigQueryConfig.newBuilder().setTable(bigqueryTableId).setWriteMetadata(true).build();

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  .setBigqueryConfig(bigqueryConfig)
                  .build());

      System.out.println("Created a BigQuery subscription: " + subscription.getAllFields());
    }
  }
}

Node.js

Before trying this sample, follow the Node.js setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Node.js API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const bigqueryTableId = 'YOUR_TABLE_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createBigQuerySubscription(
  topicNameOrId,
  subscriptionNameOrId,
  bigqueryTableId
) {
  const options = {
    bigqueryConfig: {
      table: bigqueryTableId,
      writeMetadata: true,
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);

  console.log(`Subscription ${subscriptionNameOrId} created.`);
}

Node.js (TypeScript)

Before trying this sample, follow the Node.js setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Node.js API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const bigqueryTableId = 'YOUR_TABLE_ID';

// Imports the Google Cloud client library
import {PubSub, CreateSubscriptionOptions} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createBigQuerySubscription(
  topicNameOrId: string,
  subscriptionNameOrId: string,
  bigqueryTableId: string
) {
  const options: CreateSubscriptionOptions = {
    bigqueryConfig: {
      table: bigqueryTableId,
      writeMetadata: true,
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);

  console.log(`Subscription ${subscriptionNameOrId} created.`);
}

PHP

Before trying this sample, follow the PHP setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub PHP API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\BigQueryConfig;

/**
 * Creates a Pub/Sub BigQuery subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 * @param string $table      The BigQuery table to which to write.
 */
function create_bigquery_subscription($projectId, $topicName, $subscriptionName, $table)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);
    $config = new BigQueryConfig(['table' => $table]);
    $subscription->create([
        'bigqueryConfig' => $config
    ]);

    printf('Subscription created: %s' . PHP_EOL, $subscription->name());
}

Python

Before trying this sample, follow the Python setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Python API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# bigquery_table_id = "your-project.your-dataset.your-table"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

bigquery_config = pubsub_v1.types.BigQueryConfig(
    table=bigquery_table_id, write_metadata=True
)

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "bigquery_config": bigquery_config,
        }
    )

print(f"BigQuery subscription created: {subscription}.")
print(f"Table for subscription is: {bigquery_table_id}")

Ruby

Before trying this sample, follow the Ruby setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Ruby API reference documentation.

To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

require "google/cloud/pubsub"

##
# Shows how to create a BigQuery subscription where messages published
# to a topic populate a BigQuery table.
#
# @param project_id [String]
# Your Google Cloud project (e.g. "my-project")
# @param topic_id [String]
# Your topic name (e.g. "my-topic")
# @param subscription_id [String]
# ID for the new subscription to be created (e.g. "my-subscription")
# @param bigquery_table_id [String]
# ID of the BigQuery table (e.g. "my-project:dataset-id.table-id")
#
def pubsub_create_bigquery_subscription project_id:, topic_id:, subscription_id:, bigquery_table_id:
  pubsub = Google::Cloud::Pubsub.new project_id: project_id
  topic = pubsub.topic topic_id
  subscription = topic.subscribe subscription_id,
                                 bigquery_config: {
                                   table: bigquery_table_id,
                                   write_metadata: true
                                 }
  puts "BigQuery subscription created: #{subscription_id}."
  puts "Table for subscription is: #{bigquery_table_id}"
end

What's next