Cambiar el tipo de tema

Puedes convertir un tema importado en uno estándar o viceversa.

Convertir un tema importado en un tema estándar

Para convertir un tema importado en un tema estándar, borra los ajustes de ingestión. Sigue estos pasos:

Consola

  1. En la Google Cloud consola, ve a la página Temas.

    Ir a Temas

  2. Haz clic en el tema que quieras importar.

  3. En la página de detalles del tema, haz clic en Editar.

  4. Desmarca la opción Habilitar la ingestión.

  5. Haz clic en Actualizar.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Ejecuta el comando gcloud pubsub topics update:

    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings

    Sustituye TOPIC_ID por el ID del tema.

Convertir un tema estándar en un tema de importación de Amazon Kinesis Data Streams

Para convertir un tema estándar en un tema de importación de Amazon Kinesis Data Streams, primero debes comprobar que cumples todos los requisitos.

Consola

  1. En la Google Cloud consola, ve a la página Temas.

    Ir a Temas

  2. Haz clic en el tema que quieras convertir en un tema importado.

  3. En la página de detalles del tema, haz clic en Editar.

  4. Selecciona la opción Habilitar ingesta.

  5. En Fuente de ingestión, seleccione Amazon Kinesis Data Streams.

  6. Introduce los siguientes datos:

    • ARN de Kinesis Stream: el ARN del flujo de datos de Kinesis que quieres ingerir en Pub/Sub. El formato del ARN es el siguiente: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • ARN de consumidor de Kinesis: el ARN del recurso de consumidor registrado en el flujo de datos de AWS Kinesis. El formato del ARN es el siguiente: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • Nombre de recurso de Amazon (ARN) del rol de AWS: el ARN del rol de AWS. El formato ARN del rol es el siguiente: arn:aws:iam::${Account}:role/${RoleName}.

    • Cuenta de servicio: la cuenta de servicio que has creado.

  7. Haz clic en Actualizar.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Ejecuta el comando gcloud pubsub topics update con todas las marcas mencionadas en el siguiente ejemplo:

    gcloud pubsub topics update TOPIC_ID 
    --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN
    --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN
    --kinesis-ingestion-role-arn KINESIS_ROLE_ARN
    --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    Haz los cambios siguientes:

    • TOPIC_ID es el ID o el nombre del tema. Este campo no se puede actualizar.

    • KINESIS_STREAM_ARN es el ARN de los flujos de datos de Kinesis que tienes previsto ingerir en Pub/Sub. El formato del ARN es el siguiente: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • KINESIS_CONSUMER_ARN es el ARN del recurso de consumidor registrado en AWS Kinesis Data Streams. El formato del ARN es el siguiente: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • KINESIS_ROLE_ARN es el ARN del rol de AWS. El formato ARN del rol es el siguiente: arn:aws:iam::${Account}:role/${RoleName}.

    • PUBSUB_SERVICE_ACCOUNT es la cuenta de servicio que has creado.

  3. Go

    En el siguiente ejemplo se usa la versión principal de la biblioteca de cliente de Pub/Sub de Go (v2). Si sigues usando la biblioteca v1, consulta la guía de migración a la versión 2. Para ver una lista de ejemplos de código de la versión 1, consulta los ejemplos de código obsoletos.

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Go que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Go de Pub/Sub.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/pubsub/v2"
    	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    )
    
    func updateTopicType(w io.Writer, projectID, topic string) error {
    	// projectID := "my-project-id"
    	// topic := "projects/my-project-id/topics/my-topic"
    	streamARN := "stream-arn"
    	consumerARN := "consumer-arn"
    	awsRoleARN := "aws-role-arn"
    	gcpServiceAccount := "gcp-service-account"
    
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %w", err)
    	}
    	defer client.Close()
    
    	pbTopic := &pubsubpb.Topic{
    		Name: topic,
    		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
    			Source: &pubsubpb.IngestionDataSourceSettings_AwsKinesis_{
    				AwsKinesis: &pubsubpb.IngestionDataSourceSettings_AwsKinesis{
    					StreamArn:         streamARN,
    					ConsumerArn:       consumerARN,
    					AwsRoleArn:        awsRoleARN,
    					GcpServiceAccount: gcpServiceAccount,
    				},
    			},
    		},
    	}
    	updateReq := &pubsubpb.UpdateTopicRequest{
    		Topic: pbTopic,
    		UpdateMask: &fieldmaskpb.FieldMask{
    			Paths: []string{"ingestion_data_source_settings"},
    		},
    	}
    	topicCfg, err := client.TopicAdminClient.UpdateTopic(ctx, updateReq)
    	if err != nil {
    		return fmt.Errorf("topic.Update: %w", err)
    	}
    	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", topicCfg)
    	return nil
    }
    

    Java

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Java que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Pub/Sub.

    
    import com.google.cloud.pubsub.v1.TopicAdminClient;
    import com.google.protobuf.FieldMask;
    import com.google.pubsub.v1.IngestionDataSourceSettings;
    import com.google.pubsub.v1.Topic;
    import com.google.pubsub.v1.TopicName;
    import com.google.pubsub.v1.UpdateTopicRequest;
    import java.io.IOException;
    
    public class UpdateTopicTypeExample {
      public static void main(String... args) throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String topicId = "your-topic-id";
        // Kinesis ingestion settings.
        String streamArn = "stream-arn";
        String consumerArn = "consumer-arn";
        String awsRoleArn = "aws-role-arn";
        String gcpServiceAccount = "gcp-service-account";
    
        UpdateTopicTypeExample.updateTopicTypeExample(
            projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
      }
    
      public static void updateTopicTypeExample(
          String projectId,
          String topicId,
          String streamArn,
          String consumerArn,
          String awsRoleArn,
          String gcpServiceAccount)
          throws IOException {
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
          TopicName topicName = TopicName.of(projectId, topicId);
    
          IngestionDataSourceSettings.AwsKinesis awsKinesis =
              IngestionDataSourceSettings.AwsKinesis.newBuilder()
                  .setStreamArn(streamArn)
                  .setConsumerArn(consumerArn)
                  .setAwsRoleArn(awsRoleArn)
                  .setGcpServiceAccount(gcpServiceAccount)
                  .build();
          IngestionDataSourceSettings ingestionDataSourceSettings =
              IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();
    
          // Construct the topic with Kinesis ingestion settings.
          Topic topic =
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build();
    
          // Construct a field mask to indicate which field to update in the topic.
          FieldMask updateMask =
              FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();
    
          UpdateTopicRequest request =
              UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();
    
          Topic response = topicAdminClient.updateTopic(request);
    
          System.out.println(
              "Updated topic with Kinesis ingestion settings: " + response.getAllFields());
        }
      }
    }

    Node.js

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Node.js que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Node.js de Pub/Sub.

    /**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
    // const awsRoleArn = 'arn:aws:iam:...';
    // const gcpServiceAccount = 'ingestion-account@...';
    // const streamArn = 'arn:aws:kinesis:...';
    // const consumerArn = 'arn:aws:kinesis:...';
    
    // Imports the Google Cloud client library
    const {PubSub} = require('@google-cloud/pubsub');
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    async function updateTopicIngestionType(
      topicNameOrId,
      awsRoleArn,
      gcpServiceAccount,
      streamArn,
      consumerArn,
    ) {
      const metadata = {
        ingestionDataSourceSettings: {
          awsKinesis: {
            awsRoleArn,
            gcpServiceAccount,
            streamArn,
            consumerArn,
          },
        },
      };
    
      await pubSubClient.topic(topicNameOrId).setMetadata(metadata);
    
      console.log('Topic updated with Kinesis source successfully.');
    }

    Python

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Python que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Python de Pub/Sub.

    from google.cloud import pubsub_v1
    from google.pubsub_v1.types import Topic
    from google.pubsub_v1.types import IngestionDataSourceSettings
    from google.pubsub_v1.types import UpdateTopicRequest
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "your-project-id"
    # topic_id = "your-topic-id"
    # stream_arn = "your-stream-arn"
    # consumer_arn = "your-consumer-arn"
    # aws_role_arn = "your-aws-role-arn"
    # gcp_service_account = "your-gcp-service-account"
    
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    
    update_request = UpdateTopicRequest(
        topic=Topic(
            name=topic_path,
            ingestion_data_source_settings=IngestionDataSourceSettings(
                aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                    stream_arn=stream_arn,
                    consumer_arn=consumer_arn,
                    aws_role_arn=aws_role_arn,
                    gcp_service_account=gcp_service_account,
                )
            ),
        ),
        update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
    )
    
    topic = publisher.update_topic(request=update_request)
    print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")
    
    

    C++

    Antes de probar este ejemplo, sigue las instrucciones de configuración de C++ que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de C++ de Pub/Sub.

    namespace pubsub = ::google::cloud::pubsub;
    namespace pubsub_admin = ::google::cloud::pubsub_admin;
    [](pubsub_admin::TopicAdminClient client, std::string project_id,
       std::string topic_id, std::string stream_arn, std::string consumer_arn,
       std::string aws_role_arn, std::string gcp_service_account) {
      google::pubsub::v1::UpdateTopicRequest request;
    
      request.mutable_topic()->set_name(
          pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
      auto* aws_kinesis = request.mutable_topic()
                              ->mutable_ingestion_data_source_settings()
                              ->mutable_aws_kinesis();
      aws_kinesis->set_stream_arn(stream_arn);
      aws_kinesis->set_consumer_arn(consumer_arn);
      aws_kinesis->set_aws_role_arn(aws_role_arn);
      aws_kinesis->set_gcp_service_account(gcp_service_account);
      *request.mutable_update_mask()->add_paths() =
          "ingestion_data_source_settings";
    
      auto topic = client.UpdateTopic(request);
      if (!topic) throw std::move(topic).status();
    
      std::cout << "The topic was successfully updated: " << topic->DebugString()
                << "\n";
    }

    Node.ts

    Antes de probar este ejemplo, sigue las instrucciones de configuración de Node.js que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Node.js de Pub/Sub.

    /**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
    // const awsRoleArn = 'arn:aws:iam:...';
    // const gcpServiceAccount = 'ingestion-account@...';
    // const streamArn = 'arn:aws:kinesis:...';
    // const consumerArn = 'arn:aws:kinesis:...';
    
    // Imports the Google Cloud client library
    import {PubSub, TopicMetadata} from '@google-cloud/pubsub';
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    async function updateTopicIngestionType(
      topicNameOrId: string,
      awsRoleArn: string,
      gcpServiceAccount: string,
      streamArn: string,
      consumerArn: string,
    ) {
      const metadata: TopicMetadata = {
        ingestionDataSourceSettings: {
          awsKinesis: {
            awsRoleArn,
            gcpServiceAccount,
            streamArn,
            consumerArn,
          },
        },
      };
    
      await pubSubClient.topic(topicNameOrId).setMetadata(metadata);
    
      console.log('Topic updated with Kinesis source successfully.');
    }

Para obtener más información sobre los ARNs, consulta Nombres de recursos de Amazon (ARNs) e Identificadores de IAM.

Convertir un tema estándar en un tema de importación de Cloud Storage

Para convertir un tema estándar en un tema de importación de Cloud Storage, primero comprueba que cumples todos los requisitos previos.

Consola

  1. En la Google Cloud consola, ve a la página Temas.

    Ir a Temas

  2. Haz clic en el tema que quieras convertir en un tema de importación de Cloud Storage.

  3. En la página de detalles del tema, haz clic en Editar.

  4. Selecciona la opción Habilitar ingesta.

  5. En Fuente de ingestión, selecciona Google Cloud Storage.

  6. En el segmento de Cloud Storage, haga clic en Examinar.

    Se abrirá la página Seleccionar contenedor. Selecciona una de las opciones siguientes:

    • Selecciona un bucket de cualquier proyecto adecuado.

    • Haz clic en el icono de crear y sigue las instrucciones que aparecen en pantalla para crear un nuevo contenedor. Después de crear el segmento, selecciona el segmento del tema de importación de Cloud Storage.

  7. Cuando especifiques el segmento, Pub/Sub comprobará si la cuenta de servicio de Pub/Sub tiene los permisos adecuados en el segmento. Si hay problemas con los permisos, verás un mensaje de error relacionado con ellos.

    Si tienes problemas con los permisos, haz clic en Establecer permisos. Para obtener más información, consulta Conceder permisos de Cloud Storage a la cuenta de servicio de Pub/Sub.

  8. En Formato de objeto, selecciona Texto, Avro o Avro de Pub/Sub.

    Si selecciona Texto, puede especificar un Delimitador para dividir los objetos en mensajes.

    Para obtener más información sobre estas opciones, consulta Formato de entrada.

  9. Opcional. Puede especificar un Tiempo mínimo de creación de objetos para su tema. Si se define, solo se ingieren los objetos creados después del tiempo mínimo de creación de objetos.

    Para obtener más información, consulta Tiempo mínimo de creación de objetos.

  10. Debes especificar un patrón glob. Para ingerir todos los objetos del contenedor, usa ** como patrón glob. Solo se ingieren los objetos que coincidan con el patrón proporcionado.

    Para obtener más información, consulta Coincidir con un patrón glob.

  11. Mantén el resto de los ajustes predeterminados.
  12. Haz clic en Actualizar tema.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Para no perder la configuración del tema de importación, asegúrese de incluirla toda cada vez que actualice el tema. Si omites algo, Pub/Sub restablecerá el ajuste a su valor predeterminado original.

    Ejecuta el comando gcloud pubsub topics update con todas las marcas mencionadas en el siguiente ejemplo:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Haz los cambios siguientes:

    • TOPIC_ID es el ID o el nombre del tema. Este campo no se puede actualizar.

    • BUCKET_NAME: especifica el nombre de un segmento. Por ejemplo, prod_bucket. El nombre del contenedor no debe incluir el ID del proyecto. Para crear un segmento, consulta el artículo Crear segmentos.

    • INPUT_FORMAT: especifica el formato de los objetos que se ingieren. Puede ser text, avro o pubsub_avro. Para obtener más información sobre estas opciones, consulta Formato de entrada.

    • TEXT_DELIMITER: especifica el delimitador con el que dividir los objetos de texto en mensajes de Pub/Sub. Debe ser un solo carácter y solo se debe definir cuando INPUT_FORMAT sea text. El valor predeterminado es el carácter de nueva línea (\n).

      Cuando uses gcloud CLI para especificar el delimitador, presta atención al tratamiento de caracteres especiales, como el salto de línea \n. Usa el formato '\n' para asegurarte de que el delimitador se interpreta correctamente. Si solo se usa \n sin comillas ni caracteres de escape, se obtiene un delimitador "n".

    • MINIMUM_OBJECT_CREATE_TIME: especifica el tiempo mínimo en el que se creó un objeto para que se ingiera. Debe estar en UTC y tener el formato YYYY-MM-DDThh:mm:ssZ. Por ejemplo, 2024-10-14T08:30:30Z.

      Cualquier fecha, pasada o futura, desde el 0001-01-01T00:00:00Z hasta el 9999-12-31T23:59:59Z (ambas incluidas) es válida.

    • MATCH_GLOB: especifica el patrón glob que debe coincidir para que se ingiera un objeto. Cuando usas la interfaz de línea de comandos de gcloud, un patrón de coincidencia con caracteres * debe tener el carácter * con formato de escape \*\*.txt o todo el patrón de coincidencia debe estar entre comillas "**.txt" o '**.txt'. Para obtener información sobre la sintaxis admitida para los patrones glob, consulta la documentación de Cloud Storage.