Créer un sujet d'importation

Un sujet d'importation vous permet d'ingérer des données provenant de sources externes dans Pub/Sub. Vous pouvez ensuite diffuser les données en flux continu vers n'importe quelle destination compatible avec Pub/Sub.

Pub/Sub est compatible avec Amazon Kinesis Data Streams en tant que source externe pour l'ingestion de données dans un sujet d'importation.

Présentation des thèmes d'importation

L'ingestion est activée sur un sujet d'importation en tant que propriété. Cela permet à un sujet importé d'ingérer des données par flux. Vous pouvez activer l'ingestion sur un sujet à l'aide de la console, de Google Cloud CLI, d'appels REST ou des bibliothèques clientes. Dans le cadre de la gestion du sujet d'importation, Google Cloud assure la surveillance et le scaling du pipeline d'ingestion.

Sans sujet d'importation, l'insertion de flux de données dans Pub/Sub à partir d'une source de données nécessite un service supplémentaire. Ce service supplémentaire extrait les données de la source d'origine et les publie sur Pub/Sub. Le service supplémentaire peut être un moteur de streaming tel qu'Apache Spark ou un service écrit personnalisé. Vous devez également configurer, déployer, exécuter, faire évoluer et surveiller ce service.

Voici une liste d'informations importantes concernant les sujets d'importation:

  • Comme pour un sujet standard, vous pouvez toujours publier manuellement dans un sujet importé.

  • Vous ne pouvez associer qu'une seule source d'ingestion à un sujet d'importation.

Nous vous recommandons d'importer des sujets pour les flux de données. Si vous envisagez d'ingérer des données par lot dans BigQuery plutôt que d'ingérer des données en flux continu, vous pouvez essayer le service de transfert de données BigQuery (BQ DTS). Si vous souhaitez ingérer des données dans Cloud Storage, le service de transfert de stockage (STS) est une bonne option.

Avant de commencer

Rôles et autorisations requis pour gérer les sujets d'importation

Pour obtenir les autorisations nécessaires pour créer et gérer des sujets d'importation, demandez à votre administrateur de vous attribuer le rôle IAM Éditeur Pub/Sub(roles/pubsub.editor) sur votre sujet ou projet. Pour en savoir plus sur l'attribution de rôles, consultez la section Gérer les accès.

Ce rôle prédéfini contient les autorisations requises pour créer et gérer des sujets d'importation. Pour connaître les autorisations exactes requises, développez la section Autorisations requises :

Autorisations requises

Les autorisations suivantes sont requises pour créer et gérer des sujets d'importation:

  • Créez un sujet d'importation : pubsub.topics.create
  • Supprimez un sujet d'importation : pubsub.topics.delete
  • Obtenez un sujet d'importation : pubsub.topics.get
  • Répertorier un sujet d'importation : pubsub.topics.list
  • Publier dans un sujet d'importation : pubsub.topics.publish
  • Mettre à jour un sujet d'importation : pubsub.topics.update
  • Obtenez la stratégie IAM d'un sujet d'importation : pubsub.topics.getIamPolicy
  • Configurez la stratégie IAM pour un sujet d'importation : pubsub.topics.setIamPolicy

Vous pouvez également obtenir ces autorisations avec des rôles personnalisés ou d'autres rôles prédéfinis.

Vous pouvez configurer le contrôle des accès au niveau du projet et au niveau de chaque ressource.

Configurer l'identité fédérée pour accéder aux flux de données Kinesis

La fédération d'identité de charge de travail permet aux services Google Cloud d'accéder aux charges de travail qui s'exécutent en dehors de Google Cloud. Avec la fédération d'identité, vous n'avez pas besoin de gérer ni de transmettre des identifiants à Google Cloud pour accéder à vos ressources dans d'autres clouds. À la place, vous pouvez utiliser les identités des charges de travail elles-mêmes pour vous authentifier auprès de Google Cloud et accéder aux ressources.

Créer un compte de service dans Google Cloud

Il s'agit d'une étape facultative. Si vous disposez déjà d'un compte de service, vous pouvez l'utiliser dans cette procédure au lieu d'en créer un autre. Si vous utilisez un compte de service existant, accédez à la section Enregistrer l'ID unique du compte de service à l'étape suivante.

Pour un sujet d'importation, Pub/Sub utilise le compte de service en tant qu'identité pour accéder aux ressources depuis AWS.

Pour en savoir plus sur la création d'un compte de service, y compris les conditions préalables, les rôles et les autorisations requis, ainsi que les consignes d'attribution de noms, consultez la section Créer des comptes de service. Après avoir créé un compte de service, vous devrez peut-être attendre au moins 60 secondes avant de l'utiliser. Ce comportement est dû au fait que les opérations de lecture sont cohérentes à terme. Il peut s'écouler un certain temps avant que le nouveau compte de service devienne visible.

Enregistrer l'ID unique du compte de service

Vous avez besoin d'un ID unique de compte de service pour configurer un rôle dans la console AWS.

  1. Dans la console Google Cloud, accédez à la page d'informations Compte de service.

    Accéder au compte de service

  2. Cliquez sur le compte de service que vous venez de créer ou sur celui que vous prévoyez d'utiliser.

  3. Sur la page Détails du compte de service, notez le numéro d'identifiant unique.

    Vous en aurez besoin dans la section Créer un rôle dans AWS à l'aide d'une stratégie d'approbation personnalisée.

Ajouter le rôle Créateur de jetons du compte de service au compte de service Pub/Sub

Le rôle Créateur de jetons du compte de service (roles/iam.serviceAccountTokenCreator) permet aux comptes principaux de créer des identifiants éphémères pour un compte de service. Ces jetons ou identifiants sont utilisés pour emprunter l'identité du compte de service.

Pour plus d'informations sur l'emprunt de l'identité d'un compte de service, consultez la page Emprunter l'identité d'un compte de service.

Vous pouvez également ajouter le rôle Éditeur Pub/Sub (roles/pubsub.publisher) au cours de cette procédure. Pour en savoir plus sur ce rôle et sur la raison pour laquelle vous l'ajoutez, consultez la section Ajouter le rôle d'éditeur Pub/Sub au compte de service Pub/Sub.

  1. Dans la console Google Cloud, accédez à la page IAM.

    Accéder à IAM

  2. Activez l'option Inclure les attributions de rôles fournies par Google.

  3. Recherchez le compte de service au format service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. Pour ce compte de service, cliquez sur le bouton Modifier le compte principal.

  5. Si nécessaire, cliquez sur Ajouter un autre rôle.

  6. Recherchez et sélectionnez le rôle Créateur de jetons du compte de service (roles/iam.serviceAccountTokenCreator).

  7. Cliquez sur Enregistrer.

Créer une stratégie dans AWS

Vous avez besoin d'une stratégie dans AWS permettant à Pub/Sub de s'authentifier auprès d'AWS afin que Pub/Sub puisse ingérer les données d'un flux de données AWS Kinesis. Avant de créer une stratégie AWS, créez un flux de données Kinesis et un consommateur enregistré sur celui-ci. Nous vous recommandons cette pratique afin de pouvoir limiter les autorisations à un flux spécifique.

  • Pour en savoir plus sur la création d'un flux de données AWS Kinesis, consultez la page Flux de données Kinesis.

  • Pour en savoir plus sur l'API de flux de données AWS Kinesis utilisée pour enregistrer les clients, consultez la page RegisterStreamConsumer.

  • Pour découvrir d'autres méthodes et informations sur la création d'une stratégie dans AWS, consultez la page Créer des stratégies IAM.

Pour créer une stratégie dans AWS, procédez comme suit:

  1. Connectez-vous à la console AWS Management Console et ouvrez la console IAM.

  2. Dans le volet de navigation de la console IAM, cliquez sur Gestion des accès > Stratégies.

  3. Cliquez sur Créer une stratégie.

  4. Dans le champ Select a service (Sélectionner un service), choisissez Kinesis.

  5. Pour action autorisée, sélectionnez les éléments suivants:

    • List > ListShards (Liste > ListShards).

      Cette action accorde l'autorisation de répertorier les segments dans un flux et fournit des informations sur chaque segment.

    • Lire > SubscribeToShard.

      Cette action accorde l'autorisation d'écouter un segment spécifique avec une distribution ramifiée améliorée.

    • Read > DescribeStreamConsumer.

      Cette action accorde l'autorisation d'obtenir la description d'un consommateur de flux enregistré.

    Ces autorisations couvrent la lecture à partir du flux. Pub/Sub n'accepte que la lecture d'un flux Kinesis avec Fan-Out amélioré à l'aide de l'API SubscribeToShard en streaming.

  6. Sous Ressources, si vous souhaitez limiter la règle à un flux ou à un consommateur spécifique (recommandé), spécifiez les champs ARN du consommateur et ARN du flux.

  7. Cliquez sur Ajouter des autorisations.

  8. Dans le champ Sélectionner un service, saisissez et sélectionnez STS.

  9. Pour action autorisée, sélectionnez Écrire > AssumeRoleWithWebIdentity.

    Cette action autorise l'obtention d'un ensemble d'identifiants de sécurité temporaires permettant à Pub/Sub de s'authentifier auprès du flux de données Kinesis via la fédération d'identité.

  10. Cliquez sur Suivant.

  11. Saisissez un nom et une description pour la règle.

  12. Cliquez sur Créer une stratégie.

Créer un rôle dans AWS à l'aide d'une stratégie d'approbation personnalisée

Vous devez créer un rôle dans AWS afin que Pub/Sub puisse s'authentifier auprès d'AWS pour ingérer les données de Kinesis Data Streams.

Pour créer un rôle à l'aide d'une stratégie d'approbation personnalisée, procédez comme suit:

  1. Connectez-vous à la console AWS Management Console et ouvrez la console IAM.

  2. Dans le volet de navigation de la console IAM, cliquez sur Rôles.

  3. Cliquez sur Créer un rôle.

  4. Dans le champ Sélectionner une entité de confiance, choisissez Règle de confiance personnalisée.

  5. Dans la section Règle de confiance personnalisée, saisissez ou collez la commande suivante:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
         "Effect": "Allow",
         "Principal": {
            "Federated": "accounts.google.com"
         },
         "Action": "sts:AssumeRoleWithWebIdentity",
         "Condition": {
             "StringEquals": {
               "accounts.google.com:sub": "<SERVICE_ACCOUNT_UNIQUE_ID>"
             }
          }
        }
      ]
    }
    

    Remplacez <SERVICE_ACCOUNT_UNIQUE_ID> par l'ID unique du compte de service que vous avez enregistré dans Enregistrer l'ID unique du compte de service.

  6. Cliquez sur Suivant.

  7. Pour Ajouter des autorisations, recherchez et sélectionnez la règle personnalisée que vous venez de créer.

  8. Cliquez sur Suivant.

  9. Saisissez un nom et une description pour le rôle.

  10. Cliquez sur Créer un rôle.

Ajouter le rôle d'éditeur Pub/Sub au compte de service Pub/Sub

Vous devez attribuer un rôle d'éditeur au compte de service Pub/Sub pour que Pub/Sub puisse publier dans le sujet d'importation à partir des flux de données AWS Kinesis.

Activer la publication à partir de tous les sujets

  1. Dans la console Google Cloud, accédez à la page IAM.

    Accéder à IAM

  2. Activez l'option Inclure les attributions de rôles fournies par Google.

  3. Recherchez le compte de service au format service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. Pour ce compte de service, cliquez sur le bouton Modifier le compte principal.

  5. Si nécessaire, cliquez sur Ajouter un autre rôle.

  6. Recherchez et sélectionnez le rôle Éditeur Pub/Sub (roles/pubsub.publisher).

  7. Cliquez sur Enregistrer.

Activer la publication à partir d'un seul sujet

Si vous souhaitez n'accorder l'autorisation de publication que pour un sujet d'importation spécifique, procédez comme suit:

  1. Dans la console Google Cloud, activez Cloud Shell.

    Activer Cloud Shell

    En bas de la fenêtre de la console Google Cloud, une session Cloud Shell démarre et affiche une invite de ligne de commande. Cloud Shell est un environnement shell dans lequel Google Cloud CLI est déjà installé, et dans lequel des valeurs sont déjà définies pour votre projet actuel. L'initialisation de la session peut prendre quelques secondes.

  2. Exécutez la commande gcloud pubsub topics add-iam-policy-binding :

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"
    

    Remplacez les éléments suivants :

    • TOPIC_ID est l'ID du sujet d'importation.

    • PROJECT_NUMBER est le numéro du projet. Pour afficher le numéro de projet, consultez la section Identifier des projets.

Ajouter le rôle utilisateur du compte de service au compte de service

Le rôle Utilisateur du compte de service (roles/iam.serviceAccountUser) inclut l'autorisation iam.serviceAccounts.actAs qui permet à un compte principal d'associer un compte de service aux paramètres d'ingestion du sujet d'importation et d'utiliser ce compte de service pour l'identité fédérée.

Procédez comme suit :

  1. Dans la console Google Cloud, accédez à la page IAM.

    Accéder à IAM

  2. Pour le compte principal qui émet les appels de création ou de mise à jour de sujet, cliquez sur le bouton Modifier le compte principal.

  3. Si nécessaire, cliquez sur Ajouter un autre rôle.

  4. Recherchez et sélectionnez le rôle utilisateur du compte de service (roles/iam.serviceAccountUser).

  5. Cliquez sur Enregistrer.

Créer un sujet d'importation

Pour en savoir plus sur les propriétés associées à un thème, consultez la section Propriétés d'un thème.

Assurez-vous d'avoir suivi les procédures suivantes:

Pour créer un sujet d'importation, procédez comme suit:

Console

  1. Dans la console Google Cloud, accédez à la page Sujets.

    Accéder aux sujets

  2. Cliquez sur Créer un sujet.

  3. Dans le champ ID du sujet, saisissez un ID pour votre sujet d'importation.

    Pour en savoir plus sur l'attribution de noms aux sujets, consultez les consignes relatives aux noms.

  4. Sélectionnez Ajouter un abonnement par défaut.

  5. Sélectionnez Activer l'ingestion.

  6. Pour la source d'ingestion, sélectionnez Amazon Kinesis Data Streams.

  7. Saisissez les informations suivantes :

    • ARN du flux Kinesis: nom ARN du flux de données Kinesis que vous prévoyez d'ingérer dans Pub/Sub. Le format de l'ARN est le suivant : arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • Kinesis Consumer ARN (ARN du client Kinesis) : ARN de la ressource consommateur enregistrée auprès du flux de données AWS Kinesis. Le format de l'ARN est le suivant : arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • ARN du rôle AWS: ARN du rôle AWS. Le format ARN du rôle est le suivant : arn:aws:iam:${Account}:role/${RoleName}.

    • Compte de service: compte de service que vous avez créé lors de la section Créer un compte de service dans Google Cloud.

  8. Cliquez sur Créer un sujet.

gcloud

  1. Dans la console Google Cloud, activez Cloud Shell.

    Activer Cloud Shell

    En bas de la fenêtre de la console Google Cloud, une session Cloud Shell démarre et affiche une invite de ligne de commande. Cloud Shell est un environnement shell dans lequel Google Cloud CLI est déjà installé, et dans lequel des valeurs sont déjà définies pour votre projet actuel. L'initialisation de la session peut prendre quelques secondes.

  2. Exécutez la commande gcloud pubsub topics create :

    gcloud pubsub topics create TOPIC_ID 
    --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN
    --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN
    --kinesis-ingestion-role-arn KINESIS_ROLE_ARN
    --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    Remplacez les éléments suivants :

    • TOPIC_ID est l'ID du thème.

    • KINESIS_STREAM_ARN est le nom ARN des flux de données Kinesis que vous prévoyez d'ingérer dans Pub/Sub. Le format de l'ARN est le suivant : arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • KINESIS_CONSUMER_ARN correspond à l'ARN de la ressource de consommateur enregistrée dans les flux de données AWS Kinesis. Le format de l'ARN est le suivant : arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • KINESIS_ROLE_ARN est l'ARN du rôle AWS. Le format ARN du rôle est le suivant : arn:aws:iam:${Account}:role/${RoleName}.

    • PUBSUB_SERVICE_ACCOUNT est le compte de service que vous avez créé à la section Créer un compte de service dans Google Cloud.

Go

Avant d'essayer cet exemple, suivez les instructions de configuration de Go décrites dans le guide de démarrage rapide de Pub/Sub à l'aide des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub Go.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createTopicWithKinesisIngestion(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceAWSKinesis{
				StreamARN:         streamARN,
				ConsumerARN:       consumerARN,
				AWSRoleARN:        awsRoleARN,
				GCPServiceAccount: gcpServiceAccount,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Kinesis topic created: %v\n", t)
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions de configuration de Java décrites dans le guide de démarrage rapide de Pub/Sub à l'aide des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub Java.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithKinesisIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    createTopicWithKinesisIngestionExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void createTopicWithKinesisIngestionExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println("Created topic with Kinesis ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

Avant d'essayer cet exemple, suivez les instructions de configuration de Node.js décrites dans le guide de démarrage rapide de Pub/Sub à l'aide des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub Node.js.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithKinesisIngestion(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn
) {
  // Creates a new topic with a schema. Note that you might also
  // pass Encodings.Json or Encodings.Binary here.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
}

Python

Avant d'essayer cet exemple, suivez les instructions de configuration de Python décrites dans le guide de démarrage rapide de Pub/Sub à l'aide des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub Python.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
            stream_arn=stream_arn,
            consumer_arn=consumer_arn,
            aws_role_arn=aws_role_arn,
            gcp_service_account=gcp_service_account,
        )
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

Avant d'essayer cet exemple, suivez les instructions de configuration de C++ décrites dans le guide de démarrage rapide de Pub/Sub à l'aide des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub C++.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis =
      request.mutable_ingestion_data_source_settings()->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Avant d'essayer cet exemple, suivez les instructions de configuration pour Node.js du guide de démarrage rapide de Pub/Sub sur l'utilisation des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub pour Node.js.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithKinesisIngestion(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string
) {
  // Creates a new topic with a schema. Note that you might also
  // pass Encodings.Json or Encodings.Binary here.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
}

Pour en savoir plus sur les ARN, consultez les pages Noms de ressources Amazon (ARN) et Identifiants IAM.

Si vous rencontrez des problèmes, consultez Résoudre les problèmes liés à un sujet d'importation.

Modifier un sujet d'importation

Vous pouvez modifier les paramètres de la source de données d'ingestion d'un sujet d'importation. Procédez comme suit :

Console

  1. Dans la console Google Cloud, accédez à la page Sujets.

    Accéder aux sujets

  2. Cliquez sur le sujet d'importation.

  3. Sur la page d'informations du sujet, cliquez sur Modifier.

  4. Mettez à jour les champs que vous souhaitez modifier.

  5. Cliquez sur Update (Mettre à jour).

gcloud

  1. Dans la console Google Cloud, activez Cloud Shell.

    Activer Cloud Shell

    En bas de la fenêtre de la console Google Cloud, une session Cloud Shell démarre et affiche une invite de ligne de commande. Cloud Shell est un environnement shell dans lequel Google Cloud CLI est déjà installé, et dans lequel des valeurs sont déjà définies pour votre projet actuel. L'initialisation de la session peut prendre quelques secondes.

  2. Exécutez la commande gcloud pubsub topics update avec toutes les options mentionnées dans l'exemple suivant:

      gcloud pubsub topics update TOPIC_ID 
    --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN
    --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN
    --kinesis-ingestion-role-arn KINESIS_ROLE_ARN
    --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    Remplacez les éléments suivants :

    • TOPIC_ID est l'ID du thème. Impossible de mettre à jour ce champ.

    • KINESIS_STREAM_ARN est le nom ARN des flux de données Kinesis que vous prévoyez d'ingérer dans Pub/Sub. Le format de l'ARN est le suivant : arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • KINESIS_CONSUMER_ARN correspond à l'ARN de la ressource de consommateur enregistrée dans les flux de données AWS Kinesis. Le format de l'ARN est le suivant : arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • KINESIS_ROLE_ARN est l'ARN du rôle AWS. Le format de l'ARN du rôle est le suivant : arn:aws:iam:${Account}:role/${RoleName}.

    • PUBSUB_SERVICE_ACCOUNT est le compte de service que vous avez créé à la section Créer un compte de service dans Google Cloud.

Quotas et limites pour les sujets d'importation

Le débit de l'éditeur pour les sujets d'importation est limité par le quota de publication du sujet. Pour en savoir plus, consultez la page Quotas et limites de Pub/Sub.

Étapes suivantes