Modifier le type de sujet

Vous pouvez convertir un thème d'importation en thème standard ou inversement, un thème standard en thème d'importation.

Convertir un sujet d'importation en sujet standard

Pour convertir un sujet d'importation en sujet standard, effacez les paramètres d'ingestion. Procédez comme suit :

Console

  1. Dans la console Google Cloud, accédez à la page Topics (Sujets).

    Accéder aux sujets

  2. Cliquez sur le sujet d'importation.

  3. Sur la page des détails de l'article, cliquez sur Modifier.

  4. Décochez l'option Activer l'ingestion.

  5. Cliquez sur Mettre à jour.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Exécutez la commande gcloud pubsub topics update :

    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings

    Remplacez TOPIC_ID par l'ID du sujet.

Convertir un sujet standard en sujet d'importation Amazon Kinesis Data Streams

Pour convertir un sujet standard en sujet d'importation Amazon Kinesis Data Streams, vérifiez d'abord que vous remplissez toutes les conditions préalables.

Console

  1. Dans la console Google Cloud, accédez à la page Topics (Sujets).

    Accéder aux sujets

  2. Cliquez sur le sujet que vous souhaitez convertir en sujet d'importation.

  3. Sur la page des détails de l'article, cliquez sur Modifier.

  4. Sélectionnez l'option Activer l'ingestion.

  5. Pour la source d'ingestion, sélectionnez Amazon Kinesis Data Streams.

  6. Saisissez les informations suivantes :

    • ARN du flux Kinesis: ARN du flux de données Kinesis que vous prévoyez d'ingérer dans Pub/Sub. Le format de l'ARN est le suivant : arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • ARN du client Kinesis: ARN de la ressource client enregistrée auprès du flux de données AWS Kinesis. Le format de l'ARN est le suivant : arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • ARN du rôle AWS: ARN du rôle AWS. Le format de l'ARN du rôle est le suivant : arn:aws:iam:${Account}:role/${RoleName}.

    • Compte de service: compte de service que vous avez créé dans Créer un compte de service dans Google Cloud.

  7. Cliquez sur Mettre à jour.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Exécutez la commande gcloud pubsub topics update avec tous les indicateurs mentionnés dans l'exemple suivant:

    gcloud pubsub topics update TOPIC_ID \
         --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN\
         --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN\
         --kinesis-ingestion-role-arn KINESIS_ROLE_ARN\
         --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
      

    Remplacez les éléments suivants :

    • TOPIC_ID correspond à l'ID ou au nom du sujet. Ce champ ne peut pas être modifié.

    • KINESIS_STREAM_ARN est l'ARN des flux de données Kinesis que vous prévoyez d'ingérer dans Pub/Sub. Le format de l'ARN est le suivant : arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • KINESIS_CONSUMER_ARN est l'ARN de la ressource client enregistrée dans AWS Kinesis Data Streams. Le format de l'ARN est le suivant : arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • KINESIS_ROLE_ARN est l'ARN du rôle AWS. Le format de l'ARN du rôle est le suivant : arn:aws:iam:${Account}:role/${RoleName}.

    • PUBSUB_SERVICE_ACCOUNT est le compte de service que vous avez créé dans Créer un compte de service dans Google Cloud.

Go

Avant d'essayer cet exemple, suivez les instructions de configuration pour Go du guide de démarrage rapide de Pub/Sub : utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub Go.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func updateTopicType(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	updateCfg := pubsub.TopicConfigToUpdate{
		// If wanting to clear ingestion settings, set this to zero value: &pubsub.IngestionDataSourceSettings{}
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceAWSKinesis{
				StreamARN:         streamARN,
				ConsumerARN:       consumerARN,
				AWSRoleARN:        awsRoleARN,
				GCPServiceAccount: gcpServiceAccount,
			},
		},
	}
	topicCfg, err := client.Topic(topicID).Update(ctx, updateCfg)
	if err != nil {
		return fmt.Errorf("topic.Update: %w", err)
	}
	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", topicCfg)
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions de configuration pour Java du guide de démarrage rapide de Pub/Sub : utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub Java.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicTypeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    UpdateTopicTypeExample.updateTopicTypeExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void updateTopicTypeExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      // Construct the topic with Kinesis ingestion settings.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setIngestionDataSourceSettings(ingestionDataSourceSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println(
          "Updated topic with Kinesis ingestion settings: " + response.getAllFields());
    }
  }
}

Node.js

Avant d'essayer cet exemple, suivez les instructions de configuration pour Node.js du guide de démarrage rapide de Pub/Sub : utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub Node.js.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn
) {
  const metadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Python

Avant d'essayer cet exemple, suivez les instructions de configuration pour Python du guide de démarrage rapide de Pub/Sub : utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub Python.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

update_request = UpdateTopicRequest(
    topic=Topic(
        name=topic_path,
        ingestion_data_source_settings=IngestionDataSourceSettings(
            aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                stream_arn=stream_arn,
                consumer_arn=consumer_arn,
                aws_role_arn=aws_role_arn,
                gcp_service_account=gcp_service_account,
            )
        ),
    ),
    update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
)

topic = publisher.update_topic(request=update_request)
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

Avant d'essayer cet exemple, suivez les instructions de configuration pour C++ du guide de démarrage rapide de Pub/Sub : utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub C++.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::UpdateTopicRequest request;

  request.mutable_topic()->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis = request.mutable_topic()
                          ->mutable_ingestion_data_source_settings()
                          ->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);
  *request.mutable_update_mask()->add_paths() =
      "ingestion_data_source_settings";

  auto topic = client.UpdateTopic(request);
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Avant d'essayer cet exemple, suivez les instructions de configuration de Node.js décrites dans le guide de démarrage rapide de Pub/Sub : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string
) {
  const metadata: TopicMetadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Pour en savoir plus sur les ARN, consultez les pages Noms de ressource Amazon (ARN) et Identifiants IAM.

Convertir un sujet standard en sujet d'importation Cloud Storage

Pour convertir un sujet standard en sujet d'importation Cloud Storage, vérifiez d'abord que vous remplissez tous les prérequis.

Console

  1. Dans la console Google Cloud, accédez à la page Topics (Sujets).

    Accéder aux sujets

  2. Cliquez sur le sujet que vous souhaitez convertir en sujet d'importation Cloud Storage.

  3. Sur la page des détails de l'article, cliquez sur Modifier.

  4. Sélectionnez l'option Activer l'ingestion.

  5. Pour la source d'ingestion, sélectionnez Google Cloud Storage.

  6. Pour le bucket Cloud Storage, cliquez sur Parcourir.

    La page Sélectionner un bucket s'ouvre. Sélectionnez l'une des options suivantes :

    • Sélectionnez un bucket existant dans un projet approprié.

    • Cliquez sur l'icône Créer, puis suivez les instructions à l'écran pour créer un bucket. Après avoir créé le bucket, sélectionnez-le pour le sujet d'importation Cloud Storage.

  7. Lorsque vous spécifiez le bucket, Pub/Sub recherche les autorisations appropriées sur le bucket pour le compte de service Pub/Sub. En cas de problème d'autorisation, un message d'erreur s'affiche.

    Si vous rencontrez des problèmes d'autorisation, cliquez sur Définir les autorisations. Pour en savoir plus, consultez la section Accorder des autorisations Cloud Storage au compte de service Pub/Sub.

  8. Pour Format d'objet, sélectionnez Texte, Avro ou Pub/Sub Avro.

    Si vous sélectionnez Texte, vous pouvez éventuellement spécifier un délimiteur pour diviser les objets en messages.

    Pour en savoir plus sur ces options, consultez la section Format d'entrée.

  9. Facultatif. Vous pouvez spécifier une durée minimale pour créer l'objet pour votre sujet. Si elle est définie, seuls les objets créés après la durée minimale de création d'objets sont ingérés.

    Pour en savoir plus, consultez la section Durée minimale pour créer l'objet.

  10. Vous devez spécifier un modèle glob. Pour ingérer tous les objets du bucket, utilisez ** comme modèle glob. Seuls les objets correspondant au modèle donné sont ingérés.

    Pour en savoir plus, consultez la section Correspondre à un format générique.

  11. Conservez les autres paramètres par défaut.
  12. Cliquez sur Mettre à jour le sujet.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Pour éviter de perdre vos paramètres pour le sujet d'importation, veillez à les inclure tous chaque fois que vous mettez à jour le sujet. Si vous omettez quelque chose, Pub/Sub rétablit le paramètre sur sa valeur par défaut d'origine.

    Exécutez la commande gcloud pubsub topics update avec toutes les options mentionnées dans l'exemple suivant:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Remplacez les éléments suivants :

    • TOPIC_ID correspond à l'ID ou au nom du sujet. Ce champ ne peut pas être modifié.

    • BUCKET_NAME: spécifie le nom d'un bucket existant. Exemple :prod_bucket Le nom du bucket ne doit pas inclure l'ID du projet. Pour créer un bucket, consultez la page Créer des buckets.

    • INPUT_FORMAT: spécifie le format des objets ingérés. Il peut être défini sur text, avro ou pubsub_avro. Pour en savoir plus sur ces options, consultez la section Format d'entrée.

    • TEXT_DELIMITER: spécifie le séparateur à utiliser pour diviser les objets texte en messages Pub/Sub. Il doit s'agir d'un seul caractère et ne doit être défini que lorsque INPUT_FORMAT est text. La valeur par défaut est le caractère de nouvelle ligne (\n).

      Lorsque vous utilisez gcloud CLI pour spécifier le délimiteur, soyez particulièrement attentif à la gestion des caractères spéciaux tels que la nouvelle ligne \n. Utilisez le format '\n' pour vous assurer que le séparateur est correctement interprété. L'utilisation simple de \n sans guillemets ni échappement entraîne un séparateur "n".

    • MINIMUM_OBJECT_CREATE_TIME: spécifie le délai minimal au cours duquel un objet a été créé pour qu'il soit ingéré. au format UTC YYYY-MM-DDThh:mm:ssZ. Par exemple, 2024-10-14T08:30:30Z.

      Toute date, passée ou future, de 0001-01-01T00:00:00Z à 9999-12-31T23:59:59Z inclus, est valide.

    • MATCH_GLOB: spécifie le modèle glob à faire correspondre pour qu'un objet soit ingéré. Lorsque vous utilisez gcloud CLI, le caractère * d'un caractère générique de correspondance avec des caractères * doit être formaté en tant qu'expression s'échappant sous la forme \*\*.txt, ou l'ensemble du caractère générique de correspondance doit être placé entre guillemets "**.txt" ou '**.txt'. Pour en savoir plus sur les syntaxes acceptées pour les modèles glob, consultez la documentation Cloud Storage.