Create a schema for a topic

This document shows you how to create schemas for Pub/Sub topics.

Before you begin

Before you create a schema, complete the following:

Required roles and permissions

To get the permissions that you need to create and manage schemas, ask your administrator to grant you the Pub/Sub Editor (roles/pubsub.editor) IAM role on your project. For more information about granting roles, see Manage access.

This predefined role contains the permissions required to create and manage schemas. To see the exact permissions that are required, expand the Required permissions section:

Required permissions

The following permissions are required to create and manage schemas:

  • Create schema: pubsub.schemas.create
  • Attach schema to topic: pubsub.schemas.attach
  • Commit a schema revision: pubsub.schemas.commit
  • Delete a schema or a schema revision: pubsub.schemas.delete
  • Get a schema or schema revisions: pubsub.schemas.get
  • List schemas: pubsub.schemas.list
  • List schema revisions: pubsub.schemas.listRevisions
  • Rollback a schema: pubsub.schemas.rollback
  • Validate a message: pubsub.schemas.validate
  • Get the IAM policy for a schema: pubsub.schemas.getIamPolicy
  • Configure the IAM policy for a schema: pubsub.schemas.setIamPolicy

You might also be able to get these permissions with custom roles or other predefined roles.

You can grant roles and permissions to principals such as users, groups, domains, or service accounts. You can create a schema in one project and attach it to a topic located in a different project. Ensure that you have the required permissions for each project.

Create a schema

You can create a schema using the Google Cloud console, the gcloud CLI, the Pub/Sub API, or the Cloud Client Libraries. The following are some guidelines to create a schema:

  • For a schema to work with Pub/Sub, you must define only one top-level type. Import statements that reference other types are not supported.

  • You can associate the same schema to multiple topics.

  • You can manually test whether a message validates against a schema.

Console

To create a schema, follow these steps:

  1. In the Google Cloud console, go to the Pub/Sub schemas page.

    Go to Schemas

  2. Click Create schema.

  3. In the Schema ID field, enter an ID for your schema.

    For guidelines to name a schema, see Guidelines to name a topic, subscription, or snapshot.

  4. For Schema type, select either Avro or Protocol Buffer.

    Learn more about schema types.

  5. In the Schema definition field, enter the Avro or Protocol Buffer definition for your schema.

    For example, here's a sample schema in Avro.

    {
      "type": "record",
      "name": "Avro",
      "fields": [
        {
          "name": "ProductName",
          "type": "string",
          "default": ""
        },
        {
          "name": "SKU",
          "type": "int",
          "default": 0
        },
        {
          "name": "InStock",
          "type": "boolean",
          "default": false
        }
      ]
    }
    
    
  6. Optional: Click Validate definition to check if the schema definition is correct.

    The validation check does not check the compatibility of the schema with the messages to be published. Test messages in the next step.

  7. Optional: You can test if messages with the correct schema get published.

    1. Click Test message.

    2. In the Test message window, select a type of Message encoding.

    3. In the Message body, enter a test message.

    4. Click Test.

      For example, here's a sample message for the test schema. In this example, select the Message encoding as JSON.

      {"ProductName":"GreenOnions", "SKU":34543, "InStock":true}
      
    5. Exit the test message page.

  8. Click Create to save the schema.

gcloud

gcloud pubsub schemas create SCHEMA_ID \
        --type=SCHEMA_TYPE \
        --definition=SCHEMA_DEFINITION

Where:

  • SCHEMA_TYPE is either avro or protocol-buffer.
  • SCHEMA_DEFINITION is a string containing the definition of the schema, formatted according to the chosen schema type.

You can also specify the schema definition in a file:

gcloud pubsub schemas create SCHEMA_ID \
        --type=SCHEMA_TYPE \
        --definition-file=SCHEMA_DEFINITION_FILE

Where:

  • SCHEMA_TYPE is either avro or protocol-buffer.
  • SCHEMA_DEFINITION_FILE is a string containing the path to the file with the definition of the schema, formatted according to the chosen schema type.

REST

To create a schema, send a POST request like the following:

POST https://pubsub.googleapis.com/v1/projects/PROJECT_ID/schemas/SCHEMA_ID
Authorization: Bearer $(gcloud auth application-default print-access-token)
Content-Type: application/json --data @response-body.json

Specify the following fields in the request body:

{
  "definition": SCHEMA_DEFINITION
  "type": SCHEMA_TYPE
}

Where:

  • SCHEMA_TYPE is either avro or protocol-buffer.
  • SCHEMA_DEFINITION is a string containing the definition of the schema, formatted according to the chosen schema type.

The response body should contain a JSON representation of a schema resource. For example:

{
  "name": SCHEMA_NAME,
  "type": SCHEMA_TYPE,
  "definition": SCHEMA_DEFINITION
  "revisionId": REVISION_ID
  "revisionCreateTime": REVISION_CREATE_TIME
}

Where:

  • REVISION_ID is the server-generated ID for the revision.
  • REVISION_CREATE_TIME is the ISO 8601 timestamp at which the revision was created.

C++

Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C++ API reference documentation.

Avro

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaServiceClient client, std::string const& project_id,
   std::string const& schema_id, std::string const& schema_definition_file) {
  std::string const definition = ReadFile(schema_definition_file);

  google::pubsub::v1::CreateSchemaRequest request;
  request.set_parent(google::cloud::Project(project_id).FullName());
  request.set_schema_id(schema_id);
  request.mutable_schema()->set_type(google::pubsub::v1::Schema::AVRO);
  request.mutable_schema()->set_definition(definition);
  auto schema = client.CreateSchema(request);
  if (schema.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The schema already exists\n";
    return;
  }
  if (!schema) throw std::move(schema).status();

  std::cout << "Schema successfully created: " << schema->DebugString()
            << "\n";
}

Proto

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaServiceClient client, std::string const& project_id,
   std::string const& schema_id, std::string const& schema_definition_file) {
  std::string const definition = ReadFile(schema_definition_file);

  google::pubsub::v1::CreateSchemaRequest request;
  request.set_parent(google::cloud::Project(project_id).FullName());
  request.set_schema_id(schema_id);
  request.mutable_schema()->set_type(
      google::pubsub::v1::Schema::PROTOCOL_BUFFER);
  request.mutable_schema()->set_definition(definition);
  auto schema = client.CreateSchema(request);
  if (schema.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The schema already exists\n";
    return;
  }
  if (!schema) throw std::move(schema).status();
  std::cout << "Schema successfully created: " << schema->DebugString()
            << "\n";
}

C#

Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C# API reference documentation.

Avro


using Google.Api.Gax.ResourceNames;
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.IO;

public class CreateAvroSchemaSample
{
    public Schema CreateAvroSchema(string projectId, string schemaId, string pathToDefinition)
    {
        SchemaServiceClient schemaService = SchemaServiceClient.Create();
        var schemaName = SchemaName.FromProjectSchema(projectId, schemaId);
        string schemaDefinition = File.ReadAllText(pathToDefinition);
        Schema schema = new Schema
        {
            SchemaName = schemaName,
            Type = Schema.Types.Type.Avro,
            Definition = schemaDefinition
        };
        CreateSchemaRequest createSchemaRequest = new CreateSchemaRequest
        {
            ParentAsProjectName = ProjectName.FromProject(projectId),
            SchemaId = schemaId,
            Schema = schema
        };

        try
        {
            schema = schemaService.CreateSchema(createSchemaRequest);
            Console.WriteLine($"Schema {schema.Name} created.");
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            Console.WriteLine($"Schema {schemaName} already exists.");
        }
        return schema;
    }
}

Proto


using Google.Api.Gax.ResourceNames;
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.IO;

public class CreateProtoSchemaSample
{
    public Schema CreateProtoSchema(string projectId, string schemaId, string pathToDefinition)
    {
        SchemaServiceClient schemaService = SchemaServiceClient.Create();
        var schemaName = SchemaName.FromProjectSchema(projectId, schemaId);
        string schemaDefinition = File.ReadAllText(pathToDefinition);
        Schema schema = new Schema
        {
            SchemaName = schemaName,
            Type = Schema.Types.Type.ProtocolBuffer,
            Definition = schemaDefinition
        };
        CreateSchemaRequest createSchemaRequest = new CreateSchemaRequest
        {
            ParentAsProjectName = ProjectName.FromProject(projectId),
            SchemaId = schemaId,
            Schema = schema
        };

        try
        {
            schema = schemaService.CreateSchema(createSchemaRequest);
            Console.WriteLine($"Schema {schema.Name} created.");
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            Console.WriteLine($"Schema {schemaName} already exists.");
        }
        return schema;
    }
}

Go

Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Go API reference documentation.

Avro

import (
	"context"
	"fmt"
	"io"
	"os"

	"cloud.google.com/go/pubsub"
)

// createAvroSchema creates a schema resource from a JSON-formatted Avro schema file.
func createAvroSchema(w io.Writer, projectID, schemaID, avscFile string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %w", err)
	}
	defer client.Close()

	avscSource, err := os.ReadFile(avscFile)
	if err != nil {
		return fmt.Errorf("error reading from file: %s", avscFile)
	}

	config := pubsub.SchemaConfig{
		Type:       pubsub.SchemaAvro,
		Definition: string(avscSource),
	}
	s, err := client.CreateSchema(ctx, schemaID, config)
	if err != nil {
		return fmt.Errorf("CreateSchema: %w", err)
	}
	fmt.Fprintf(w, "Schema created: %#v\n", s)
	return nil
}

Proto

import (
	"context"
	"fmt"
	"io"
	"os"

	"cloud.google.com/go/pubsub"
)

// createProtoSchema creates a schema resource from a schema proto file.
func createProtoSchema(w io.Writer, projectID, schemaID, protoFile string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema"
	// protoFile = "path/to/a/proto/schema/file(.proto)/formatted/in/protocol/buffers"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %w", err)
	}
	defer client.Close()

	protoSource, err := os.ReadFile(protoFile)
	if err != nil {
		return fmt.Errorf("error reading from file: %s", protoFile)
	}

	config := pubsub.SchemaConfig{
		Type:       pubsub.SchemaProtocolBuffer,
		Definition: string(protoSource),
	}
	s, err := client.CreateSchema(ctx, schemaID, config)
	if err != nil {
		return fmt.Errorf("CreateSchema: %w", err)
	}
	fmt.Fprintf(w, "Schema created: %#v\n", s)
	return nil
}

Java

Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.

Avro


import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CreateAvroSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";
    String avscFile = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json";

    createAvroSchemaExample(projectId, schemaId, avscFile);
  }

  public static Schema createAvroSchemaExample(String projectId, String schemaId, String avscFile)
      throws IOException {

    ProjectName projectName = ProjectName.of(projectId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    // Read an Avro schema file formatted in JSON as a string.
    String avscSource = new String(Files.readAllBytes(Paths.get(avscFile)));

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema =
          schemaServiceClient.createSchema(
              projectName,
              Schema.newBuilder()
                  .setName(schemaName.toString())
                  .setType(Schema.Type.AVRO)
                  .setDefinition(avscSource)
                  .build(),
              schemaId);

      System.out.println("Created a schema using an Avro schema:\n" + schema);
      return schema;
    } catch (AlreadyExistsException e) {
      System.out.println(schemaName + "already exists.");
      return null;
    }
  }
}

Proto


import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CreateProtoSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";
    String protoFile = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers";

    createProtoSchemaExample(projectId, schemaId, protoFile);
  }

  public static Schema createProtoSchemaExample(String projectId, String schemaId, String protoFile)
      throws IOException {

    ProjectName projectName = ProjectName.of(projectId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    // Read a proto file as a string.
    String protoSource = new String(Files.readAllBytes(Paths.get(protoFile)));

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema =
          schemaServiceClient.createSchema(
              projectName,
              Schema.newBuilder()
                  .setName(schemaName.toString())
                  .setType(Schema.Type.PROTOCOL_BUFFER)
                  .setDefinition(protoSource)
                  .build(),
              schemaId);

      System.out.println("Created a schema using a protobuf schema:\n" + schema);
      return schema;
    } catch (AlreadyExistsException e) {
      System.out.println(schemaName + "already exists.");
      return null;
    }
  }
}

Node.js

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.

Avro

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const avscFile = 'path/to/an/avro/schema/file/(.avsc)/formatted/in/json';

// Imports the Google Cloud client library
const {PubSub, SchemaTypes} = require('@google-cloud/pubsub');

const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createAvroSchema(schemaNameOrId, avscFile) {
  const definition = fs.readFileSync(avscFile).toString();
  const schema = await pubSubClient.createSchema(
    schemaNameOrId,
    SchemaTypes.Avro,
    definition
  );

  const name = await schema.getName();
  console.log(`Schema ${name} created.`);
}

Proto

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const protoFile = 'path/to/a/proto/schema/file/(.proto)/formatted/in/protcol/buffers';

// Imports the Google Cloud client library
const {PubSub, SchemaTypes} = require('@google-cloud/pubsub');

const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createProtoSchema(schemaNameOrId, protoFile) {
  const definition = fs.readFileSync(protoFile).toString();
  const schema = await pubSubClient.createSchema(
    schemaNameOrId,
    SchemaTypes.ProtocolBuffer,
    definition
  );

  const fullName = await schema.getName();
  console.log(`Schema ${fullName} created.`);
}

Node.js

Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.

Avro

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const avscFile = 'path/to/an/avro/schema/file/(.avsc)/formatted/in/json';

// Imports the Google Cloud client library
import {PubSub, SchemaTypes} from '@google-cloud/pubsub';

import * as fs from 'fs';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createAvroSchema(schemaNameOrId: string, avscFile: string) {
  const definition: string = fs.readFileSync(avscFile).toString();
  const schema = await pubSubClient.createSchema(
    schemaNameOrId,
    SchemaTypes.Avro,
    definition
  );

  const name = await schema.getName();
  console.log(`Schema ${name} created.`);
}

Proto

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const protoFile = 'path/to/a/proto/schema/file/(.proto)/formatted/in/protcol/buffers';

// Imports the Google Cloud client library
import {PubSub, SchemaTypes} from '@google-cloud/pubsub';

import * as fs from 'fs';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createProtoSchema(schemaNameOrId: string, protoFile: string) {
  const definition: string = fs.readFileSync(protoFile).toString();
  const schema = await pubSubClient.createSchema(
    schemaNameOrId,
    SchemaTypes.ProtocolBuffer,
    definition
  );

  const fullName: string = await schema.getName();
  console.log(`Schema ${fullName} created.`);
}

PHP

Before trying this sample, follow the PHP setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub PHP API reference documentation.

Avro

use Google\Cloud\PubSub\PubSubClient;

/**
 * Create a Schema with an AVRO definition.
 *
 * @param string $projectId
 * @param string $schemaId
 * @param string $avscFile
 */
function create_avro_schema(string $projectId, string $schemaId, string $avscFile): void
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $definition = (string) file_get_contents($avscFile);
    $schema = $pubsub->createSchema($schemaId, 'AVRO', $definition);

    printf('Schema %s created.', $schema->name());
}

Proto

use Google\Cloud\PubSub\PubSubClient;

/**
 * Create a Schema with an Protocol Buffer definition.
 *
 * @param string $projectId
 * @param string $schemaId
 * @param string $protoFile
 */
function create_proto_schema($projectId, $schemaId, $protoFile)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $definition = (string) file_get_contents($protoFile);
    $schema = $pubsub->createSchema($schemaId, 'PROTOCOL_BUFFER', $definition);

    printf('Schema %s created.', $schema->name());
}

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.

Avro

from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

project_path = f"projects/{project_id}"

# Read a JSON-formatted Avro schema file as a string.
with open(avsc_file, "rb") as f:
    avsc_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema = Schema(name=schema_path, type_=Schema.Type.AVRO, definition=avsc_source)

try:
    result = schema_client.create_schema(
        request={"parent": project_path, "schema": schema, "schema_id": schema_id}
    )
    print(f"Created a schema using an Avro schema file:\n{result}")
    return result
except AlreadyExists:
    print(f"{schema_id} already exists.")

Proto

from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# proto_file = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers"

project_path = f"projects/{project_id}"

# Read a protobuf schema file as a string.
with open(proto_file, "rb") as f:
    proto_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema = Schema(
    name=schema_path, type_=Schema.Type.PROTOCOL_BUFFER, definition=proto_source
)

try:
    result = schema_client.create_schema(
        request={"parent": project_path, "schema": schema, "schema_id": schema_id}
    )
    print(f"Created a schema using a protobuf schema file:\n{result}")
    return result
except AlreadyExists:
    print(f"{schema_id} already exists.")

Ruby

Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Ruby API reference documentation.

Avro

# schema_id = "your-schema-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

pubsub = Google::Cloud::Pubsub.new

definition = File.read avsc_file
schema = pubsub.create_schema schema_id, :avro, definition

puts "Schema #{schema.name} created."

Proto

# schema_id = "your-schema-id"
# proto_file = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers"

pubsub = Google::Cloud::Pubsub.new

definition = File.read proto_file
schema = pubsub.create_schema schema_id, :protocol_buffer, definition

puts "Schema #{schema.name} created."

After a schema is created, you can see the details of the schema in the Schemas page.

You can associate the schema with a topic.

What's next