Cambia el tipo de tema

Puedes convertir un tema de importación en uno estándar o viceversa.

Cómo convertir un tema de importación en uno estándar

Para convertir un tema de importación en uno estándar, borra la configuración de transferencia. Sigue los siguientes pasos:

Console

  1. En la consola de Google Cloud , ve a la página Topics.

    Ir a temas

  2. Haz clic en el tema de importación.

  3. En la página de detalles del tema, haz clic en Editar.

  4. Desmarca la opción Habilitar transferencia.

  5. Haz clic en Actualizar.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Ejecuta el comando gcloud pubsub topics update:

    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings

    Reemplaza TOPIC_ID por el ID del tema.

Convierte un tema estándar en un tema de importación de Amazon Kinesis Data Streams

Para convertir un tema estándar en un tema de importación de Amazon Kinesis Data Streams, primero verifica que cumplas con todos los requisitos previos.

Console

  1. En la consola de Google Cloud , ve a la página Topics.

    Ir a temas

  2. Haz clic en el tema que quieres convertir en un tema de importación.

  3. En la página de detalles del tema, haz clic en Editar.

  4. Selecciona la opción Habilitar transferencia.

  5. En Fuente de transferencia, selecciona Amazon Kinesis Data Streams.

  6. Ingresa los siguientes detalles:

    • ARN de transmisión de Kinesis: Es el ARN del flujo de datos de Kinesis que planeas transferir a Pub/Sub. El formato del ARN es el siguiente: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • ARN del consumidor de Kinesis: Es el ARN del recurso de consumidor que se registró en el flujo de datos de AWS Kinesis. El formato del ARN es el siguiente: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • ARN de rol de AWS: Es el ARN del rol de AWS. El formato del ARN del rol es el siguiente: arn:aws:iam:${Account}:role/${RoleName}.

    • Cuenta de servicio: Es la cuenta de servicio que creaste en Cómo crear una cuenta de servicio en Google Cloud.

  7. Haz clic en Actualizar.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Ejecuta el comando gcloud pubsub topics update con todas las marcas mencionadas en la siguiente muestra:

    gcloud pubsub topics update TOPIC_ID \
         --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN\
         --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN\
         --kinesis-ingestion-role-arn KINESIS_ROLE_ARN\
         --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
      

    Reemplaza lo siguiente:

    • TOPIC_ID es el ID o el nombre del tema. No se puede actualizar este campo.

    • KINESIS_STREAM_ARN es el ARN de los flujos de datos de Kinesis que planeas transferir a Pub/Sub. El formato del ARN es el siguiente: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • KINESIS_CONSUMER_ARN es el ARN del recurso del consumidor que se registró en AWS Kinesis Data Streams. El formato del ARN es el siguiente: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • KINESIS_ROLE_ARN es el ARN del rol de AWS. El formato del ARN del rol es el siguiente: arn:aws:iam:${Account}:role/${RoleName}.

    • PUBSUB_SERVICE_ACCOUNT es la cuenta de servicio que creaste en Cómo crear una cuenta de servicio en Google Cloud.

Go

Antes de probar esta muestra, sigue las instrucciones de configuración de Go que encontrarás en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Go de Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func updateTopicType(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	updateCfg := pubsub.TopicConfigToUpdate{
		// If wanting to clear ingestion settings, set this to zero value: &pubsub.IngestionDataSourceSettings{}
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceAWSKinesis{
				StreamARN:         streamARN,
				ConsumerARN:       consumerARN,
				AWSRoleARN:        awsRoleARN,
				GCPServiceAccount: gcpServiceAccount,
			},
		},
	}
	topicCfg, err := client.Topic(topicID).Update(ctx, updateCfg)
	if err != nil {
		return fmt.Errorf("topic.Update: %w", err)
	}
	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", topicCfg)
	return nil
}

Java

Antes de probar esta muestra, sigue las instrucciones de configuración de Java que encontrarás en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Java de Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicTypeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    UpdateTopicTypeExample.updateTopicTypeExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void updateTopicTypeExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      // Construct the topic with Kinesis ingestion settings.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setIngestionDataSourceSettings(ingestionDataSourceSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println(
          "Updated topic with Kinesis ingestion settings: " + response.getAllFields());
    }
  }
}

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js que encontrarás en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Node.js de Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn
) {
  const metadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python que encontrarás en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Python de Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

update_request = UpdateTopicRequest(
    topic=Topic(
        name=topic_path,
        ingestion_data_source_settings=IngestionDataSourceSettings(
            aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                stream_arn=stream_arn,
                consumer_arn=consumer_arn,
                aws_role_arn=aws_role_arn,
                gcp_service_account=gcp_service_account,
            )
        ),
    ),
    update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
)

topic = publisher.update_topic(request=update_request)
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

Antes de probar esta muestra, sigue las instrucciones de configuración de C++ que encontrarás en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de C++ de Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::UpdateTopicRequest request;

  request.mutable_topic()->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis = request.mutable_topic()
                          ->mutable_ingestion_data_source_settings()
                          ->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);
  *request.mutable_update_mask()->add_paths() =
      "ingestion_data_source_settings";

  auto topic = client.UpdateTopic(request);
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido de Pub/Sub mediante bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string
) {
  const metadata: TopicMetadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Para obtener más información sobre los ARN, consulta Nombres de recursos de Amazon (ARN) y Identificadores de IAM.

Convierte un tema estándar en un tema de importación de Cloud Storage

Para convertir un tema estándar en un tema de importación de Cloud Storage, primero verifica que cumplas con todos los requisitos previos.

Console

  1. En la consola de Google Cloud , ve a la página Topics.

    Ir a temas

  2. Haz clic en el tema que deseas convertir en un tema de importación de Cloud Storage.

  3. En la página de detalles del tema, haz clic en Editar.

  4. Selecciona la opción Habilitar transferencia.

  5. En Fuente de transferencia, selecciona Google Cloud Storage.

  6. En el bucket de Cloud Storage, haz clic en Explorar.

    Se abrirá la página Seleccionar bucket. Selecciona una de las siguientes opciones:

    • Selecciona un bucket existente de cualquier proyecto adecuado.

    • Haz clic en el ícono de crear y sigue las instrucciones en pantalla para crear un bucket nuevo. Después de crear el bucket, selecciónalo para el tema de importación de Cloud Storage.

  7. Cuando especificas el bucket, Pub/Sub busca los permisos adecuados en el bucket para la cuenta de servicio de Pub/Sub. Si hay problemas de permisos, verás un mensaje de error relacionado con ellos.

    Si tienes problemas de permisos, haz clic en Establecer permisos. Para obtener más información, consulta Otorga permisos de Cloud Storage a la cuenta de servicio de Pub/Sub.

  8. En Formato de objeto, selecciona Texto, Avro o Avro de Pub/Sub.

    Si seleccionas Texto, puedes especificar de manera opcional un Delimitador con el que dividir los objetos en mensajes.

    Para obtener más información sobre estas opciones, consulta Formato de entrada.

  9. Opcional. Puedes especificar un tiempo mínimo de creación de objetos para tu tema. Si se establece, solo se transferirán los objetos creados después del tiempo mínimo de creación del objeto.

    Para obtener más información, consulta Tiempo mínimo de creación del objeto.

  10. Debes especificar un patrón glob. Para transferir todos los objetos del bucket, usa ** como patrón glob. Solo se transfieren los objetos que coinciden con el patrón determinado.

    Para obtener más información, consulta Cómo hacer coincidir un patrón de glob.

  11. Mantén la configuración predeterminada.
  12. Haz clic en Actualizar tema.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Para evitar perder la configuración del tema de importación, asegúrate de incluirlos todos cada vez que lo actualices. Si omites algo, Pub/Sub restablecerá la configuración a su valor predeterminado original.

    Ejecuta el comando gcloud pubsub topics update con todas las marcas mencionadas en la siguiente muestra:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Reemplaza lo siguiente:

    • TOPIC_ID es el ID o el nombre del tema. No se puede actualizar este campo.

    • BUCKET_NAME: Especifica el nombre de un bucket existente. Por ejemplo, prod_bucket El nombre del bucket no debe incluir el ID del proyecto. Para crear un bucket, consulta Crea buckets.

    • INPUT_FORMAT: Especifica el formato de los objetos que se transfieren. Puede ser text, avro o pubsub_avro. Para obtener más información sobre estas opciones, consulta Formato de entrada.

    • TEXT_DELIMITER: Especifica el delimitador con el que se dividirán los objetos de texto en mensajes de Pub/Sub. Debe ser un solo carácter y solo se debe establecer cuando INPUT_FORMAT sea text. El valor predeterminado es el carácter de salto de línea (\n).

      Cuando uses gcloud CLI para especificar el delimitador, presta mucha atención a la manipulación de caracteres especiales, como el carácter de línea nueva \n. Usa el formato '\n' para asegurarte de que el delimitador se interprete correctamente. Simplemente, usar \n sin comillas o sin escapar los resultados genera un delimitador de "n".

    • MINIMUM_OBJECT_CREATE_TIME: Especifica el tiempo mínimo en el que se creó un objeto para que se transfiera. Debe estar en UTC con el formato YYYY-MM-DDThh:mm:ssZ. Por ejemplo, 2024-10-14T08:30:30Z.

      Cualquier fecha, pasada o futura, del 0001-01-01T00:00:00Z al 9999-12-31T23:59:59Z inclusive, es válida.

    • MATCH_GLOB: Especifica el patrón glob que debe coincidir para que se transfiera un objeto. Cuando usas gcloud CLI, un glob de coincidencia con caracteres * debe tener el carácter * con formato de escape en el formato \*\*.txt o todo el glob de coincidencia debe estar entre comillas "**.txt" o '**.txt'. Para obtener información sobre la sintaxis admitida en los patrones glob, consulta la documentación de Cloud Storage.