Crear un tema de importación de Amazon Managed Streaming para Apache Kafka

Un tema de importación de Amazon Managed Streaming para Apache Kafka (Amazon MSK) te permite ingerir datos de Amazon MSK de forma continua como fuente externa y en Pub/Sub. Después, puedes transmitir los datos a cualquiera de los destinos que admite Pub/Sub.

En este documento se explica cómo crear y gestionar temas de importación de Amazon MSK. Para crear un tema estándar, consulta Crear un tema estándar.

Para obtener más información sobre los temas de importación, consulta el artículo Acerca de los temas de importación.

Antes de empezar

Roles y permisos necesarios

Para obtener los permisos que necesitas para crear y gestionar temas de importación de Amazon MSK, pide a tu administrador que te conceda el rol de gestión de identidades y accesos Editor de Pub/Sub (roles/pubsub.editor) en tu tema o proyecto. Para obtener más información sobre cómo conceder roles, consulta el artículo Gestionar el acceso a proyectos, carpetas y organizaciones.

Este rol predefinido contiene los permisos necesarios para crear y gestionar temas de importación de Amazon MSK. Para ver los permisos exactos que se necesitan, despliega la sección Permisos necesarios:

Permisos obligatorios

Para crear y gestionar temas de importación de Amazon MSK, se necesitan los siguientes permisos:

  • Crea un tema de importación: pubsub.topics.create
  • Eliminar un tema importado: pubsub.topics.delete
  • Obtener un tema de importación: pubsub.topics.get
  • Para enumerar un tema de importación, sigue estos pasos: pubsub.topics.list
  • Publicar en un tema de importación: pubsub.topics.publish and pubsub.serviceAgent
  • Actualizar un tema de importación: pubsub.topics.update
  • Obtiene la política de gestión de identidades y accesos de un tema de importación: pubsub.topics.getIamPolicy
  • Configura la política de gestión de identidades y accesos de un tema de importación: pubsub.topics.setIamPolicy

También puedes obtener estos permisos con roles personalizados u otros roles predefinidos.

Puedes configurar el control de acceso a nivel de proyecto y de recurso individual.

Configurar la identidad federada para acceder a Amazon MSK

La federación de identidades de cargas de trabajo permite que los servicios accedan a cargas de trabajo que se ejecutan fuera de Google Cloud. Google Cloud Con la federación de identidades, no es necesario mantener ni transferir credenciales a Google Cloud para acceder a tus recursos en otras nubes. En su lugar, puedes usar las identidades de las cargas de trabajo para autenticarte en Google Cloud y acceder a los recursos.

Crear una cuenta de servicio en Google Cloud

Este paso es opcional. Si ya tienes una cuenta de servicio, puedes usarla en este procedimiento en lugar de crear una nueva. Si usas una cuenta de servicio, ve a Registrar el ID único de la cuenta de servicio para el siguiente paso.

En el caso de los temas de importación de Amazon MSK, Pub/Sub usa la cuenta de servicio como identidad para acceder a los recursos de AWS.

Para obtener más información sobre cómo crear una cuenta de servicio, incluidos los requisitos previos, los roles y permisos necesarios, y las directrices para asignar nombres, consulta el artículo Crear cuentas de servicio. Después de crear una cuenta de servicio, es posible que tengas que esperar 60 segundos o más antes de usarla. Este comportamiento se produce porque las operaciones de lectura son coherentes con el tiempo. La nueva cuenta de servicio puede tardar en aparecer.

Registrar el ID único de la cuenta de servicio

Necesitas un ID único de cuenta de servicio para configurar un rol en la consola de AWS.

  1. En la Google Cloud consola, ve a la página de detalles de la cuenta de servicio.

    Ir a la cuenta de servicio

  2. Haz clic en la cuenta de servicio que acabas de crear o en la que tienes previsto usar.

  3. En la página Detalles de la cuenta de servicio, anota el número de ID único.

    Necesitas el ID como parte del flujo de trabajo para configurar un rol en la consola de AWS.

Añadir el rol de creador de tokens de cuenta de servicio a la cuenta de servicio de Pub/Sub

El rol Creador de tokens de cuenta de servicio (roles/iam.serviceAccountTokenCreator) permite a las entidades principales crear credenciales de duración reducida para una cuenta de servicio. Estos tokens o credenciales se usan para suplantar la identidad de la cuenta de servicio.

Para obtener más información sobre la suplantación de identidad en cuentas de servicio, consulta el artículo Suplantación de identidad en cuentas de servicio.

También puedes añadir el rol de editor de Pub/Sub (roles/pubsub.publisher) durante este procedimiento. Para obtener más información sobre el rol y por qué lo añade, consulte Añadir el rol de editor de Pub/Sub a la cuenta de servicio de Pub/Sub.

  1. En la consola, ve a la página Gestión de identidades y accesos. Google Cloud

    Ir a IAM

  2. Marca la casilla Incluir concesiones de roles proporcionadas por Google.

  3. Busca la cuenta de servicio que tenga el formato service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. En esta cuenta de servicio, haz clic en el botón Editar principal.

  5. Si es necesario, haz clic en Añadir otro rol.

  6. Busca y haz clic en el rol Creador de tokens de cuenta de servicio (roles/iam.serviceAccountTokenCreator).

  7. Haz clic en Guardar.

Crear una política en AWS

Necesitas una política en AWS para que Pub/Sub se autentique en AWS y pueda ingerir datos de Amazon MSK.

Para crear una política en AWS, sigue estos pasos:

  1. Inicia sesión en la consola de administración de AWS y abre la consola de IAM.

  2. En el panel de navegación de la consola de IAM, haz clic en Gestión de accesos > Políticas.

  3. Haz clic en Crear política.

  4. En Click a service (Hacer clic en un servicio), haga clic en MSK.

  5. En Acción permitida,haga clic en Leer > GetBootstrapBrokers.

    Esta acción concede permiso para obtener los brokers de arranque que Pub/Sub usa para conectarse al clúster de MSK.

  6. Haz clic en Añadir más permisos.

  7. En Select a service (Seleccionar un servicio), haga clic en Apache Kafka APIs for MSK (APIs de Apache Kafka para MSK).

  8. En Acción permitida, selecciona lo siguiente:

    • Lista > DescribeTopic

      Esta acción concede permiso al tema de ingestión de Pub/Sub para obtener detalles sobre el tema de Kafka de Amazon MSK.

    • Leer > ReadData

      Esta acción concede permiso para leer datos del tema de Kafka de Amazon MSK.

    • Escribir > Conectar

      Esta acción concede permiso para conectarse y autenticarse en el clúster de Kafka de Amazon MSK.

  9. En Resources (Recursos), especifica el ARN del clúster (si quieres restringir la política a clústeres específicos, lo cual es recomendable).

  10. Haz clic en Añadir más permisos.

  11. En Seleccionar un servicio, haz clic en STS.

  12. En Acción permitida, haga clic en Escribir > AssumeRoleWithWebIdentity.

    Esta acción concede permiso para obtener un conjunto de credenciales de seguridad temporales para Pub/Sub con el fin de autenticarse en Amazon MSK mediante la federación de identidades.

  13. Haz clic en Siguiente.

  14. Introduce el nombre y la descripción de la política.

  15. Haz clic en Crear política.

Crear un rol en AWS con una política de confianza personalizada

Debes crear un rol en AWS para que Pub/Sub pueda autenticarse en AWS e ingerir datos de Amazon MSK.

  1. Inicia sesión en la consola de administración de AWS y abre la consola de IAM.

  2. En el panel de navegación de la consola de Gestión de identidades y accesos, haz clic en Roles.

  3. Haz clic en Crear rol.

  4. En Seleccionar entidad de confianza, haz clic en Política de confianza personalizada.

  5. En la sección Política de confianza personalizada, introduce o pega lo siguiente:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
         "Effect": "Allow",
         "Principal": {
            "Federated": "accounts.google.com"
         },
         "Action": "sts:AssumeRoleWithWebIdentity",
         "Condition": {
             "StringEquals": {
               "accounts.google.com:sub": "<SERVICE_ACCOUNT_UNIQUE_ID>"
             }
          }
        }
      ]
    }
    

    Sustituye <SERVICE_ACCOUNT_UNIQUE_ID> por el ID único de la cuenta de servicio que has anotado en Anota el ID único de la cuenta de servicio.

  6. Haz clic en Siguiente.

  7. En Añadir permisos, busca y haz clic en la política personalizada que acabas de crear.

  8. Haz clic en Siguiente.

  9. Introduce el nombre y la descripción del rol.

  10. Haz clic en Crear rol.

Añadir el rol de editor de Pub/Sub a la cuenta principal de Pub/Sub

Para habilitar la publicación, debes asignar un rol de editor a la cuenta de servicio de Pub/Sub para que Pub/Sub pueda publicar en el tema de importación de Amazon MSK.

Añadir el rol de agente de servicio de Pub/Sub a la cuenta de servicio de Pub/Sub

Para permitir que Pub/Sub use la cuota de publicación del proyecto del tema de importación, el agente de servicio de Pub/Sub necesita el permiso serviceusage.services.use en el proyecto del tema de importación.

Para conceder este permiso, te recomendamos que añadas el rol de agente de servicio de Pub/Sub a la cuenta de servicio de Pub/Sub.

Si la cuenta de servicio de Pub/Sub no tiene el rol de agente de servicio de Pub/Sub, se le puede asignar de la siguiente manera:

  1. En la consola, ve a la página Gestión de identidades y accesos. Google Cloud

    Ir a IAM

  2. Marca la casilla Incluir concesiones de roles proporcionadas por Google.

  3. Busca la cuenta de servicio que tenga el formato service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. En esta cuenta de servicio, haz clic en el botón Editar principal.

  5. Si es necesario, haz clic en Añadir otro rol.

  6. Busca y haz clic en el rol Agente de servicio de Pub/Sub (roles/pubsub.serviceAgent).

  7. Haz clic en Guardar.

Habilitar la publicación de todos los temas

Usa este método si no has creado ningún tema de importación de Amazon MSK.

  1. En la consola, ve a la página Gestión de identidades y accesos. Google Cloud

    Ir a IAM

  2. Marca la casilla Incluir concesiones de roles proporcionadas por Google.

  3. Busca la cuenta de servicio que tenga el formato service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. En esta cuenta de servicio, haz clic en el botón Editar principal.

  5. Si es necesario, haz clic en Añadir otro rol.

  6. Busca y haz clic en el rol Editor de Pub/Sub (roles/pubsub.publisher).

  7. Haz clic en Guardar.

Habilitar la publicación desde un solo tema

Usa este método solo si el tema de importación de Amazon MSK ya existe.

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Ejecuta el comando gcloud pubsub topics add-iam-policy-binding:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID \
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com" \
       --role="roles/pubsub.publisher"

    Haz los cambios siguientes:

    • TOPIC_ID: el ID del tema de importación de Amazon MSK.

    • PROJECT_NUMBER: el número de proyecto. Para ver el número de proyecto, consulta el artículo Identificar proyectos.

  3. Añadir el rol de usuario de cuenta de servicio a la cuenta de servicio

    El rol Usuario de cuenta de servicio (roles/iam.serviceAccountUser) incluye el permiso iam.serviceAccounts.actAs, que permite que un principal adjunte una cuenta de servicio a los ajustes de ingestión del tema de importación de Amazon MSK y use esa cuenta de servicio para la identidad federada.

    1. En la consola, ve a la página Gestión de identidades y accesos. Google Cloud

      Ir a IAM

    2. En el principal que emite las llamadas para crear o actualizar temas, haz clic en el botón Editar principal.

    3. Si es necesario, haz clic en Añadir otro rol.

    4. Busca y haz clic en el rol Usuario de cuenta de servicio (roles/iam.serviceAccountUser).

    5. Haz clic en Guardar.

    Usar temas de importación de Amazon MSK

    Puede crear un tema de importación o editar uno que ya tenga.

    Cuestiones importantes

    • Si creas el tema y la suscripción por separado, aunque lo hagas de forma rápida, se pueden perder datos. Hay un breve periodo en el que el tema está disponible sin suscripción. Si se envían datos al tema durante este periodo, se perderán. Si creas el tema primero, luego la suscripción y, por último, conviertes el tema en un tema de importación, te aseguras de que no se pierda ningún mensaje durante el proceso de importación.

    • Si necesitas volver a crear el tema de Kafka de un tema de importación con el mismo nombre, no puedes simplemente eliminar el tema de Kafka y volver a crearlo. Esta acción puede invalidar la gestión de desfases de Pub/Sub, lo que puede provocar la pérdida de datos. Para mitigar este problema, sigue estos pasos:

      • Elimina el tema de importación de Pub/Sub.
      • Elimina el tema de Kafka.
      • Crea el tema de Kafka.
      • Crea el tema de importación de Pub/Sub.
    • Los datos de un tema de Kafka de Amazon MSK siempre se leen desde el offset más antiguo.

    Crear temas de importación de Amazon MSK

    Para obtener más información sobre las propiedades asociadas a un tema, consulta Propiedades de un tema.

    Asegúrate de que has completado los siguientes procedimientos:

    Para crear un tema de importación de Amazon MSK, sigue estos pasos:

    Consola

    1. En la Google Cloud consola, ve a la página Temas.

      Ir a Temas

    2. Haz clic en Crear tema.

    3. En el campo ID de tema, introduce un ID para el tema de importación de Amazon MSK. Para obtener más información sobre cómo asignar nombres a los temas, consulta las directrices de nomenclatura.

    4. Selecciona Añadir una suscripción predeterminada.

    5. Selecciona Habilitar la ingesta.

    6. En la fuente de ingesta, selecciona Amazon MSK.

    7. Introduce los siguientes datos:

      • ARN del clúster: el ARN de Amazon MSK que estás ingiriendo en Pub/Sub. El formato del ARN es el siguiente: arn:aws:kafka:${Region}:${Account}:cluster/${ClusterName}/${ClusterId}.
      • Tema: el nombre del tema de Kafka de Amazon MSK que estás ingiriendo en Pub/Sub.
      • Nombre de recurso de Amazon (ARN) del rol de AWS: el ARN del rol de AWS. El formato ARN del rol es el siguiente: arn:aws:iam::${Account}:role/${RoleName}.
      • Cuenta de servicio: la cuenta de servicio que has creado en Crear una cuenta de servicio en Google Cloud.
    8. Haz clic en Crear tema.

    gcloud

    1. In the Google Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    2. Ejecuta el comando gcloud pubsub topics create:

      gcloud pubsub topics create TOPIC_ID \
            --aws-msk-ingestion-cluster-arn MSK_CLUSTER_ARN \
            --aws-msk-ingestion-topic MSK_TOPIC \
            --aws-msk-ingestion-aws-role-arn MSK_ROLE_ARN \
            --aws-msk-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
      

      Haz los cambios siguientes:

      • TOPIC_ID: el nombre o el ID de tu tema de Pub/Sub.
      • MSK_CLUSTER_ARN: el ARN del clúster de Amazon MSK en el que estás insertando datos en Pub/Sub. El formato del ARN es el siguiente: arn:aws:kafka:${Region}:${Account}:cluster/${ClusterName}/${ClusterId}.
      • MSK_TOPIC: el nombre del tema de Kafka de Amazon MSK que estás ingiriendo en Pub/Sub.
      • MSK_ROLE_ARN: el ARN del rol de AWS. El formato ARN del rol es el siguiente: arn:aws:iam::${Account}:role/${RoleName}.
      • PUBSUB_SERVICE_ACCOUNT: la cuenta de servicio que has creado en Crear una cuenta de servicio en Google Cloud.
    3. C++

      Antes de probar este ejemplo, sigue las instrucciones de configuración de C++ que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de C++ de Pub/Sub.

      namespace pubsub = ::google::cloud::pubsub;
      namespace pubsub_admin = ::google::cloud::pubsub_admin;
      [](pubsub_admin::TopicAdminClient client, std::string project_id,
         std::string topic_id, std::string const& cluster_arn,
         std::string const& msk_topic, std::string const& aws_role_arn,
         std::string const& gcp_service_account) {
        google::pubsub::v1::Topic request;
        request.set_name(
            pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
        auto* aws_msk =
            request.mutable_ingestion_data_source_settings()->mutable_aws_msk();
        aws_msk->set_cluster_arn(cluster_arn);
        aws_msk->set_topic(msk_topic);
        aws_msk->set_aws_role_arn(aws_role_arn);
        aws_msk->set_gcp_service_account(gcp_service_account);
      
        auto topic = client.CreateTopic(request);
        // Note that kAlreadyExists is a possible error when the library retries.
        if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
          std::cout << "The topic already exists\n";
          return;
        }
        if (!topic) throw std::move(topic).status();
      
        std::cout << "The topic was successfully created: " << topic->DebugString()
                  << "\n";
      }

      Go

      En el siguiente ejemplo se usa la versión principal de la biblioteca de cliente de Pub/Sub de Go (v2). Si sigues usando la biblioteca v1, consulta la guía de migración a la versión 2. Para ver una lista de ejemplos de código de la versión 1, consulta los ejemplos de código obsoletos.

      Antes de probar este ejemplo, sigue las instrucciones de configuración de Go que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Go de Pub/Sub.

      import (
      	"context"
      	"fmt"
      	"io"
      
      	"cloud.google.com/go/pubsub"
      )
      
      func createTopicWithAWSMSKIngestion(w io.Writer, projectID, topicID, clusterARN, mskTopic, awsRoleARN, gcpSA string) error {
      	// projectID := "my-project-id"
      	// topicID := "my-topic"
      
      	// // AWS MSK ingestion settings.
      	// clusterARN := "cluster-arn"
      	// mskTopic := "msk-topic"
      	// awsRoleARN := "aws-role-arn"
      	// gcpSA := "gcp-service-account"
      
      	ctx := context.Background()
      	client, err := pubsub.NewClient(ctx, projectID)
      	if err != nil {
      		return fmt.Errorf("pubsub.NewClient: %w", err)
      	}
      	defer client.Close()
      
      	cfg := &pubsub.TopicConfig{
      		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
      			Source: &pubsub.IngestionDataSourceAmazonMSK{
      				ClusterARN:        clusterARN,
      				Topic:             mskTopic,
      				AWSRoleARN:        awsRoleARN,
      				GCPServiceAccount: gcpSA,
      			},
      		},
      	}
      	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
      	if err != nil {
      		return fmt.Errorf("CreateTopic: %w", err)
      	}
      	fmt.Fprintf(w, "Created topic with AWS MSK ingestion settings: %v\n", t)
      	return nil
      }
      

      Java

      Antes de probar este ejemplo, sigue las instrucciones de configuración de Java que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Pub/Sub.

      
      import com.google.cloud.pubsub.v1.TopicAdminClient;
      import com.google.pubsub.v1.IngestionDataSourceSettings;
      import com.google.pubsub.v1.Topic;
      import com.google.pubsub.v1.TopicName;
      import java.io.IOException;
      
      public class CreateTopicWithAwsMskIngestionExample {
        public static void main(String... args) throws Exception {
          // TODO(developer): Replace these variables before running the sample.
          String projectId = "your-project-id";
          String topicId = "your-topic-id";
          // AWS MSK ingestion settings.
          String clusterArn = "cluster-arn";
          String mskTopic = "msk-topic";
          String awsRoleArn = "aws-role-arn";
          String gcpServiceAccount = "gcp-service-account";
      
          createTopicWithAwsMskIngestionExample(
              projectId, topicId, clusterArn, mskTopic, awsRoleArn, gcpServiceAccount);
        }
      
        public static void createTopicWithAwsMskIngestionExample(
            String projectId,
            String topicId,
            String clusterArn,
            String mskTopic,
            String awsRoleArn,
            String gcpServiceAccount)
            throws IOException {
          try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
            TopicName topicName = TopicName.of(projectId, topicId);
      
            IngestionDataSourceSettings.AwsMsk awsMsk =
                IngestionDataSourceSettings.AwsMsk.newBuilder()
                    .setClusterArn(clusterArn)
                    .setTopic(mskTopic)
                    .setAwsRoleArn(awsRoleArn)
                    .setGcpServiceAccount(gcpServiceAccount)
                    .build();
            IngestionDataSourceSettings ingestionDataSourceSettings =
                IngestionDataSourceSettings.newBuilder().setAwsMsk(awsMsk).build();
      
            Topic topic =
                topicAdminClient.createTopic(
                    Topic.newBuilder()
                        .setName(topicName.toString())
                        .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                        .build());
      
            System.out.println("Created topic with AWS MSK ingestion settings: " + topic.getAllFields());
          }
        }
      }

      Node.js

      Antes de probar este ejemplo, sigue las instrucciones de configuración de Node.js que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Node.js de Pub/Sub.

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const clusterArn = 'arn:aws:kafka:...';
      // const mskTopic = 'YOUR_MSK_TOPIC';
      // const roleArn = 'arn:aws:iam:...';
      // const gcpServiceAccount = 'ingestion-account@...';
      
      // Imports the Google Cloud client library
      const {PubSub} = require('@google-cloud/pubsub');
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithAwsMskIngestion(
        topicNameOrId,
        clusterArn,
        mskTopic,
        awsRoleArn,
        gcpServiceAccount,
      ) {
        // Creates a new topic with AWS MSK ingestion.
        await pubSubClient.createTopic({
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            awsMsk: {
              clusterArn,
              topic: mskTopic,
              awsRoleArn,
              gcpServiceAccount,
            },
          },
        });
        console.log(`Topic ${topicNameOrId} created with AWS MSK ingestion.`);
      }

      Node.ts

      Antes de probar este ejemplo, sigue las instrucciones de configuración de Node.js que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Node.js de Pub/Sub.

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const clusterArn = 'arn:aws:kafka:...';
      // const mskTopic = 'YOUR_MSK_TOPIC';
      // const roleArn = 'arn:aws:iam:...';
      // const gcpServiceAccount = 'ingestion-account@...';
      
      // Imports the Google Cloud client library
      import {PubSub} from '@google-cloud/pubsub';
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithAwsMskIngestion(
        topicNameOrId: string,
        clusterArn: string,
        mskTopic: string,
        awsRoleArn: string,
        gcpServiceAccount: string,
      ) {
        // Creates a new topic with AWS MSK ingestion.
        await pubSubClient.createTopic({
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            awsMsk: {
              clusterArn,
              topic: mskTopic,
              awsRoleArn,
              gcpServiceAccount,
            },
          },
        });
        console.log(`Topic ${topicNameOrId} created with AWS MSK ingestion.`);
      }

      Python

      Antes de probar este ejemplo, sigue las instrucciones de configuración de Python que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Python de Pub/Sub.

      from google.cloud import pubsub_v1
      from google.pubsub_v1.types import Topic
      from google.pubsub_v1.types import IngestionDataSourceSettings
      
      # TODO(developer)
      # project_id = "your-project-id"
      # topic_id = "your-topic-id"
      # cluster_arn = "your-cluster-arn"
      # msk_topic = "your-msk-topic"
      # aws_role_arn = "your-aws-role-arn"
      # gcp_service_account = "your-gcp-service-account"
      
      publisher = pubsub_v1.PublisherClient()
      topic_path = publisher.topic_path(project_id, topic_id)
      
      request = Topic(
          name=topic_path,
          ingestion_data_source_settings=IngestionDataSourceSettings(
              aws_msk=IngestionDataSourceSettings.AwsMsk(
                  cluster_arn=cluster_arn,
                  topic=msk_topic,
                  aws_role_arn=aws_role_arn,
                  gcp_service_account=gcp_service_account,
              )
          ),
      )
      
      topic = publisher.create_topic(request=request)
      
      print(f"Created topic: {topic.name} with AWS MSK Ingestion Settings")

    Para obtener más información sobre los ARNs, consulta Nombres de recursos de Amazon (ARNs) e Identificadores de IAM.

    Si tienes problemas, consulta Solucionar problemas al importar un tema de Amazon MSK.

    Editar temas de importación de Amazon MSK

    Para editar la configuración de la fuente de datos de ingestión de un tema de importación de Amazon MSK, sigue estos pasos:

    Consola

    1. En la Google Cloud consola, ve a la página Temas.

      Ir a Temas

    2. Haz clic en el tema de importación de Amazon MSK.

    3. En la página de detalles del tema, haz clic en Editar.

    4. Actualiza los campos que quieras cambiar.

    5. Haz clic en Actualizar.

    gcloud

    1. In the Google Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    2. Ejecuta el comando gcloud pubsub topics update con todas las marcas que se mencionan en el siguiente ejemplo:

      gcloud pubsub topics update TOPIC_ID \
          --aws-msk-ingestion-cluster-arn MSK_CLUSTER_ARN \
          --aws-msk-ingestion-topic MSK_TOPIC \
          --aws-msk-ingestion-aws-role-arn MSK_ROLE_ARN \
          --aws-msk-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

      Haz los cambios siguientes:

      • TOPIC_ID: el nombre o el ID de tu tema de Pub/Sub.
      • MSK_CLUSTER_ARN: el ARN del clúster de Amazon MSK en el que estás insertando datos en Pub/Sub. El formato del ARN es el siguiente: arn:aws:kafka:${Region}:${Account}:cluster/${ClusterName}/${ClusterId}.
      • MSK_TOPIC: el nombre del tema de Kafka de Amazon MSK que estás ingiriendo en Pub/Sub.
      • MSK_ROLE_ARN: el ARN del rol de AWS. El formato ARN del rol es el siguiente: arn:aws:iam::${Account}:role/${RoleName}.
      • PUBSUB_SERVICE_ACCOUNT: la cuenta de servicio que has creado en Crear una cuenta de servicio en Google Cloud.

    Cuotas y límites

    El rendimiento de publicación de los temas de importación está limitado por la cuota de publicación del tema. Para obtener más información, consulta las cuotas y los límites de Pub/Sub.

    Siguientes pasos

    Apache Kafka® es una marca registrada de Apache Software Foundation o sus filiales en Estados Unidos u otros países.