Thementyp ändern

Sie können ein Importthema in ein Standardthema umwandeln oder umgekehrt.

Importiertes Thema in ein Standardthema konvertieren

Wenn Sie ein Importthema in ein Standardthema umwandeln möchten, löschen Sie die Datenaufnahmeeinstellungen. Führen Sie diese Schritte aus:

Console

  1. Rufen Sie in der Google Cloud -Console die Seite Themen auf.

    Themen aufrufen

  2. Klicken Sie auf das importierte Thema.

  3. Klicken Sie auf der Seite mit den Themendetails auf Bearbeiten.

  4. Entfernen Sie das Häkchen bei Aufnahme aktivieren.

  5. Klicken Sie auf Aktualisieren.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Führen Sie den Befehl gcloud pubsub topics update aus:

    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings

    Ersetzen Sie TOPIC_ID durch die Themen-ID.

Standardthemen in Amazon Kinesis Data Streams-Importthemen konvertieren

Wenn Sie ein Standard-Topic in ein Amazon Kinesis Data Streams-Import-Topic konvertieren möchten, prüfen Sie zuerst, ob Sie alle Voraussetzungen erfüllen.

Console

  1. Rufen Sie in der Google Cloud -Console die Seite Themen auf.

    Themen aufrufen

  2. Klicken Sie auf das Thema, das Sie in ein Importthema umwandeln möchten.

  3. Klicken Sie auf der Seite mit den Themendetails auf Bearbeiten.

  4. Wählen Sie die Option Aufnahme aktivieren aus.

  5. Wählen Sie als Datenaufnahmequelle Amazon Kinesis Data Streams aus.

  6. Geben Sie die folgenden Informationen ein:

    • ARN des Kinesis-Streams: Der ARN des Kinesis-Datenstreams, den Sie in Pub/Sub aufnehmen möchten. Das ARN-Format hat folgendes Format: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • ARN des Kinesis-Nutzers: Der ARN der Nutzerressource, die für den AWS Kinesis-Datenstream registriert ist. Das ARN-Format hat folgendes Format: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • ARN der AWS-Rolle: Der ARN der AWS-Rolle. Die ARN der Rolle hat folgendes Format: arn:aws:iam:${Account}:role/${RoleName}.

    • Dienstkonto: Das Dienstkonto, das Sie unter Dienstkonto in Google Cloud erstellt haben.

  7. Klicken Sie auf Aktualisieren.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Führen Sie den Befehl gcloud pubsub topics update mit allen im folgenden Beispiel genannten Flags aus:

    gcloud pubsub topics update TOPIC_ID \
         --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN\
         --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN\
         --kinesis-ingestion-role-arn KINESIS_ROLE_ARN\
         --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
      

    Ersetzen Sie Folgendes:

    • TOPIC_ID ist die ID oder der Name des Themas. Dieses Feld kann nicht aktualisiert werden.

    • KINESIS_STREAM_ARN ist die ARN für die Kinesis Data Streams, die Sie in Pub/Sub aufnehmen möchten. Die ARN hat folgendes Format: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • KINESIS_CONSUMER_ARN ist der ARN der Verbraucherressource, die bei AWS Kinesis Data Streams registriert ist. Das ARN-Format hat folgendes Format: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • KINESIS_ROLE_ARN ist die ARN der AWS-Rolle. Die ARN der Rolle hat folgendes Format: arn:aws:iam:${Account}:role/${RoleName}.

    • PUBSUB_SERVICE_ACCOUNT ist das Dienstkonto, das Sie unter Dienstkonto in Google Cloud erstellt haben.

Go

Folgen Sie der Einrichtungsanleitung für Go in der Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Go API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func updateTopicType(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	updateCfg := pubsub.TopicConfigToUpdate{
		// If wanting to clear ingestion settings, set this to zero value: &pubsub.IngestionDataSourceSettings{}
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceAWSKinesis{
				StreamARN:         streamARN,
				ConsumerARN:       consumerARN,
				AWSRoleARN:        awsRoleARN,
				GCPServiceAccount: gcpServiceAccount,
			},
		},
	}
	topicCfg, err := client.Topic(topicID).Update(ctx, updateCfg)
	if err != nil {
		return fmt.Errorf("topic.Update: %w", err)
	}
	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", topicCfg)
	return nil
}

Java

Folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Java API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicTypeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    UpdateTopicTypeExample.updateTopicTypeExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void updateTopicTypeExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      // Construct the topic with Kinesis ingestion settings.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setIngestionDataSourceSettings(ingestionDataSourceSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println(
          "Updated topic with Kinesis ingestion settings: " + response.getAllFields());
    }
  }
}

Node.js

Folgen Sie der Einrichtungsanleitung für Node.js in der Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn
) {
  const metadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Python

Folgen Sie der Einrichtungsanleitung für Python in der Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Python API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

update_request = UpdateTopicRequest(
    topic=Topic(
        name=topic_path,
        ingestion_data_source_settings=IngestionDataSourceSettings(
            aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                stream_arn=stream_arn,
                consumer_arn=consumer_arn,
                aws_role_arn=aws_role_arn,
                gcp_service_account=gcp_service_account,
            )
        ),
    ),
    update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
)

topic = publisher.update_topic(request=update_request)
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

Folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::UpdateTopicRequest request;

  request.mutable_topic()->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis = request.mutable_topic()
                          ->mutable_ingestion_data_source_settings()
                          ->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);
  *request.mutable_update_mask()->add_paths() =
      "ingestion_data_source_settings";

  auto topic = client.UpdateTopic(request);
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Lesen Sie unter Pub/Sub-Schnellstart-Anleitung: Clientbibliotheken verwenden die Anleitung für die Einrichtung von Node.js, bevor Sie dieses Beispiel ausprobieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string
) {
  const metadata: TopicMetadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Weitere Informationen zu ARNs finden Sie unter Amazon-Ressourcennamen (ARNs) und IAM-IDs.

Standardthemen in Cloud Storage-Importthemen konvertieren

Wenn Sie ein Standardthema in ein Cloud Storage-Importthema konvertieren möchten, prüfen Sie zuerst, ob Sie alle Voraussetzungen erfüllen.

Console

  1. Rufen Sie in der Google Cloud -Console die Seite Themen auf.

    Themen aufrufen

  2. Klicken Sie auf das Thema, das Sie in ein Cloud Storage-Importthema umwandeln möchten.

  3. Klicken Sie auf der Seite mit den Themendetails auf Bearbeiten.

  4. Wählen Sie die Option Aufnahme aktivieren aus.

  5. Wählen Sie als Datenaufnahmequelle Google Cloud Storage aus.

  6. Klicken Sie für den Cloud Storage-Bucket auf Durchsuchen.

    Die Seite Bucket auswählen wird geöffnet. Wählen Sie eine der folgenden Optionen aus:

    • Wählen Sie einen vorhandenen Bucket aus einem geeigneten Projekt aus.

    • Klicken Sie auf das Symbol „Erstellen“ und folgen Sie der Anleitung auf dem Bildschirm, um einen neuen Bucket zu erstellen. Wählen Sie nach dem Erstellen des Buckets den Bucket für das Cloud Storage-Importthema aus.

  7. Wenn Sie den Bucket angeben, prüft Pub/Sub, ob das Pub/Sub-Dienstkonto die entsprechenden Berechtigungen für den Bucket hat. Bei Berechtigungsproblemen wird eine entsprechende Fehlermeldung angezeigt.

    Wenn Berechtigungsprobleme auftreten, klicken Sie auf Berechtigungen festlegen. Weitere Informationen finden Sie unter Cloud Storage-Berechtigungen für das Pub/Sub-Dienstkonto gewähren.

  8. Wählen Sie unter Objektformat die Option Text, Avro oder Pub/Sub Avro aus.

    Wenn Sie Text auswählen, können Sie optional ein Trennzeichen angeben, mit dem Objekte in Nachrichten aufgeteilt werden.

    Weitere Informationen zu diesen Optionen finden Sie unter Eingabeformat.

  9. Optional. Sie können für Ihr Thema eine Mindesterstellungszeit der Objekte angeben. Wenn diese Option festgelegt ist, werden nur Objekte aufgenommen, die nach der Mindesterstellungszeit der Objekte erstellt wurden.

    Weitere Informationen finden Sie unter Mindesterstellungszeit der Objekte.

  10. Sie müssen ein Globusmuster angeben. Wenn Sie alle Objekte in den Bucket aufnehmen möchten, verwenden Sie ** als Glob-Muster. Es werden nur Objekte aufgenommen, die dem angegebenen Muster entsprechen.

    Weitere Informationen finden Sie unter Glob-Muster abgleichen.

  11. Behalten Sie die anderen Standardeinstellungen bei.
  12. Klicken Sie auf Thema aktualisieren.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Damit die Einstellungen für das importierte Thema nicht verloren gehen, sollten Sie sie jedes Mal angeben, wenn Sie das Thema aktualisieren. Wenn du etwas auslässt, wird die Einstellung von Pub/Sub auf den ursprünglichen Standardwert zurückgesetzt.

    Führen Sie den Befehl gcloud pubsub topics update mit allen im folgenden Beispiel genannten Flags aus:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Ersetzen Sie Folgendes:

    • TOPIC_ID ist die ID oder der Name des Themas. Dieses Feld kann nicht aktualisiert werden.

    • BUCKET_NAME: Gibt den Namen eines vorhandenen Buckets an. Beispiel: prod_bucket. Der Bucket-Name darf keine Projekt-ID enthalten. Informationen zum Erstellen eines Buckets finden Sie unter Buckets erstellen.

    • INPUT_FORMAT: Gibt das Format der aufgenommenen Objekte an. Das kann text, avro oder pubsub_avro sein. Weitere Informationen zu diesen Optionen finden Sie unter Eingabeformat.

    • TEXT_DELIMITER: Gibt das Trennzeichen an, mit dem Textobjekte in Pub/Sub-Nachrichten aufgeteilt werden. Dies muss ein einzelnes Zeichen sein und darf nur festgelegt werden, wenn INPUT_FORMAT text ist. Standardmäßig ist das Zeilenumbruchzeichen (\n) festgelegt.

      Achten Sie bei der Verwendung der gcloud CLI zum Angeben des Trennzeichens genau auf die Behandlung von Sonderzeichen wie dem Zeilenumbruch \n. Verwenden Sie das Format '\n', damit das Trennzeichen richtig interpretiert wird. Wenn Sie \n ohne Anführungszeichen oder Escape-Zeichen verwenden, wird das Trennzeichen "n" verwendet.

    • MINIMUM_OBJECT_CREATE_TIME: Gibt die Mindestzeit an, zu der ein Objekt erstellt wurde, damit es aufgenommen werden kann. Sie muss im UTC-Format YYYY-MM-DDThh:mm:ssZ vorliegen. Beispiel: 2024-10-14T08:30:30Z

      Gültig sind alle Datumsangaben zwischen 0001-01-01T00:00:00Z und 9999-12-31T23:59:59Z, sowohl in der Vergangenheit als auch in der Zukunft.

    • MATCH_GLOB: Gibt das Glob-Muster an, das mit einem Objekt übereinstimmen muss, damit es aufgenommen wird. Wenn Sie die gcloud CLI verwenden, muss das Zeichen * in einem Matchglob mit *-Zeichen in Form von \*\*.txt formatiert sein oder der gesamte Matchglob muss in Anführungszeichen "**.txt" oder '**.txt' stehen. Informationen zur unterstützten Syntax für Glob-Muster finden Sie in der Cloud Storage-Dokumentation.