Cambia tipo di argomento

Puoi convertire un argomento di importazione in uno standard o, al contrario, un argomento standard in uno di importazione.

Convertire un argomento di importazione in un argomento standard

Per convertire un argomento di importazione in un argomento standard, cancella le impostazioni di importazione. Svolgi i seguenti passaggi:

Console

  1. Nella console Google Cloud, vai alla pagina Argomenti.

    Vai ad Argomenti

  2. Fai clic sull'argomento di importazione.

  3. Nella pagina dei dettagli dell'argomento, fai clic su Modifica.

  4. Deseleziona l'opzione Abilita importazione.

  5. Fai clic su Update (Aggiorna).

gcloud

  1. Nella console Google Cloud, attiva Cloud Shell.

    Attiva Cloud Shell

    Nella parte inferiore della console Google Cloud viene avviata una sessione di Cloud Shell che mostra un prompt della riga di comando. Cloud Shell è un ambiente shell con Google Cloud CLI già installato e con valori già impostati per il progetto attuale. L'inizializzazione della sessione può richiedere alcuni secondi.

  2. Esegui il comando gcloud pubsub topics update:

    
    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings
    

    Sostituisci TOPIC_ID con l'ID dell'argomento.

Converti un argomento standard in un argomento di importazione

Per convertire un argomento standard in un argomento di importazione, verifica innanzitutto di soddisfare tutti i prerequisiti.

Console

  1. Nella console Google Cloud, vai alla pagina Argomenti.

    Vai ad Argomenti

  2. Fai clic sull'argomento da convertire in un argomento di importazione.

  3. Nella pagina dei dettagli dell'argomento, fai clic su Modifica.

  4. Seleziona l'opzione Abilita l'importazione.

  5. Per l'origine di importazione, seleziona Amazon Kinesis Data Streams.

  6. Inserisci i seguenti dettagli:

    • ARN del flusso Kinesis: l'ARN per lo stream di dati Kinesis che prevedi di importare in Pub/Sub. Il formato dell'ARN è il seguente: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • ARN del consumer Kinesis: l'ARN della risorsa consumer registrato nel flusso di dati AWS Kinesis. Il formato dell'ARN è il seguente: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • ARN del ruolo AWS: l'ARN del ruolo AWS. Il formato ARN del ruolo è il seguente: arn:aws:iam:${Account}:role/${RoleName}.

    • Account di servizio: l'account di servizio creato nella sezione Creare un account di servizio in Google Cloud.

  7. Fai clic su Update (Aggiorna).

gcloud

  1. Nella console Google Cloud, attiva Cloud Shell.

    Attiva Cloud Shell

    Nella parte inferiore della console Google Cloud viene avviata una sessione di Cloud Shell che mostra un prompt della riga di comando. Cloud Shell è un ambiente shell con Google Cloud CLI già installato e con valori già impostati per il progetto attuale. L'inizializzazione della sessione può richiedere alcuni secondi.

  2. Esegui il comando gcloud pubsub topics update con tutti i flag menzionati nell'esempio seguente:

      gcloud pubsub topics update TOPIC_ID 
    --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN
    --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN
    --kinesis-ingestion-role-arn KINESIS_ROLE_ARN
    --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    Sostituisci quanto segue:

    • TOPIC_ID è l'ID argomento. Questo campo non può essere aggiornato.

    • KINESIS_STREAM_ARN è l'ARN per i flussi di dati Kinesis che prevedi di importare in Pub/Sub. Il formato dell'ARN è il seguente: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • KINESIS_CONSUMER_ARN è l'ARN della risorsa consumer registrata nei flussi di dati di AWS Kinesis. Il formato dell'ARN è il seguente: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • KINESIS_ROLE_ARN è l'ARN del ruolo AWS. Il formato ARN del ruolo è il seguente: arn:aws:iam:${Account}:role/${RoleName}.

    • PUBSUB_SERVICE_ACCOUNT è l'account di servizio che hai creato nella sezione Creare un account di servizio in Google Cloud.

Go

Prima di provare questo esempio, segui le istruzioni di configurazione di Go nella guida rapida di Pub/Sub sull'utilizzo delle librerie client. Per maggiori informazioni, consulta la documentazione di riferimento dell'API Go di Pub/Sub.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func updateTopicType(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	updateCfg := pubsub.TopicConfigToUpdate{
		// If wanting to clear ingestion settings, set this to zero value: &pubsub.IngestionDataSourceSettings{}
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceAWSKinesis{
				StreamARN:         streamARN,
				ConsumerARN:       consumerARN,
				AWSRoleARN:        awsRoleARN,
				GCPServiceAccount: gcpServiceAccount,
			},
		},
	}
	topicCfg, err := client.Topic(topicID).Update(ctx, updateCfg)
	if err != nil {
		return fmt.Errorf("topic.Update: %w", err)
	}
	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", topicCfg)
	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java nella guida rapida di Pub/Sub sull'utilizzo delle librerie client. Per maggiori informazioni, consulta la documentazione di riferimento dell'API Java di Pub/Sub.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicTypeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    UpdateTopicTypeExample.updateTopicTypeExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void updateTopicTypeExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      // Construct the topic with Kinesis ingestion settings.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setIngestionDataSourceSettings(ingestionDataSourceSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println(
          "Updated topic with Kinesis ingestion settings: " + response.getAllFields());
    }
  }
}

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js nella guida rapida di Pub/Sub sull'utilizzo delle librerie client. Per maggiori informazioni, consulta la documentazione di riferimento dell'API Node.js di Pub/Sub.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn
) {
  const metadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python nella guida rapida di Pub/Sub sull'utilizzo delle librerie client. Per maggiori informazioni, consulta la documentazione di riferimento dell'API Python di Pub/Sub.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

update_request = UpdateTopicRequest(
    topic=Topic(
        name=topic_path,
        ingestion_data_source_settings=IngestionDataSourceSettings(
            aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                stream_arn=stream_arn,
                consumer_arn=consumer_arn,
                aws_role_arn=aws_role_arn,
                gcp_service_account=gcp_service_account,
            )
        ),
    ),
    update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
)

topic = publisher.update_topic(request=update_request)
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

Prima di provare questo esempio, segui le istruzioni di configurazione di C++ nella guida rapida di Pub/Sub sull'utilizzo delle librerie client. Per maggiori informazioni, consulta la documentazione di riferimento dell'API C++ di Pub/Sub.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::UpdateTopicRequest request;

  request.mutable_topic()->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis = request.mutable_topic()
                          ->mutable_ingestion_data_source_settings()
                          ->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);
  *request.mutable_update_mask()->add_paths() =
      "ingestion_data_source_settings";

  auto topic = client.UpdateTopic(request);
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js nella guida rapida di Pub/Sub sull'utilizzo delle librerie client. Per maggiori informazioni, consulta la documentazione di riferimento dell'API Node.js Pub/Sub.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string
) {
  const metadata: TopicMetadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Per ulteriori informazioni sugli ARN, consulta la pagina relativa ai nomi delle risorse Amazon (ARN) e agli identificatori IAM.