Modificare il tipo di argomento

Puoi convertire un argomento di importazione in uno standard o viceversa, oppure un argomento standard in uno di importazione.

Convertire un argomento di importazione in uno standard

Per convertire un argomento di importazione in uno standard, cancella le impostazioni di importazione. Procedi nel seguente modo:

Console

  1. Nella console Google Cloud , vai alla pagina Topic.

    Vai ad Argomenti

  2. Fai clic sull'argomento di importazione.

  3. Nella pagina dei dettagli dell'argomento, fai clic su Modifica.

  4. Deseleziona l'opzione Attiva importazione.

  5. Fai clic su Aggiorna.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Esegui il comando gcloud pubsub topics update:

    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings

    Sostituisci TOPIC_ID con l'ID argomento.

Convertire un argomento standard in un argomento di importazione di Amazon Kinesis Data Streams

Per convertire un argomento standard in un argomento di importazione di Amazon Kinesis Data Streams, controlla innanzitutto di soddisfare tutti i prerequisiti.

Console

  1. Nella console Google Cloud , vai alla pagina Topic.

    Vai ad Argomenti

  2. Fai clic sull'argomento che vuoi convertire in un argomento di importazione.

  3. Nella pagina dei dettagli dell'argomento, fai clic su Modifica.

  4. Seleziona l'opzione Attiva l'importazione.

  5. Per l'origine di importazione, seleziona Amazon Kinesis Data Streams.

  6. Inserisci i seguenti dettagli:

    • ARN del flusso Kinesis: l'ARN del flusso di dati Kinesis che prevedi di importare in Pub/Sub. Il formato dell'ARN è il seguente: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • ARN del consumer Kinesis: l'ARN della risorsa consumer registrata al flusso di dati AWS Kinesis. Il formato dell'ARN è il seguente: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • ARN del ruolo AWS: l'ARN del ruolo AWS. Il formato ARN del ruolo è il seguente: arn:aws:iam:${Account}:role/${RoleName}.

    • Service account: il account di servizio che hai creato in Creare un account di servizio in Google Cloud.

  7. Fai clic su Aggiorna.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Esegui il comando gcloud pubsub topics update con tutti i flag menzionati nel seguente esempio:

    gcloud pubsub topics update TOPIC_ID \
         --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN\
         --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN\
         --kinesis-ingestion-role-arn KINESIS_ROLE_ARN\
         --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
      

    Sostituisci quanto segue:

    • TOPIC_ID è l'ID o il nome dell'argomento. Questo campo non può essere aggiornato.

    • KINESIS_STREAM_ARN è l'ARN dei flussi di dati Kinesis che prevedi di importare in Pub/Sub. Il formato dell'ARN è il seguente: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • KINESIS_CONSUMER_ARN è l'ARN della risorsa consumer registrata in AWS Kinesis Data Streams. Il formato dell'ARN è il seguente: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • KINESIS_ROLE_ARN è l'ARN del ruolo AWS. Il formato ARN del ruolo è il seguente: arn:aws:iam:${Account}:role/${RoleName}.

    • PUBSUB_SERVICE_ACCOUNT è lo account di servizio che hai creato in Creare un account di servizio in Google Cloud.

Go

Prima di provare questo esempio, segui le istruzioni di configurazione di Go riportate nella guida rapida di Pub/Sub che utilizza le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Go.

Per autenticarti a Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func updateTopicType(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	updateCfg := pubsub.TopicConfigToUpdate{
		// If wanting to clear ingestion settings, set this to zero value: &pubsub.IngestionDataSourceSettings{}
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceAWSKinesis{
				StreamARN:         streamARN,
				ConsumerARN:       consumerARN,
				AWSRoleARN:        awsRoleARN,
				GCPServiceAccount: gcpServiceAccount,
			},
		},
	}
	topicCfg, err := client.Topic(topicID).Update(ctx, updateCfg)
	if err != nil {
		return fmt.Errorf("topic.Update: %w", err)
	}
	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", topicCfg)
	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java riportate nella guida rapida di Pub/Sub che utilizza le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Java.

Per autenticarti a Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicTypeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    UpdateTopicTypeExample.updateTopicTypeExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void updateTopicTypeExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      // Construct the topic with Kinesis ingestion settings.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setIngestionDataSourceSettings(ingestionDataSourceSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println(
          "Updated topic with Kinesis ingestion settings: " + response.getAllFields());
    }
  }
}

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js riportate nella guida rapida di Pub/Sub che utilizza le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Node.js.

Per autenticarti a Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn
) {
  const metadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python riportate nella guida rapida di Pub/Sub che utilizza le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Python.

Per autenticarti a Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

update_request = UpdateTopicRequest(
    topic=Topic(
        name=topic_path,
        ingestion_data_source_settings=IngestionDataSourceSettings(
            aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                stream_arn=stream_arn,
                consumer_arn=consumer_arn,
                aws_role_arn=aws_role_arn,
                gcp_service_account=gcp_service_account,
            )
        ),
    ),
    update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
)

topic = publisher.update_topic(request=update_request)
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

Prima di provare questo esempio, segui le istruzioni di configurazione di C++ riportate nella guida rapida di Pub/Sub che utilizza le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub C++.

Per autenticarti a Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::UpdateTopicRequest request;

  request.mutable_topic()->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis = request.mutable_topic()
                          ->mutable_ingestion_data_source_settings()
                          ->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);
  *request.mutable_update_mask()->add_paths() =
      "ingestion_data_source_settings";

  auto topic = client.UpdateTopic(request);
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js riportate nella guida rapida di Pub/Sub che utilizza le librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Node.js.

Per autenticarti a Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string
) {
  const metadata: TopicMetadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Per ulteriori informazioni sugli ARN, consulta Amazon Resource Names (ARN) e Identificatori IAM.

Convertire un argomento standard in un argomento di importazione di Cloud Storage

Per convertire un argomento standard in un argomento di importazione di Cloud Storage, innanzitutto verifica di soddisfare tutti i prerequisiti.

Console

  1. Nella console Google Cloud , vai alla pagina Topic.

    Vai ad Argomenti

  2. Fai clic sull'argomento che vuoi convertire in un argomento di importazione di Cloud Storage.

  3. Nella pagina dei dettagli dell'argomento, fai clic su Modifica.

  4. Seleziona l'opzione Attiva l'importazione.

  5. Per l'origine di importazione, seleziona Google Cloud Storage.

  6. Per il bucket Cloud Storage, fai clic su Sfoglia.

    Viene visualizzata la pagina Seleziona bucket. Seleziona una delle seguenti opzioni:

    • Seleziona un bucket esistente da qualsiasi progetto appropriato.

    • Fai clic sull'icona Crea e segui le istruzioni sullo schermo per creare un nuovo bucket. Dopo aver creato il bucket, selezionalo per l'argomento sull'importazione da Cloud Storage.

  7. Quando specifichi il bucket, Pub/Sub controlla se sono presenti le autorizzazioni appropriate per l'account di servizio Pub/Sub. In caso di problemi di autorizzazione, viene visualizzato un messaggio di errore correlato alle autorizzazioni.

    Se si verificano problemi di autorizzazione, fai clic su Imposta autorizzazioni. Per ulteriori informazioni, consulta Concedi le autorizzazioni Cloud Storage all'account di servizio Pub/Sub.

  8. In Formato oggetto, seleziona Testo, Avro o Pub/Sub Avro.

    Se selezioni Testo, puoi facoltativamente specificare un Delimitatore con cui suddividere gli oggetti in messaggi.

    Per ulteriori informazioni su queste opzioni, vedi Formato di input.

  9. Facoltativo. Puoi specificare un tempo minimo di creazione dell'oggetto per il tuo argomento. Se impostato, vengono importati solo gli oggetti creati dopo il tempo minimo di creazione dell'oggetto.

    Per ulteriori informazioni, consulta Tempo minimo di creazione dell'oggetto.

  10. Devi specificare un pattern glob. Per importare tutti gli oggetti nel bucket, utilizza ** come pattern glob. Verranno importati solo gli oggetti che corrispondono al pattern specificato.

    Per ulteriori informazioni, consulta Abbinare un pattern glob.

  11. Mantieni le altre impostazioni predefinite.
  12. Fai clic su Aggiorna argomento.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Per evitare di perdere le impostazioni per l'argomento di importazione, assicurati di includerle tutte ogni volta che aggiorni l'argomento. Se ometti qualcosa, Pub/Sub reimposta l'impostazione sul valore predefinito originale.

    Esegui il comando gcloud pubsub topics update con tutti i flag menzionati nel seguente esempio:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Sostituisci quanto segue:

    • TOPIC_ID è l'ID o il nome dell'argomento. Questo campo non può essere aggiornato.

    • BUCKET_NAME: specifica il nome di un bucket esistente. Ad esempio, prod_bucket. Il nome del bucket non deve includere l'ID progetto. Per creare un bucket, consulta la sezione Creare bucket.

    • INPUT_FORMAT: specifica il formato degli oggetti importati. Può essere text, avro o pubsub_avro. Per ulteriori informazioni su queste opzioni, consulta Formato di input.

    • TEXT_DELIMITER: specifica il delimitatore con cui suddividere gli oggetti di testo in messaggi Pub/Sub. Deve essere costituito da un solo carattere e deve essere impostato solo quando INPUT_FORMAT è text. Il valore predefinito è il carattere di nuova riga (\n).

      Quando utilizzi gcloud CLI per specificare il delimitatore, presta molta attenzione alla gestione dei caratteri speciali come il nuovo riga \n. Utilizza il formato '\n' per assicurarti che il delimitatore venga interpretato correttamente. Se utilizzi semplicemente \n senza virgolette o sfugge, il delimitatore sarà "n".

    • MINIMUM_OBJECT_CREATE_TIME: specifica il tempo minimo in cui è stato creato un oggetto affinché possa essere importato. Deve essere in UTC nel formato YYYY-MM-DDThh:mm:ssZ. Ad esempio: 2024-10-14T08:30:30Z.

      È valida qualsiasi data, passata o futura, compresa tra 0001-01-01T00:00:00Z e 9999-12-31T23:59:59Z.

    • MATCH_GLOB: specifica il pattern glob da associare per consentire l'importazione di un oggetto. Quando utilizzi gcloud CLI, un pattern di corrispondenza con caratteri * deve avere il carattere * formattato come carattere di escape nel formato \*\*.txt o l'intero pattern di corrispondenza deve essere tra virgolette "**.txt" o '**.txt'. Per informazioni sulla sintassi supportata per i pattern glob, consulta la documentazione di Cloud Storage.