Commit für Schemaversion durchführen

In diesem Dokument erfahren Sie, wie Sie einen Commit für eine Schemaversion für Pub/Sub-Themen durchführen.

Hinweise

Erforderliche Rollen und Berechtigungen

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle Pub/Sub-Bearbeiter (roles/pubsub.editor) für Ihr Projekt zu gewähren, um die Berechtigungen zu erhalten, die Sie benötigen, um einen Commit für eine Schemaversion durchzuführen und Schemas zu verwalten. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff verwalten.

Diese vordefinierte Rolle enthält die Berechtigungen, die erforderlich sind, um einen Commit für eine Schemaversion durchzuführen und Schemas zu verwalten. Erweitern Sie den Abschnitt Erforderliche Berechtigungen, um die erforderlichen Berechtigungen anzuzeigen:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind erforderlich, um einen Commit für eine Schemaversion durchzuführen und Schemas zu verwalten:

  • Schema erstellen: pubsub.schemas.create
  • Schema an Thema anhängen: pubsub.schemas.attach
  • Commit einer Schemaversion durchführen: pubsub.schemas.commit
  • Schema oder Schemaversion löschen: pubsub.schemas.delete
  • Schema oder Schemaversionen abrufen: pubsub.schemas.get
  • Schemas auflisten: pubsub.schemas.list
  • Schemaversionen auflisten: pubsub.schemas.listRevisions
  • Rollback eines Schemas durchführen: pubsub.schemas.rollback
  • Nachricht validieren: pubsub.schemas.validate
  • IAM-Richtlinie für ein Schema abrufen: pubsub.schemas.getIamPolicy
  • Konfigurieren Sie die IAM-Richtlinie für ein Schema: pubsub.schemas.setIamPolicy

Möglicherweise können Sie diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Sie können Hauptkonten wie Nutzern, Gruppen, Domains oder Dienstkonten Rollen und Berechtigungen gewähren. Sie können ein Schema in einem Projekt erstellen und es an ein Thema in einem anderen Projekt anhängen. Prüfen Sie, ob Sie die erforderlichen Berechtigungen für jedes Projekt haben.

Schema überarbeiten

Sie können einen Commit für eine Schemaversion mit der Google Cloud Console, der gcloud CLI, der Pub/Sub API oder den Cloud-Clientbibliotheken durchführen.

Im Folgenden finden Sie einige Richtlinien für die Commit einer Schemaversion:

  • Sie können ein Schema innerhalb bestimmter Einschränkungen überarbeiten:

    • Bei Protokollzwischenspeicher-Schemas können Sie optionale Felder hinzufügen oder entfernen. Sie können keine anderen Felder hinzufügen oder löschen. Außerdem können Sie keine vorhandenen Felder bearbeiten.

    • Regeln zur Schemaauflösung finden Sie in der Avro-Dokumentation zu Avro-Schemas. Eine neue Version muss den Regeln so folgen, als wäre sie sowohl das Lese- als auch das Autorenschema.

    • Ein Schema kann maximal 20 Versionen gleichzeitig haben. Wenn Sie das Limit überschreiten, löschen Sie eine Schemaversion, bevor Sie eine neue erstellen.

  • Jeder Version ist eine eindeutige Überarbeitungs-ID zugeordnet. Die Versions-ID ist eine automatisch generierte achtstellige UUID.

  • Wenn Sie den Überarbeitungsbereich oder die Version eines Schemas aktualisieren, das für die Themenvalidierung verwendet wird, kann es einige Minuten dauern, bis die Änderungen wirksam werden.

Console

So erstellen Sie eine Schemaversion:

  1. Rufen Sie in der Google Cloud Console die Seite Pub/Sub-Schemas auf.

    Schemas aufrufen

  2. Klicken Sie auf die Schema-ID eines vorhandenen Schemas.

    Die Seite Schemadetails für das Schema wird geöffnet.

  3. Klicken Sie auf Überarbeitung erstellen.

    Die Seite Schemaversion erstellen wird geöffnet.

  4. Nehmen Sie die erforderlichen Änderungen vor.

    Für das Beispielschema in Avro, das Sie unter Schema erstellen erstellt haben, können Sie beispielsweise ein zusätzliches optionales Feld namens Price hinzufügen:

     {
       "type": "record",
       "name": "Avro",
       "fields": [
         {
           "name": "ProductName",
           "type": "string",
           "default": ""
         },
         {
           "name": "SKU",
           "type": "int",
           "default": 0
         },
         {
           "name": "InStock",
           "type": "boolean",
           "default": false
         },
         {
           "name": "Price",
           "type": "double",
           "default": "0.0"
         }
       ]
     }
    
  5. Klicken Sie auf Definition validieren, um zu prüfen, ob die Schemadefinition korrekt ist.

  6. Sie können auch die Nachrichten für das Schema validieren.

    1. Klicken Sie auf Nachricht testen, um eine Beispielnachricht zu testen.

    2. Wählen Sie im Fenster Nachricht testen den Typ Nachrichtencodierung aus.

    3. Geben Sie im Nachrichtentext eine Testnachricht ein.

      Hier ist ein Beispiel für eine Nachricht für das Testschema. Wählen Sie in diesem Beispiel die Nachrichtencodierung als JSON aus.

      {"ProductName":"GreenOnions", "SKU":34543, "Price":12, "InStock":true}
      
    4. Klicken Sie auf Test.

  7. Klicken Sie auf Commit, um das Schema zu speichern.

gcloud

gcloud pubsub schemas commit SCHEMA_ID \
        --type=SCHEMA_TYPE \
        --definition=SCHEMA_DEFINITION

Wobei:

  • SCHEMA_TYPE ist entweder avro oder protocol-buffer.
  • SCHEMA_DEFINITION ist ein string, der die Definition des Schemas enthält, die gemäß dem ausgewählten Schematyp formatiert ist.

Sie können die Schemadefinition auch in der Datei angeben:

gcloud pubsub schemas commit SCHEMA_ID \
        --type=SCHEMA_TYPE \
        --definition-file=SCHEMA_DEFINITION_FILE

Wobei:

  • SCHEMA_TYPE ist entweder avro oder protocol-buffer.
  • SCHEMA_DEFINITION_FILE ist ein string-Wert, der den Pfad zur Datei mit der Definition des Schemas enthält, formatiert gemäß dem ausgewählten Schematyp.

REST

Senden Sie eine POST-Anfrage wie die folgende, um einen Commit für eine Schemaversion durchzuführen:

POST https://pubsub.googleapis.com/v1/projects/PROJECT_ID/schemas/SCHEMA_ID:commit
Authorization: Bearer $(gcloud auth application-default print-access-token)
Content-Type: application/json --data @response-body.json

Geben Sie im Anfragetext die folgenden Felder an:

{
  "definition": SCHEMA_DEFINITION
  "type": SCHEMA_TYPE
  "name": SCHEMA_NAME
}

Wobei:

  • SCHEMA_TYPE ist entweder AVRO oder PROTOCOL_BUFFER.
  • SCHEMA_DEFINITION ist ein String, der die Definition des Schemas enthält, formatiert gemäß dem ausgewählten Schematyp.
  • SCHEMA_NAME ist der Name eines vorhandenen Schemas.

Der Antworttext sollte eine JSON-Darstellung einer Schemaressource enthalten. Beispiel:

{
  "name": SCHEMA_NAME,
  "type": SCHEMA_TYPE,
  "definition": SCHEMA_DEFINITION
  "revisionId": REVISION_ID
  "revisionCreateTime": REVISION_CREATE_TIME
}

Wobei:

  • REVISION_ID ist die vom Server generierte ID für die Version.
  • REVISION_CREATE_TIME ist der ISO 8601-Zeitstempel für die Erstellung der Überarbeitung.

Einfach loslegen (Go)

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Go API.

Avro

import (
	"context"
	"fmt"
	"io"
	"io/ioutil"

	"cloud.google.com/go/pubsub"
)

// commitAvroSchema commits a new avro schema revision to an existing schema.
func commitAvroSchema(w io.Writer, projectID, schemaID, avscFile string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema-id"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %w", err)
	}
	defer client.Close()

	// Read an Avro schema file formatted in JSON as a byte slice.
	avscSource, err := ioutil.ReadFile(avscFile)
	if err != nil {
		return fmt.Errorf("error reading from file: %s", avscFile)
	}

	config := pubsub.SchemaConfig{
		Name:       fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID),
		Type:       pubsub.SchemaAvro,
		Definition: string(avscSource),
	}
	s, err := client.CommitSchema(ctx, schemaID, config)
	if err != nil {
		return fmt.Errorf("CommitSchema: %w", err)
	}
	fmt.Fprintf(w, "Committed a schema using an Avro schema: %#v\n", s)
	return nil
}

Proto

import (
	"context"
	"fmt"
	"io"
	"io/ioutil"

	"cloud.google.com/go/pubsub"
)

// commitProtoSchema commits a new proto schema revision to an existing schema.
func commitProtoSchema(w io.Writer, projectID, schemaID, protoFile string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema"
	// protoFile = "path/to/a/proto/schema/file(.proto)/formatted/in/protocol/buffers"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %w", err)
	}
	defer client.Close()

	// Read a proto file as a byte slice.
	protoSource, err := ioutil.ReadFile(protoFile)
	if err != nil {
		return fmt.Errorf("error reading from file: %s", protoFile)
	}

	config := pubsub.SchemaConfig{
		Name:       fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID),
		Type:       pubsub.SchemaProtocolBuffer,
		Definition: string(protoSource),
	}
	s, err := client.CommitSchema(ctx, schemaID, config)
	if err != nil {
		return fmt.Errorf("CommitSchema: %w", err)
	}
	fmt.Fprintf(w, "Committed a schema using a protobuf schema: %#v\n", s)
	return nil
}

C++

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

Avro

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaServiceClient client, std::string const& project_id,
   std::string const& schema_id, std::string const& schema_definition_file) {
  std::string const definition = ReadFile(schema_definition_file);

  google::pubsub::v1::CommitSchemaRequest request;
  std::string const name =
      google::cloud::pubsub::Schema(project_id, schema_id).FullName();
  request.set_name(name);
  request.mutable_schema()->set_name(name);
  request.mutable_schema()->set_type(google::pubsub::v1::Schema::AVRO);
  request.mutable_schema()->set_definition(definition);
  auto schema = client.CommitSchema(request);
  if (schema.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The schema revision already exists\n";
    return;
  }
  if (!schema) throw std::move(schema).status();

  std::cout << "Schema revision successfully committed: "
            << schema->DebugString() << "\n";
}

Proto

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaServiceClient client, std::string const& project_id,
   std::string const& schema_id, std::string const& schema_definition_file) {
  std::string const definition = ReadFile(schema_definition_file);

  google::pubsub::v1::CommitSchemaRequest request;
  std::string const name =
      google::cloud::pubsub::Schema(project_id, schema_id).FullName();
  request.set_name(name);
  request.mutable_schema()->set_name(name);
  request.mutable_schema()->set_type(
      google::pubsub::v1::Schema::PROTOCOL_BUFFER);
  request.mutable_schema()->set_definition(definition);
  auto schema = client.CommitSchema(request);
  if (schema.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The schema revision already exists\n";
    return;
  }
  if (!schema) throw std::move(schema).status();

  std::cout << "Schema revision successfully committed: "
            << schema->DebugString() << "\n";
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Java API.

Avro


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CommitAvroSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";
    String avscFile = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json";

    commitAvroSchemaExample(projectId, schemaId, avscFile);
  }

  public static Schema commitAvroSchemaExample(String projectId, String schemaId, String avscFile)
      throws IOException {

    ProjectName projectName = ProjectName.of(projectId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    // Read an Avro schema file formatted in JSON as a string.
    String avscSource = new String(Files.readAllBytes(Paths.get(avscFile)));

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema =
          schemaServiceClient.commitSchema(
              schemaName.toString(),
              Schema.newBuilder()
                  .setName(schemaName.toString())
                  .setType(Schema.Type.AVRO)
                  .setDefinition(avscSource)
                  .build());

      System.out.println("Committed a schema using an Avro schema:\n" + schema);
      return schema;
    } catch (NotFoundException e) {
      System.out.println(schemaName + "does not exist.");
      return null;
    }
  }
}

Proto


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CommitProtoSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";
    String protoFile = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers";

    commitProtoSchemaExample(projectId, schemaId, protoFile);
  }

  public static Schema commitProtoSchemaExample(String projectId, String schemaId, String protoFile)
      throws IOException {

    ProjectName projectName = ProjectName.of(projectId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    // Read a proto file as a string.
    String protoSource = new String(Files.readAllBytes(Paths.get(protoFile)));

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema =
          schemaServiceClient.commitSchema(
              schemaName.toString(),
              Schema.newBuilder()
                  .setName(schemaName.toString())
                  .setType(Schema.Type.PROTOCOL_BUFFER)
                  .setDefinition(protoSource)
                  .build());

      System.out.println("Committed a schema using a protobuf schema:\n" + schema);
      return schema;
    } catch (NotFoundException e) {
      System.out.println(schemaName + "does not exist.");
      return null;
    }
  }
}

Python

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Python in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Python API.

Avro

from google.api_core.exceptions import NotFound
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

# Read a JSON-formatted Avro schema file as a string.
with open(avsc_file, "rb") as f:
    avsc_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema = Schema(name=schema_path, type_=Schema.Type.AVRO, definition=avsc_source)

try:
    result = schema_client.commit_schema(
        request={"schema": schema, "name": schema_path}
    )
    print(f"Committed a schema revision using an Avro schema file:\n{result}")
    return result
except NotFound:
    print(f"{schema_id} does not exist.")

Proto

from google.api_core.exceptions import NotFound
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# proto_file = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers"

# Read a protobuf schema file as a string.
with open(proto_file, "rb") as f:
    proto_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema = Schema(
    name=schema_path, type_=Schema.Type.PROTOCOL_BUFFER, definition=proto_source
)

try:
    result = schema_client.commit_schema(
        request={"schema": schema, "name": schema_path}
    )
    print(f"Committed a schema revision using a protobuf schema file:\n{result}")
    return result
except NotFound:
    print(f"{schema_id} does not exist.")

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

Avro

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const avscFile = 'path/to/an/avro/schema/file/(.avsc)/formatted/in/json';

// Imports the Google Cloud client library
const {PubSub, SchemaTypes} = require('@google-cloud/pubsub');

const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function commitAvroSchema(schemaNameOrId, avscFile) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const name = await schema.getName();

  // Read the new schema definition from storage.
  const definition = fs.readFileSync(avscFile).toString();

  // Use the gapic client to commit the new definition.
  const schemaClient = await pubSubClient.getSchemaClient();
  const [result] = await schemaClient.commitSchema({
    name,
    schema: {
      name,
      type: SchemaTypes.Avro,
      definition,
    },
  });

  console.log(`Schema ${name} committed with revision ${result.revisionId}.`);
}

Proto

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const protoFile = 'path/to/a/proto/schema/file/(.proto)/formatted/in/protcol/buffers';

// Imports the Google Cloud client library
const {PubSub, SchemaTypes} = require('@google-cloud/pubsub');

const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function commitProtoSchema(schemaNameOrId, protoFile) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const name = await schema.getName();

  // Read the new schema definition from storage.
  const definition = fs.readFileSync(protoFile).toString();

  // Use the gapic client to commit the new definition.
  const schemaClient = await pubSubClient.getSchemaClient();
  const [result] = await schemaClient.commitSchema({
    name,
    schema: {
      name,
      type: SchemaTypes.ProtocolBuffer,
      definition,
    },
  });

  console.log(`Schema ${name} committed with revision ${result.revisionId}.`);
}

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

Avro

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const avscFile = 'path/to/an/avro/schema/file/(.avsc)/formatted/in/json';

// Imports the Google Cloud client library
import {PubSub, SchemaTypes} from '@google-cloud/pubsub';

import * as fs from 'fs';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function commitAvroSchema(schemaNameOrId: string, avscFile: string) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const name = await schema.getName();

  // Read the new schema definition from storage.
  const definition: string = fs.readFileSync(avscFile).toString();

  // Use the gapic client to commit the new definition.
  const schemaClient = await pubSubClient.getSchemaClient();
  const [result] = await schemaClient.commitSchema({
    name,
    schema: {
      name,
      type: SchemaTypes.Avro,
      definition,
    },
  });

  console.log(`Schema ${name} committed with revision ${result.revisionId}.`);
}

Proto

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const protoFile = 'path/to/a/proto/schema/file/(.proto)/formatted/in/protcol/buffers';

// Imports the Google Cloud client library
import {PubSub, SchemaTypes} from '@google-cloud/pubsub';

import * as fs from 'fs';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function commitProtoSchema(schemaNameOrId: string, protoFile: string) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const name = await schema.getName();

  // Read the new schema definition from storage.
  const definition: string = fs.readFileSync(protoFile).toString();

  // Use the gapic client to commit the new definition.
  const schemaClient = await pubSubClient.getSchemaClient();
  const [result] = await schemaClient.commitSchema({
    name,
    schema: {
      name,
      type: SchemaTypes.ProtocolBuffer,
      definition,
    },
  });

  console.log(`Schema ${name} committed with revision ${result.revisionId}.`);
}

Nachdem Sie einen Commit für eine Schemaversion durchgeführt haben, können Sie die Details der neuen Überarbeitung auf der Seite Schemas ansehen.

Nächste Schritte