Créer un sujet avec ingestion AWS MSK

Créer un sujet avec l'ingestion AWS MSK

En savoir plus

Pour obtenir une documentation détaillée incluant cet exemple de code, consultez les articles suivants :

Exemple de code

C++

Avant d'essayer cet exemple, suivez les instructions de configuration pour C++ du guide de démarrage rapide de Pub/Sub : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C++.

Pour vous authentifier auprès de Pub/Sub, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string const& cluster_arn,
   std::string const& msk_topic, std::string const& aws_role_arn,
   std::string const& gcp_service_account) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_msk =
      request.mutable_ingestion_data_source_settings()->mutable_aws_msk();
  aws_msk->set_cluster_arn(cluster_arn);
  aws_msk->set_topic(msk_topic);
  aws_msk->set_aws_role_arn(aws_role_arn);
  aws_msk->set_gcp_service_account(gcp_service_account);

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

C#

Avant d'essayer cet exemple, suivez les instructions de configuration pour C# du guide de démarrage rapide de Pub/Sub : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C#.

Pour vous authentifier auprès de Pub/Sub, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


using Google.Cloud.PubSub.V1;
using System;

public class CreateTopicWithAwsMskIngestionSample
{
    public Topic CreateTopicWithAwsMskIngestion(string projectId, string topicId, string clusterArn, string mskTopic, string awsRoleArn, string gcpServiceAccount)
    {
        // Define settings for Amazon MSK ingestion
        IngestionDataSourceSettings ingestionDataSourceSettings = new IngestionDataSourceSettings
        {
            AwsMsk = new IngestionDataSourceSettings.Types.AwsMsk
            {
                ClusterArn = clusterArn,
                Topic = mskTopic,
                AwsRoleArn = awsRoleArn,
                GcpServiceAccount = gcpServiceAccount
            }
        };

        PublisherServiceApiClient publisher = PublisherServiceApiClient.Create();
        Topic topic = new Topic()
        {
            Name = TopicName.FormatProjectTopic(projectId, topicId),
            IngestionDataSourceSettings = ingestionDataSourceSettings
        };
        Topic createdTopic = publisher.CreateTopic(topic);
        Console.WriteLine($"Topic {topic.Name} created.");

        return createdTopic;
    }
}

Go

Avant d'essayer cet exemple, suivez les instructions de configuration pour Go du guide de démarrage rapide de Pub/Sub : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Go.

Pour vous authentifier auprès de Pub/Sub, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createTopicWithAWSMSKIngestion(w io.Writer, projectID, topicID, clusterARN, mskTopic, awsRoleARN, gcpSA string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"

	// // AWS MSK ingestion settings.
	// clusterARN := "cluster-arn"
	// mskTopic := "msk-topic"
	// awsRoleARN := "aws-role-arn"
	// gcpSA := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceAmazonMSK{
				ClusterARN:        clusterARN,
				Topic:             mskTopic,
				AWSRoleARN:        awsRoleARN,
				GCPServiceAccount: gcpSA,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Created topic with AWS MSK ingestion settings: %v\n", t)
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions de configuration pour Java du guide de démarrage rapide de Pub/Sub : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Java.

Pour vous authentifier auprès de Pub/Sub, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithAwsMskIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // AWS MSK ingestion settings.
    String clusterArn = "cluster-arn";
    String mskTopic = "msk-topic";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    createTopicWithAwsMskIngestionExample(
        projectId, topicId, clusterArn, mskTopic, awsRoleArn, gcpServiceAccount);
  }

  public static void createTopicWithAwsMskIngestionExample(
      String projectId,
      String topicId,
      String clusterArn,
      String mskTopic,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsMsk awsMsk =
          IngestionDataSourceSettings.AwsMsk.newBuilder()
              .setClusterArn(clusterArn)
              .setTopic(mskTopic)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsMsk(awsMsk).build();

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println("Created topic with AWS MSK ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

Avant d'essayer cet exemple, suivez les instructions de configuration pour Node.js du guide de démarrage rapide de Pub/Sub : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

Pour vous authentifier auprès de Pub/Sub, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const clusterArn = 'arn:aws:kafka:...';
// const mskTopic = 'YOUR_MSK_TOPIC';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithAwsMskIngestion(
  topicNameOrId,
  clusterArn,
  mskTopic,
  awsRoleArn,
  gcpServiceAccount,
) {
  // Creates a new topic with AWS MSK ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      awsMsk: {
        clusterArn,
        topic: mskTopic,
        awsRoleArn,
        gcpServiceAccount,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with AWS MSK ingestion.`);
}

Node.js

Avant d'essayer cet exemple, suivez les instructions de configuration de Node.js décrites dans le Guide de démarrage rapide de Pub/Sub : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

Pour vous authentifier auprès de Pub/Sub, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const clusterArn = 'arn:aws:kafka:...';
// const mskTopic = 'YOUR_MSK_TOPIC';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithAwsMskIngestion(
  topicNameOrId: string,
  clusterArn: string,
  mskTopic: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
) {
  // Creates a new topic with AWS MSK ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      awsMsk: {
        clusterArn,
        topic: mskTopic,
        awsRoleArn,
        gcpServiceAccount,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with AWS MSK ingestion.`);
}

PHP

Avant d'essayer cet exemple, suivez les instructions de configuration pour PHP du guide de démarrage rapide de Pub/Sub : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour PHP.

Pour vous authentifier auprès de Pub/Sub, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub topic with AWS MSK ingestion.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $clusterArn  The Amazon Resource Name (ARN) that uniquely identifies the cluster.
 * @param string $mskTopic  The name of the topic in the Amazon MSK cluster that Pub/Sub will import from.
 * @param string $awsRoleArn  AWS role ARN to be used for Federated Identity authentication with Amazon MSK.
 *               Check the Pub/Sub docs for how to set up this role and the required permissions that need to be
 *               attached to it.
 * @param string $gcpServiceAccount  The GCP service account to be used for Federated Identity authentication
 *               with Amazon MSK (via a AssumeRoleWithWebIdentity call for the provided role). The aws_role_arn
 *               must be set up with accounts.google.com:sub equals to this service account number.
 */
function create_topic_with_aws_msk_ingestion(
    string $projectId,
    string $topicName,
    string $clusterArn,
    string $mskTopic,
    string $awsRoleArn,
    string $gcpServiceAccount
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $topic = $pubsub->createTopic($topicName, [
        'ingestionDataSourceSettings' => [
            'aws_msk' => [
                'cluster_arn' => $clusterArn,
                'topic' => $mskTopic,
                'aws_role_arn' => $awsRoleArn,
                'gcp_service_account' => $gcpServiceAccount
            ]
        ]
    ]);

    printf('Topic created: %s' . PHP_EOL, $topic->name());
}

Python

Avant d'essayer cet exemple, suivez les instructions de configuration pour Python du guide de démarrage rapide de Pub/Sub : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Python.

Pour vous authentifier auprès de Pub/Sub, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# cluster_arn = "your-cluster-arn"
# msk_topic = "your-msk-topic"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        aws_msk=IngestionDataSourceSettings.AwsMsk(
            cluster_arn=cluster_arn,
            topic=msk_topic,
            aws_role_arn=aws_role_arn,
            gcp_service_account=gcp_service_account,
        )
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with AWS MSK Ingestion Settings")

Étape suivante

Pour rechercher et filtrer des exemples de code pour d'autres produits Google Cloud , consultez l'explorateur d'exemplesGoogle Cloud .