Asociar un esquema a un tema

En este documento se explica cómo asociar esquemas a temas de Pub/Sub.

Antes de empezar

Roles y permisos necesarios

Para obtener los permisos que necesitas para asociar y gestionar esquemas, pide a tu administrador que te conceda el rol de gestión de identidades y accesos Editor de Pub/Sub (roles/pubsub.editor) en tu proyecto. Para obtener más información sobre cómo conceder roles, consulta el artículo Gestionar el acceso a proyectos, carpetas y organizaciones.

Este rol predefinido contiene los permisos necesarios para asociar y gestionar esquemas. Para ver los permisos exactos que se necesitan, despliega la sección Permisos necesarios:

Permisos obligatorios

Para asociar y gestionar esquemas, se necesitan los siguientes permisos:

  • Crea un esquema: pubsub.schemas.create
  • Adjuntar un esquema a un tema: pubsub.schemas.attach
  • Confirmar una revisión del esquema: pubsub.schemas.commit
  • Eliminar un esquema o una revisión de un esquema: pubsub.schemas.delete
  • Obtener un esquema o revisiones de un esquema: pubsub.schemas.get
  • Ver esquemas: pubsub.schemas.list
  • Lista de revisiones del esquema: pubsub.schemas.listRevisions
  • Restaurar una versión anterior de un esquema: pubsub.schemas.rollback
  • Validar un mensaje: pubsub.schemas.validate
  • Obtener la política de gestión de identidades y accesos de un esquema: pubsub.schemas.getIamPolicy
  • Configura la política de gestión de identidades y accesos de un esquema: pubsub.schemas.setIamPolicy

También puedes obtener estos permisos con roles personalizados u otros roles predefinidos.

Puedes conceder roles y permisos a entidades principales, como usuarios, grupos, dominios o cuentas de servicio. Puedes crear un esquema en un proyecto y adjuntarlo a un tema ubicado en otro proyecto. Asegúrate de que tienes los permisos necesarios para cada proyecto.

Directrices para asociar un esquema a un tema

Puedes asociar un esquema a un tema cuando crees o edites un tema. Estas son las directrices para asociar un esquema a un tema:

  • Puede asociar un esquema a uno o varios temas.

    Una vez que se asocia un esquema a un tema, todos los mensajes que reciba el tema de los editores deben seguir ese esquema.

  • Cuando asocias un esquema a un tema, también debes especificar la codificación de los mensajes que se van a publicar como BINARY o JSON. Si usas JSON con un esquema Avro, presta especial atención a las reglas de codificación de las uniones.

  • Si un esquema asociado a un tema tiene revisiones, los mensajes deben coincidir con la codificación y validarse con una revisión dentro del intervalo disponible. Si no se validan, el mensaje no se publicará.

    Las revisiones se prueban en orden cronológico inverso según la hora de creación. Para crear una revisión de un esquema, consulta Confirmar una revisión de un esquema.

Lógica de validación de un esquema de mensaje

Cuando asocias un esquema a un tema y el esquema tiene revisiones, puedes especificar un intervalo de revisiones para usarlo. Si no especificas un intervalo, se usará todo el intervalo para la validación.

Si no especificas una revisión como Primera revisión permitida, se usará la revisión más antigua del esquema para la validación. Si no especificas una revisión como Última revisión permitida, se usará la revisión más reciente del esquema.

Tomemos como ejemplo el esquema S, que está asociado al tema T.

El esquema S tiene los IDs de revisión A, B, C y D creados en orden, donde A es la primera o la revisión más antigua. Ninguno de los esquemas es idéntico a otro ni es una reversión de un esquema ya creado.

  • Si solo define el campo Primera revisión permitida como B, los mensajes que solo se ajusten al esquema A se rechazarán, mientras que los mensajes que se ajusten a los esquemas B, C y D se aceptarán.

  • Si solo asigna el valor C al campo Última revisión permitida, se aceptarán los mensajes que cumplan los esquemas A, B y C, y se rechazarán los mensajes que solo cumplan el esquema D.

  • Si asignas el valor B al campo First revision allowed y el valor C al campo Last revision allowed, se aceptarán los mensajes que se ajusten a los esquemas B y C.

  • También puedes definir la primera y la última revisión con el mismo ID de revisión. En este caso, solo se aceptarán los mensajes que se ajusten a esa revisión.

Crear y asociar un esquema al crear un tema

Puedes crear un tema con un esquema mediante la Google Cloud consola, la CLI de gcloud, la API de Pub/Sub o las bibliotecas de cliente de Cloud.

Consola

  1. En la Google Cloud consola, ve a la página Temas de Pub/Sub.

    Ir a Temas

  2. Haz clic en Crear tema.

  3. En el campo ID de tema, introduce un ID para el tema.

    Para poner nombre a un tema, consulta las directrices.

  4. Marca la casilla Usar un esquema.

    Mantenga la configuración predeterminada en los campos restantes.

    Puedes crear un esquema o usar uno que ya tengas.

  5. Si va a crear un esquema, siga estos pasos: `

    1. En Seleccionar un esquema de Pub/Sub, selecciona Crear un esquema.

    La página Crear esquema se muestra en una pestaña secundaria.

    Sigue los pasos que se indican en Crear un esquema.

    1. Vuelve a la pestaña Crear tema y haz clic en Actualizar.

    2. Busque su esquema en el campo Seleccionar un esquema de Pub/Sub.

    3. Selecciona la codificación de mensajes JSON o Binario.

    El esquema que acabas de crear tiene un ID de revisión. Puedes crear revisiones de esquema adicionales, tal como se explica en Confirmar una revisión de esquema.

  6. Si va a asociar un esquema que ya ha creado, siga estos pasos:

    1. En Seleccionar un esquema de Pub/Sub, elija un esquema.

    2. Selecciona la codificación de mensajes JSON o Binario.

  7. Opcional: Si el esquema seleccionado tiene revisiones, en Intervalo de revisiones, usa los menús desplegables Primera revisión permitida y Última revisión permitida.

Puedes especificar ambos campos, solo uno o conservar la configuración predeterminada según tus necesidades.

  1. Mantenga la configuración predeterminada en los campos restantes.

  2. Haz clic en Crear para guardar el tema y asignarlo al esquema seleccionado.

gcloud

Para crear un tema asignado a un esquema creado anteriormente, ejecuta el comando gcloud pubsub topics create

gcloud pubsub topics create TOPIC_ID \
        --message-encoding=ENCODING_TYPE \
        --schema=SCHEMA_ID \
        --first-revision-id=FIRST_REVISION_ID \
        --last-revision-id=LAST_REVISION_ID \

Donde:

  • TOPIC_ID es el ID del tema que estás creando.
  • ENCODING_TYPE es la codificación de los mensajes validados con el esquema. Este valor debe ser JSON o BINARY.
  • SCHEMA_ID es el ID de un esquema ya creado.
  • FIRST_REVISION_ID es el ID de la revisión más antigua con la que se va a validar.
  • LAST_REVISION_ID es el ID de la revisión más reciente con la que se va a validar.

Tanto --first-revision-id como --last-revision-id son opcionales.

También puedes asignar un esquema de otro proyecto de Google Cloud :

gcloud pubsub topics create TOPIC_ID \
        --message-encoding=ENCODING_TYPE \
        --schema=SCHEMA_ID \
        --schema-project=SCHEMA_PROJECT \
        --project=TOPIC_PROJECT

Donde:

  • SCHEMA_PROJECT es el ID del proyecto del Google Cloud proyecto del esquema.
  • TOPIC_PROJECT es el ID del proyecto del Google Cloud proyecto del tema.

REST

Para crear un tema, usa el método projects.topics.create:

Solicitud:

La solicitud debe autenticarse con un token de acceso en el encabezado Authorization. Para obtener un token de acceso de las credenciales de aplicación predeterminadas actuales, usa gcloud auth application-default print-access-token.

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID
Authorization: Bearer ACCESS_TOKEN

Cuerpo de la solicitud:

{
  "schemaSettings": {
    "schema": "SCHEMA_NAME",
    "encoding": "ENCODING_TYPE"
    "firstRevisionId": "FIRST_REVISION_ID"
    "lastRevisionId": "LAST_REVISION_ID"
  }
}

Donde:

  • PROJECT_ID es el ID del proyecto.
  • El ID de tu tema es TOPIC_ID.
  • SCHEMA_NAME es el nombre del esquema con el que se deben validar los mensajes publicados. El formato es: projects/PROJECT_ID/schemas/SCHEMA_ID.
  • ENCODING_TYPE es la codificación de los mensajes validados con el esquema. Debe ser JSON o BINARY.
  • FIRST_REVISION_ID es el ID de la revisión más antigua con la que se va a validar.
  • LAST_REVISION_ID es el ID de la revisión más reciente con la que se va a validar.

Tanto firstRevisionId como lastRevisionId son opcionales.

Respuesta:

{
  "name": "projects/PROJECT_ID/topics/TOPIC_ID",
  "schemaSettings": {
    "schema": "SCHEMA_NAME",
    "encoding": "ENCODING_TYPE"
    "firstRevisionId": "FIRST_REVISION_ID"
    "lastRevisionId": "LAST_REVISION_ID"
  }
}

Se omiten firstRevisionId y lastRevisionId si no se proporcionan en la solicitud.

C++

Antes de probar este ejemplo, sigue las instrucciones de configuración de C++ que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de C++ de Pub/Sub.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string schema_id, std::string const& encoding) {
  google::pubsub::v1::Topic request;
  request.set_name(pubsub::Topic(project_id, std::move(topic_id)).FullName());
  request.mutable_schema_settings()->set_schema(
      pubsub::Schema(std::move(project_id), std::move(schema_id)).FullName());
  request.mutable_schema_settings()->set_encoding(
      encoding == "JSON" ? google::pubsub::v1::JSON
                         : google::pubsub::v1::BINARY);
  auto topic = client.CreateTopic(request);

  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

C#

Antes de probar este ejemplo, sigue las instrucciones de configuración de C# que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de C# de Pub/Sub.


using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;

public class CreateTopicWithSchemaSample
{
    public Topic CreateTopicWithSchema(string projectId, string topicId, string schemaId, Encoding encoding)
    {
        PublisherServiceApiClient publisher = PublisherServiceApiClient.Create();
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        Topic topic = new Topic
        {
            TopicName = topicName,
            SchemaSettings = new SchemaSettings
            {
                SchemaAsSchemaName = SchemaName.FromProjectSchema(projectId, schemaId),
                Encoding = encoding
            }
        };

        Topic receivedTopic = null;
        try
        {
            receivedTopic = publisher.CreateTopic(topic);
            Console.WriteLine($"Topic {topic.Name} created.");
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            Console.WriteLine($"Topic {topicName} already exists.");
        }
        return receivedTopic;
    }
}

Go

En el siguiente ejemplo se usa la versión principal de la biblioteca de cliente de Pub/Sub de Go (v2). Si sigues usando la biblioteca v1, consulta la guía de migración a la versión 2. Para ver una lista de ejemplos de código de la versión 1, consulta los ejemplos de código obsoletos.

Antes de probar este ejemplo, sigue las instrucciones de configuración de Go que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Go de Pub/Sub.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

func createTopicWithSchemaRevisions(w io.Writer, projectID, topicID, schemaID, firstRevisionID, lastRevisionID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// schemaID := "my-schema-id"
	// firstRevisionID := "my-revision-id"
	// lastRevisionID := "my-revision-id"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}

	topic := &pubsubpb.Topic{
		Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
		SchemaSettings: &pubsubpb.SchemaSettings{
			Schema:          fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID),
			FirstRevisionId: firstRevisionID,
			LastRevisionId:  lastRevisionID,
			// Alternative encoding is pubsubpb.Encoding_JSON
			Encoding: pubsubpb.Encoding_BINARY,
		},
	}
	t, err := client.TopicAdminClient.CreateTopic(ctx, topic)
	if err != nil {
		return fmt.Errorf("CreateTopicWithConfig: %w", err)
	}
	fmt.Fprintf(w, "Created topic with schema revision: %#v\n", t)
	return nil
}

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración de Java que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Pub/Sub.


import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.SchemaName;
import com.google.pubsub.v1.SchemaSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithSchemaRevisionsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Use an existing schema.
    String schemaId = "your-schema-id";
    // Choose either BINARY or JSON message serialization in this topic.
    Encoding encoding = Encoding.BINARY;
    // Set the minimum and maximum revsion ID
    String firstRevisionId = "your-revision-id";
    String lastRevisionId = "your-revision-id";

    createTopicWithSchemaRevisionsExample(
        projectId, topicId, schemaId, firstRevisionId, lastRevisionId, encoding);
  }

  public static void createTopicWithSchemaRevisionsExample(
      String projectId,
      String topicId,
      String schemaId,
      String firstRevisionid,
      String lastRevisionId,
      Encoding encoding)
      throws IOException {
    TopicName topicName = TopicName.of(projectId, topicId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    SchemaSettings schemaSettings =
        SchemaSettings.newBuilder()
            .setSchema(schemaName.toString())
            .setFirstRevisionId(firstRevisionid)
            .setLastRevisionId(lastRevisionId)
            .setEncoding(encoding)
            .build();

    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setSchemaSettings(schemaSettings)
                  .build());

      System.out.println("Created topic with schema: " + topic.getName());
    } catch (AlreadyExistsException e) {
      System.out.println(schemaName + "already exists.");
    }
  }
}

Node.js

Antes de probar este ejemplo, sigue las instrucciones de configuración de Node.js que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Node.js de Pub/Sub.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const schemaName = 'YOUR_SCHEMA_NAME_OR_ID';
// const encodingType = 'BINARY';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithSchema(
  topicNameOrId,
  schemaNameOrId,
  encodingType,
) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const fullName = await schema.getName();

  // Creates a new topic with a schema. Note that you might also
  // pass Encodings.Json or Encodings.Binary here.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    schemaSettings: {
      schema: fullName,
      encoding: encodingType,
    },
  });
  console.log(`Topic ${topicNameOrId} created with schema ${fullName}.`);
}

Node.js

Antes de probar este ejemplo, sigue las instrucciones de configuración de Node.js que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Node.js de Pub/Sub.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const schemaName = 'YOUR_SCHEMA_NAME_OR_ID';
// const encodingType = 'BINARY';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithSchema(
  topicNameOrId: string,
  schemaNameOrId: string,
  encodingType: 'BINARY' | 'JSON',
) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const fullName = await schema.getName();

  // Creates a new topic with a schema. Note that you might also
  // pass Encodings.Json or Encodings.Binary here.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    schemaSettings: {
      schema: fullName,
      encoding: encodingType,
    },
  });
  console.log(`Topic ${topicNameOrId} created with schema ${fullName}.`);
}

PHP

Antes de probar este ejemplo, sigue las instrucciones de configuración de PHP que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Pub/Sub para PHP.

use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\Schema;

/**
 * Create a topic with a schema.
 *
 * @param string $projectId
 * @param string $topicId
 * @param string $schemaId
 * @param string $encoding
 */
function create_topic_with_schema($projectId, $topicId, $schemaId, $encoding)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $schema = $pubsub->schema($schemaId);

    $topic = $pubsub->createTopic($topicId, [
        'schemaSettings' => [
            // The schema may be provided as an instance of the schema type,
            // or by using the schema ID directly.
            'schema' => $schema,
            // Encoding may be either `BINARY` or `JSON`.
            // Provide a string or a constant from Google\Cloud\PubSub\V1\Encoding.
            'encoding' => $encoding,
        ]
    ]);

    printf('Topic %s created', $topic->name());
}

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración de Python que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Python de Pub/Sub.

from google.api_core.exceptions import AlreadyExists, InvalidArgument
from google.cloud.pubsub import PublisherClient, SchemaServiceClient
from google.pubsub_v1.types import Encoding

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# schema_id = "your-schema-id"
# first_revision_id = "your-revision-id"
# last_revision_id = "your-revision-id"
# Choose either BINARY or JSON as valid message encoding in this topic.
# message_encoding = "BINARY"

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)

if message_encoding == "BINARY":
    encoding = Encoding.BINARY
elif message_encoding == "JSON":
    encoding = Encoding.JSON
else:
    encoding = Encoding.ENCODING_UNSPECIFIED

try:
    response = publisher_client.create_topic(
        request={
            "name": topic_path,
            "schema_settings": {
                "schema": schema_path,
                "encoding": encoding,
                "first_revision_id": first_revision_id,
                "last_revision_id": last_revision_id,
            },
        }
    )
    print(f"Created a topic:\n{response}")

except AlreadyExists:
    print(f"{topic_id} already exists.")
except InvalidArgument:
    print("Please choose either BINARY or JSON as a valid message encoding type.")

Ruby

En el siguiente ejemplo se usa la biblioteca de cliente de Ruby Pub/Sub v3. Si sigues usando la biblioteca v2, consulta la guía de migración a la versión 3. Para ver una lista de ejemplos de código de Ruby v2, consulta los ejemplos de código obsoletos.

Antes de probar este ejemplo, sigue las instrucciones de configuración de Ruby que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Ruby de Pub/Sub.

# topic_id = "your-topic-id"
# schema_id = "your-schema-id"
# Choose either BINARY or JSON as valid message encoding in this topic.
# message_encoding = :BINARY

pubsub = Google::Cloud::PubSub.new
topic_admin = pubsub.topic_admin

topic = topic_admin.create_topic name: pubsub.topic_path(topic_id),
                                 schema_settings: {
                                   schema: pubsub.schema_path(schema_id),
                                   encoding: message_encoding
                                 }

puts "Topic #{topic.name} created."

Editar un esquema asociado a un tema

Puedes editar un tema para adjuntar un esquema, eliminarlo o actualizar el intervalo de revisión que se usa para validar mensajes. Por lo general, si tienes previsto hacer cambios en el esquema en uso, puedes confirmar una nueva revisión y actualizar el intervalo de revisiones que se usa en el tema.

Puedes editar un esquema asociado a un tema mediante laGoogle Cloud consola, la CLI de gcloud, la API de Pub/Sub o las bibliotecas de cliente de Cloud.

Consola

  1. En la Google Cloud consola, ve a la página Temas de Pub/Sub.

    Ir a Temas

  2. Haz clic en el ID de tema de un tema.

  3. En la página de detalles del tema, haz clic en Editar.

  4. Puede hacer los siguientes cambios en el esquema.

    Los cambios pueden tardar unos minutos en aplicarse.

    • Si quieres quitar el esquema del tema, en la página Editar tema, desmarca la casilla Usar un esquema.

    • Si quieres cambiar el esquema, en la sección Esquema, selecciona el nombre de un esquema.

    Actualiza los demás campos según sea necesario.

    • Si quieres actualizar el intervalo de revisiones, en Intervalo de revisiones, usa los menús desplegables Primera revisión permitida y Última revisión permitida.

    Puedes especificar ambos campos, solo uno o conservar la configuración predeterminada según tus necesidades.

  5. Haz clic en Actualizar para guardar los cambios.

gcloud

gcloud pubsub topics update TOPIC_ID \
        --message-encoding=ENCODING_TYPE \
        --schema=SCHEMA_NAME \
        --first-revision-id=FIRST_REVISION_ID \
        --last-revision-id=LAST_REVISION_ID \

Donde:

  • TOPIC_ID es el ID del tema que estás creando.
  • ENCODING_TYPE es la codificación de los mensajes validados con el esquema. Este valor debe ser JSON o BINARY.
  • SCHEMA_NAME es el nombre de un esquema.
  • FIRST_REVISION_ID es el ID de la revisión más antigua con la que se va a validar.
  • LAST_REVISION_ID es el ID de la revisión más reciente con la que se va a validar.

Tanto --first-revision-id como --last-revision-id son opcionales.

REST

Para actualizar un tema, usa el método projects.topics.patch:

Solicitud:

La solicitud debe autenticarse con un token de acceso en el encabezado Authorization. Para obtener un token de acceso de las credenciales de aplicación predeterminadas actuales, usa gcloud auth application-default print-access-token.

PATCH https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID
Authorization: Bearer ACCESS_TOKEN

Cuerpo de la solicitud:

{
  "schemaSettings": {
    "schema": "SCHEMA_NAME",
    "encoding": "ENCODING_TYPE"
    "firstRevisionId": "FIRST_REVISION_ID"
    "lastRevisionId": "LAST_REVISION_ID"
    "update_mask":
  }
}

Donde:

  • PROJECT_ID es el ID del proyecto.
  • El ID de tu tema es TOPIC_ID.
  • SCHEMA_NAME es el nombre del esquema con el que se deben validar los mensajes publicados. El formato es: projects/PROJECT_ID/schemas/SCHEMA_ID.
  • ENCODING_TYPE es la codificación de los mensajes validados con el esquema. Debe ser JSON o BINARY.
  • FIRST_REVISION_ID es el ID de la revisión más antigua con la que se va a validar.
  • LAST_REVISION_ID es el ID de la revisión más reciente con la que se va a validar.

Tanto firstRevisionId como lastRevisionId son opcionales.

Respuesta:

{
  "name": "projects/PROJECT_ID/topics/TOPIC_ID",
  "schemaSettings": {
    "schema": "SCHEMA_NAME",
    "encoding": "ENCODING_TYPE"
    "firstRevisionId": "FIRST_REVISION_ID"
    "lastRevisionId": "LAST_REVISION_ID"
  }
}

Los valores de firstRevisionId y lastRevisionId no se definen después de la actualización.

C++

Antes de probar este ejemplo, sigue las instrucciones de configuración de C++ que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de C++ de Pub/Sub.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string const& first_revision_id,
   std::string const& last_revision_id) {
  google::pubsub::v1::UpdateTopicRequest request;
  auto* request_topic = request.mutable_topic();
  request_topic->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  request_topic->mutable_schema_settings()->set_first_revision_id(
      first_revision_id);
  request_topic->mutable_schema_settings()->set_last_revision_id(
      last_revision_id);
  *request.mutable_update_mask()->add_paths() =
      "schema_settings.first_revision_id";
  *request.mutable_update_mask()->add_paths() =
      "schema_settings.last_revision_id";
  auto topic = client.UpdateTopic(request);

  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Go

En el siguiente ejemplo se usa la versión principal de la biblioteca de cliente de Pub/Sub de Go (v2). Si sigues usando la biblioteca v1, consulta la guía de migración a la versión 2. Para ver una lista de ejemplos de código de la versión 1, consulta los ejemplos de código obsoletos.

Antes de probar este ejemplo, sigue las instrucciones de configuración de Go que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Go de Pub/Sub.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/protobuf/types/known/fieldmaskpb"
)

func updateTopicSchema(w io.Writer, projectID, topicID, firstRevisionID, lastRevisionID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic" // an existing topic that has schema settings attached to it.
	// firstRevisionID := "my-revision-id"
	// lastRevisionID := "my-revision-id"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}

	// This updates the first / last revision ID for the topic's schema.
	// To clear the schema entirely, use a zero valued (empty) SchemaSettings
	// with the same field mask.
	req := &pubsubpb.UpdateTopicRequest{
		Topic: &pubsubpb.Topic{
			Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
			SchemaSettings: &pubsubpb.SchemaSettings{
				FirstRevisionId: firstRevisionID,
				LastRevisionId:  lastRevisionID,
			},
		},
		// Construct a field mask to indicate which field to update in the topic.
		// Fields are specified relative to the topic
		UpdateMask: &fieldmaskpb.FieldMask{
			Paths: []string{"schema_settings.first_revision_id", "schema_settings.last_revision_id"},
		},
	}
	gotTopicCfg, err := client.TopicAdminClient.UpdateTopic(ctx, req)
	if err != nil {
		fmt.Fprintf(w, "topic.Update err: %v\n", gotTopicCfg)
		return err
	}
	fmt.Fprintf(w, "Updated topic with schema: %#v\n", gotTopicCfg)
	return nil
}

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración de Java que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Pub/Sub.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.SchemaSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicSchemaExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // This is an existing topic that has schema settings attached to it.
    String topicId = "your-topic-id";
    // Set the minimum and maximum revsion ID
    String firstRevisionId = "your-revision-id";
    String lastRevisionId = "your-revision-id";

    UpdateTopicSchemaExample.updateTopicSchemaExample(
        projectId, topicId, firstRevisionId, lastRevisionId);
  }

  public static void updateTopicSchemaExample(
      String projectId, String topicId, String firstRevisionid, String lastRevisionId)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {

      TopicName topicName = TopicName.of(projectId, topicId);

      // Construct the schema settings with the changes you want to make.
      SchemaSettings schemaSettings =
          SchemaSettings.newBuilder()
              .setFirstRevisionId(firstRevisionid)
              .setLastRevisionId(lastRevisionId)
              .build();

      // Construct the topic with the schema settings you want to change.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setSchemaSettings(schemaSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder()
              .addPaths("schema_settings.first_revision_id")
              .addPaths("schema_settings.last_revision_id")
              .build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println("Updated topic with schema: " + topic.getName());
    }
  }
}

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración de Python que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Python de Pub/Sub.

from google.api_core.exceptions import InvalidArgument, NotFound
from google.cloud.pubsub import PublisherClient

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# first_revision_id = "your-revision-id"
# last_revision_id = "your-revision-id"

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

try:
    response = publisher_client.update_topic(
        request={
            "topic": {
                "name": topic_path,
                "schema_settings": {
                    "first_revision_id": first_revision_id,
                    "last_revision_id": last_revision_id,
                },
            },
            "update_mask": "schemaSettings.firstRevisionId,schemaSettings.lastRevisionId",
        }
    )
    print(f"Updated a topic schema:\n{response}")

except NotFound:
    print(f"{topic_id} not found.")
except InvalidArgument:
    print("Schema settings are not valid.")

Siguientes pasos