Crear un tema de importación de Cloud Storage

Un tema de importación de Cloud Storage te permite ingerir datos de Cloud Storage en Pub/Sub de forma continua. Después, puedes transmitir los datos a cualquiera de los destinos que admite Pub/Sub. Pub/Sub detecta automáticamente los nuevos objetos que se añaden al segmento de Cloud Storage y los ingiere.

Cloud Storage es un servicio para almacenar tus objetos en Google Cloud. Un objeto es un fragmento de datos que no se puede modificar y que consta de un archivo en cualquier formato. Los objetos se almacenan en contenedores llamados segmentos. Los contenedores también pueden incluir carpetas gestionadas, que se usan para proporcionar acceso ampliado a grupos de objetos con un prefijo de nombre compartido.

Para obtener más información sobre Cloud Storage, consulta la documentación de Cloud Storage.

Para obtener más información sobre los temas de importación, consulta el artículo Acerca de los temas de importación.

Antes de empezar

Roles y permisos necesarios

Para obtener los permisos que necesitas para crear y gestionar un tema de importación de Cloud Storage, pide a tu administrador que te conceda el rol de gestión de identidades y accesos Editor de Pub/Sub (roles/pubsub.editor) en tu tema o proyecto. Para obtener más información sobre cómo conceder roles, consulta el artículo Gestionar el acceso a proyectos, carpetas y organizaciones.

Este rol predefinido contiene los permisos necesarios para crear y gestionar un tema de importación de Cloud Storage. Para ver los permisos exactos que se necesitan, despliega la sección Permisos necesarios:

Permisos obligatorios

Para crear y gestionar un tema de importación de Cloud Storage, se necesitan los siguientes permisos:

  • Crea un tema de importación: pubsub.topics.create
  • Eliminar un tema importado: pubsub.topics.delete
  • Obtener un tema de importación: pubsub.topics.get
  • Para enumerar un tema de importación, sigue estos pasos: pubsub.topics.list
  • Publicar en un tema de importación: pubsub.topics.publish
  • Actualizar un tema de importación: pubsub.topics.update
  • Obtiene la política de gestión de identidades y accesos de un tema de importación: pubsub.topics.getIamPolicy
  • Configura la política de gestión de identidades y accesos de un tema de importación: pubsub.topics.setIamPolicy

También puedes obtener estos permisos con roles personalizados u otros roles predefinidos.

Puedes configurar el control de acceso a nivel de proyecto y de recurso individual.

La política de almacenamiento de mensajes cumple los requisitos de la ubicación del contenedor

La política de almacenamiento de mensajes del tema de Pub/Sub debe coincidir con las regiones en las que se encuentra tu segmento de Cloud Storage. Esta política indica dónde puede almacenar Pub/Sub los datos de tus mensajes.

  • En el caso de los contenedores cuyo tipo de ubicación sea una región, la política debe incluir esa región específica. Por ejemplo, si tu segmento está en la región us-central1, la política de almacenamiento de mensajes también debe incluir us-central1.

  • En el caso de los segmentos con el tipo de ubicación birregional o multirregional, la política debe incluir al menos una región dentro de la ubicación birregional o multirregional. Por ejemplo, si tu segmento está en US multi-region, la política de almacenamiento de mensajes podría incluir us-central1, us-east1 o cualquier otra región de US multi-region.

    Si la política no incluye la región del bucket, no se podrá crear el tema. Por ejemplo, si tu contenedor está en europe-west1 y tu política de almacenamiento de mensajes solo incluye asia-east1, recibirás un error.

    Si la política de almacenamiento de mensajes incluye solo una región que se solapa con la ubicación del contenedor, la redundancia multirregional podría verse comprometida. Esto se debe a que, si esa región deja de estar disponible, es posible que no puedas acceder a tus datos. Para garantizar una redundancia total, se recomienda incluir al menos dos regiones en la política de almacenamiento de mensajes que formen parte de la ubicación multirregional o birregional del segmento.

Para obtener más información sobre las ubicaciones de los segmentos, consulta la documentación.

Habilitar la publicación

Para habilitar la publicación, debes asignar el rol de publicador de Pub/Sub a la cuenta de servicio de Pub/Sub para que Pub/Sub pueda publicar en el tema de importación de Cloud Storage.

Habilitar la publicación en todos los temas de importación de Cloud Storage

Elige esta opción si no tienes ningún tema de importación de Cloud Storage disponible en tu proyecto.

  1. En la consola, ve a la página Gestión de identidades y accesos. Google Cloud

    Ir a IAM

  2. Selecciona la casilla Incluir concesiones de roles proporcionadas por Google.

  3. Busca la cuenta de servicio de Pub/Sub que tenga el siguiente formato:

    service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com

  4. En esta cuenta de servicio, haz clic en el botón Editar principal.

  5. Si es necesario, haz clic en Añadir otro rol.

  6. Busca y selecciona el rol de editor de Pub/Sub (roles/pubsub.publisher).

  7. Haz clic en Guardar.

Habilitar la publicación en un único tema de importación de Cloud Storage

Si quiere conceder a Pub/Sub el permiso para publicar en un tema de importación de Cloud Storage específico que ya exista, siga estos pasos:

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Ejecuta el comando gcloud pubsub topics add-iam-policy-binding:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"

    Haz los cambios siguientes:

    • TOPIC_ID es el ID o el nombre del tema de importación de Cloud Storage.

    • PROJECT_NUMBER es el número de proyecto. Para ver el número de proyecto, consulta el artículo Identificar proyectos.

  3. Asigna roles de Cloud Storage a la cuenta de servicio de Pub/Sub

    Para crear un tema de importación de Cloud Storage, la cuenta de servicio de Pub/Sub debe tener permiso para leer del segmento de Cloud Storage específico. Son obligatorios los siguientes permisos:

    • storage.objects.list
    • storage.objects.get
    • storage.buckets.get

    Para asignar estos permisos a la cuenta de servicio de Pub/Sub, elige uno de los siguientes procedimientos:

    • Conceder permisos a nivel de segmento. En el segmento de Cloud Storage específico, asigna los roles Lector de objetos antiguos de Storage (roles/storage.legacyObjectReader) y Lector de segmentos antiguos de Storage (roles/storage.legacyBucketReader) a la cuenta de servicio de Pub/Sub.

    • Si debes asignar roles a nivel de proyecto, puedes asignar el rol Administrador de Storage (roles/storage.admin) en el proyecto que contiene el segmento de Cloud Storage. Asigna este rol a la cuenta de servicio de Pub/Sub.

    Permisos de segmento

    Sigue estos pasos para asignar los roles Lector de objetos antiguos de Storage (roles/storage.legacyObjectReader) y Lector de segmentos antiguos de Storage (roles/storage.legacyBucketReader) a la cuenta de servicio de Pub/Sub a nivel de segmento:

    1. En la Google Cloud consola, ve a la página Cloud Storage.

      Ir a Cloud Storage

    2. Haga clic en el segmento de Cloud Storage desde el que quiera leer mensajes e importarlos al tema de importación de Cloud Storage.

      Se abrirá la página Detalles del segmento.

    3. En la página Detalles del segmento, haga clic en la pestaña Permisos.

    4. En la pestaña Permisos > Ver por principales, haz clic en Dar acceso.

      Se abrirá la página Dar acceso.

    5. En la sección Añadir principales, introduce el nombre de tu cuenta de servicio de Pub/Sub.

      El formato de la cuenta de servicio es service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com. Por ejemplo, en un proyecto con PROJECT_NUMBER=112233445566, la cuenta de servicio tiene el formato service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com.

    6. En Asignar roles > Seleccionar un rol, introduce Object Reader y selecciona el rol Lector de objetos antiguos de Storage.

    7. Haz clic en Añadir otro rol.

    8. En el menú desplegable Selecciona un rol, introduce Bucket Reader y selecciona el rol Lector de segmentos heredados de Storage.

    9. Haz clic en Guardar.

    Permisos de proyecto

    Sigue estos pasos para conceder el rol Administrador de Storage (roles/storage.admin) a nivel de proyecto:

    1. En la consola, ve a la página Gestión de identidades y accesos. Google Cloud

      Ir a IAM

    2. En la pestaña Permisos > Ver por principales, haz clic en Dar acceso.

      Se abrirá la página Dar acceso.

    3. En la sección Añadir principales, introduce el nombre de tu cuenta de servicio de Pub/Sub.

      El formato de la cuenta de servicio es service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com. Por ejemplo, en un proyecto con PROJECT_NUMBER=112233445566, la cuenta de servicio tiene el formato service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com.

    4. En el menú desplegable Asignar roles > Seleccionar un rol, introduce Storage Admin y selecciona el rol Administrador de almacenamiento.

    5. Haz clic en Guardar.

    Para obtener más información sobre la gestión de identidades y accesos de Cloud Storage, consulta el artículo Gestión de identidades y accesos de Cloud Storage.

    Propiedades de los temas de importación de Cloud Storage

    Para obtener más información sobre las propiedades comunes de todos los temas, consulta Propiedades de un tema.

    Nombre del segmento

    Es el nombre del segmento de Cloud Storage desde el que Pub/Sub lee los datos que se publican en un tema de importación de Cloud Storage.

    Formato de entrada

    Cuando creas un tema de importación de Cloud Storage, puedes especificar el formato de los objetos que se van a ingerir como Texto, Avro o Avro de Pub/Sub.

    • Texto. Se presupone que los objetos contienen datos con texto sin formato. Este formato de entrada intenta ingerir todos los objetos del segmento siempre que el objeto cumpla el tiempo mínimo de creación de objetos y coincida con los criterios de patrón glob.

      Delimitador. También puedes especificar un delimitador por el que se dividan los objetos en mensajes. Si no se define, el valor predeterminado es el carácter de salto de línea (\n). El delimitador debe ser un solo carácter.

    • Avro. Los objetos están en formato binario Apache Avro. No se ingiere ningún objeto que no tenga un formato Apache Avro válido. Estas son las limitaciones relativas a Avro:

      • Las versiones 1.1.0 y 1.2.0 de Avro no son compatibles.
      • El tamaño máximo de un bloque Avro es de 16 MB.
    • Avro de Pub/Sub. Los objetos están en formato binario Apache Avro con un esquema que coincide con el de un objeto escrito en Cloud Storage mediante una suscripción de Cloud Storage de Pub/Sub con el formato de archivo Avro. A continuación, se indican algunas directrices importantes para usar Avro en Pub/Sub:

      • El campo de datos del registro Avro se usa para rellenar el campo de datos del mensaje de Pub/Sub generado.

      • Si se especifica la opción write_metadata para la suscripción de Cloud Storage, los valores del campo attributes se rellenan como atributos del mensaje de Pub/Sub generado.

      • Si se especifica una clave de ordenación en el mensaje original escrito en Cloud Storage, este campo se rellena como un atributo con el nombre original_message_ordering_key en el mensaje de Pub/Sub generado.

    Tiempo mínimo de creación de objetos

    Cuando crees un tema de importación de Cloud Storage, puedes especificar un tiempo mínimo de creación de objetos. Solo se ingieren los objetos que se hayan creado en esta marca de tiempo o después. Esta marca de tiempo debe proporcionarse en un formato como YYYY-MM-DDThh:mm:ssZ. Cualquier fecha, pasada o futura, desde el 0001-01-01T00:00:00Z hasta el 9999-12-31T23:59:59Z (ambas incluidas), es válida.

    Coincidencia con patrón glob

    Cuando creas un tema de importación de Cloud Storage, puedes especificar un patrón de glob de coincidencia. Solo se ingieren los objetos cuyos nombres coincidan con este patrón. Por ejemplo, para ingerir todos los objetos con el sufijo .txt, puedes especificar el patrón glob **.txt.

    Para obtener información sobre la sintaxis admitida de los patrones glob, consulta la documentación de Cloud Storage.

    Usar temas de importación de Cloud Storage

    Puede crear un tema de importación o editar uno que ya tenga.

    Cuestiones importantes

    • Si creas el tema y la suscripción por separado, aunque lo hagas de forma rápida, se pueden perder datos. Hay un breve periodo en el que el tema está disponible sin suscripción. Si se envían datos al tema durante este periodo, se perderán. Si creas el tema primero, luego la suscripción y, por último, conviertes el tema en un tema de importación, te aseguras de que no se pierda ningún mensaje durante el proceso de importación.

    Crear un tema de importación de Cloud Storage

    Para crear un tema de importación de Cloud Storage, sigue estos pasos:

    Consola

    1. En la Google Cloud consola, ve a la página Temas.

      Ir a Temas

    2. Haz clic en Crear tema.

      Se abrirá la página de detalles del tema.

    3. En el campo ID de tema, introduce un ID para el tema de importación de Cloud Storage.

      Para obtener más información sobre cómo poner nombre a los temas, consulta las directrices de nomenclatura.

    4. Selecciona Añadir una suscripción predeterminada.

    5. Selecciona Habilitar la ingesta.

    6. En Fuente de ingestión, selecciona Google Cloud Storage.

    7. En el segmento de Cloud Storage, haga clic en Examinar.

      Se abrirá la página Seleccionar contenedor. Selecciona una de las opciones siguientes:

      • Selecciona un bucket de cualquier proyecto adecuado.

      • Haz clic en el icono de crear y sigue las instrucciones que aparecen en pantalla para crear un nuevo contenedor. Después de crear el segmento, selecciónalo para el tema de importación de Cloud Storage.

    8. Cuando especifiques el segmento, Pub/Sub comprobará que la cuenta de servicio de Pub/Sub tiene los permisos adecuados en el segmento. Si hay problemas con los permisos, verás un mensaje similar al siguiente:

      Unable to verify if the Pub/Sub service agent has write permissions on this bucket. You may be lacking permissions to view or set permissions.

      Si tienes problemas con los permisos, haz clic en Establecer permisos. Para obtener más información, consulta Conceder permisos de Cloud Storage a la cuenta de servicio de Pub/Sub.

    9. En Formato de objeto, selecciona Texto, Avro o Avro de Pub/Sub.

      Si selecciona Texto, puede especificar un Delimitador para dividir los objetos en mensajes.

      Para obtener más información sobre estas opciones, consulta Formato de entrada.

    10. Opcional. Puede especificar un tiempo mínimo de creación de objetos para su tema. Si se define, solo se ingieren los objetos creados después de la hora mínima de creación de objetos.

      Para obtener más información, consulta Tiempo mínimo de creación de objetos.

    11. Debes especificar un patrón glob. Para ingerir todos los objetos del contenedor, usa ** como patrón glob. Si se define, solo se insertarán los objetos que coincidan con el patrón indicado.

      Para obtener más información, consulta Buscar coincidencias con un patrón glob.

    12. Mantén el resto de los ajustes predeterminados.

    13. Haz clic en Crear tema.

    gcloud

    1. In the Google Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    2. Ejecuta el comando gcloud pubsub topics create:

      gcloud pubsub topics create TOPIC_ID \
          --cloud-storage-ingestion-bucket=BUCKET_NAME \
          --cloud-storage-ingestion-input-format=INPUT_FORMAT \
          --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER \
          --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME \
          --cloud-storage-ingestion-match-glob=MATCH_GLOB
      

      En el comando, solo son obligatorios TOPIC_ID, la marca --cloud-storage-ingestion-bucket y la marca --cloud-storage-ingestion-input-format. Las demás marcas son opcionales y se pueden omitir.

      Haz los cambios siguientes:

      • TOPIC_ID: el nombre o el ID del tema.
      • BUCKET_NAME: especifica el nombre de un segmento. Por ejemplo, prod_bucket. El nombre del contenedor no debe incluir el ID del proyecto. Para crear un segmento, consulta el artículo Crear segmentos.
      • INPUT_FORMAT: especifica el formato de los objetos que se ingieren. Puede ser text, avro o pubsub_avro. Para obtener más información sobre estas opciones, consulta Formato de entrada.
      • TEXT_DELIMITER: especifica el delimitador con el que dividir los objetos de texto en mensajes de Pub/Sub. Debe ser un solo carácter y solo se debe definir cuando INPUT_FORMAT sea text. El valor predeterminado es el carácter de nueva línea (\n).

        Cuando uses gcloud CLI para especificar el delimitador, presta atención al tratamiento de caracteres especiales, como el salto de línea \n. Usa el formato '\n' para asegurarte de que el delimitador se interpreta correctamente. Si solo se usa \n sin comillas ni caracteres de escape, se obtiene un delimitador "n".

      • MINIMUM_OBJECT_CREATE_TIME: especifica el tiempo mínimo en el que se creó un objeto para que se ingiera. Debe estar en UTC y tener el formato YYYY-MM-DDThh:mm:ssZ. Por ejemplo, 2024-10-14T08:30:30Z.

        Cualquier fecha, pasada o futura, desde el 0001-01-01T00:00:00Z hasta el 9999-12-31T23:59:59Z (ambas incluidas) es válida.

      • MATCH_GLOB: especifica el patrón glob que debe coincidir para que se ingiera un objeto. Cuando usas la interfaz de línea de comandos de gcloud, un patrón de coincidencia con caracteres * debe tener el carácter * con formato de escape \*\*.txt o todo el patrón de coincidencia debe estar entre comillas "**.txt" o '**.txt'. Para obtener información sobre la sintaxis admitida para los patrones glob, consulta la documentación de Cloud Storage.

    3. C++

      Antes de probar este ejemplo, sigue las instrucciones de configuración de C++ que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de C++ de Pub/Sub.

      namespace pubsub = ::google::cloud::pubsub;
      namespace pubsub_admin = ::google::cloud::pubsub_admin;
      [](pubsub_admin::TopicAdminClient client, std::string project_id,
         std::string topic_id, std::string bucket, std::string const& input_format,
         std::string text_delimiter, std::string match_glob,
         std::string const& minimum_object_create_time) {
        google::pubsub::v1::Topic request;
        request.set_name(
            pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
        auto& cloud_storage = *request.mutable_ingestion_data_source_settings()
                                   ->mutable_cloud_storage();
        cloud_storage.set_bucket(std::move(bucket));
        if (input_format == "text") {
          cloud_storage.mutable_text_format()->set_delimiter(
              std::move(text_delimiter));
        } else if (input_format == "avro") {
          cloud_storage.mutable_avro_format();
        } else if (input_format == "pubsub_avro") {
          cloud_storage.mutable_pubsub_avro_format();
        } else {
          std::cout << "input_format must be in ('text', 'avro', 'pubsub_avro'); "
                       "got value: "
                    << input_format << std::endl;
          return;
        }
      
        if (!match_glob.empty()) {
          cloud_storage.set_match_glob(std::move(match_glob));
        }
      
        if (!minimum_object_create_time.empty()) {
          google::protobuf::Timestamp timestamp;
          if (!google::protobuf::util::TimeUtil::FromString(
                  minimum_object_create_time,
                  cloud_storage.mutable_minimum_object_create_time())) {
            std::cout << "Invalid minimum object create time: "
                      << minimum_object_create_time << std::endl;
          }
        }
      
        auto topic = client.CreateTopic(request);
        // Note that kAlreadyExists is a possible error when the library retries.
        if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
          std::cout << "The topic already exists\n";
          return;
        }
        if (!topic) throw std::move(topic).status();
      
        std::cout << "The topic was successfully created: " << topic->DebugString()
                  << "\n";
      }

      Go

      En el siguiente ejemplo se usa la versión principal de la biblioteca de cliente de Pub/Sub de Go (v2). Si sigues usando la biblioteca v1, consulta la guía de migración a la versión 2. Para ver una lista de ejemplos de código de la versión 1, consulta los ejemplos de código obsoletos.

      Antes de probar este ejemplo, sigue las instrucciones de configuración de Go que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Go de Pub/Sub.

      import (
      	"context"
      	"fmt"
      	"io"
      	"time"
      
      	"cloud.google.com/go/pubsub/v2"
      	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
      	"google.golang.org/protobuf/types/known/timestamppb"
      )
      
      func createTopicWithCloudStorageIngestion(w io.Writer, projectID, topicID, bucket, matchGlob, minimumObjectCreateTime, delimiter string) error {
      	// projectID := "my-project-id"
      	// topicID := "my-topic"
      	// bucket := "my-bucket"
      	// matchGlob := "**.txt"
      	// minimumObjectCreateTime := "2006-01-02T15:04:05Z"
      	// delimiter := ","
      
      	ctx := context.Background()
      	client, err := pubsub.NewClient(ctx, projectID)
      	if err != nil {
      		return fmt.Errorf("pubsub.NewClient: %w", err)
      	}
      	defer client.Close()
      
      	minCreateTime, err := time.Parse(time.RFC3339, minimumObjectCreateTime)
      	if err != nil {
      		return err
      	}
      
      	topicpb := &pubsubpb.Topic{
      		Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
      		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
      			Source: &pubsubpb.IngestionDataSourceSettings_CloudStorage_{
      				CloudStorage: &pubsubpb.IngestionDataSourceSettings_CloudStorage{
      					Bucket: bucket,
      					// Alternatively, can be Avro or PubSubAvro formats. See
      					InputFormat: &pubsubpb.IngestionDataSourceSettings_CloudStorage_TextFormat_{
      						TextFormat: &pubsubpb.IngestionDataSourceSettings_CloudStorage_TextFormat{
      							Delimiter: &delimiter,
      						},
      					},
      					MatchGlob:               matchGlob,
      					MinimumObjectCreateTime: timestamppb.New(minCreateTime),
      				},
      			},
      		},
      	}
      	t, err := client.TopicAdminClient.CreateTopic(ctx, topicpb)
      	if err != nil {
      		return fmt.Errorf("CreateTopic: %w", err)
      	}
      	fmt.Fprintf(w, "Cloud storage topic created: %v\n", t)
      	return nil
      }
      

      Java

      Antes de probar este ejemplo, sigue las instrucciones de configuración de Java que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Pub/Sub.

      
      import com.google.cloud.pubsub.v1.TopicAdminClient;
      import com.google.protobuf.util.Timestamps;
      import com.google.pubsub.v1.IngestionDataSourceSettings;
      import com.google.pubsub.v1.Topic;
      import com.google.pubsub.v1.TopicName;
      import java.io.IOException;
      import java.text.ParseException;
      
      public class CreateTopicWithCloudStorageIngestionExample {
        public static void main(String... args) throws Exception {
          // TODO(developer): Replace these variables before running the sample.
          String projectId = "your-project-id";
          String topicId = "your-topic-id";
          // Cloud Storage ingestion settings.
          // bucket and inputFormat are required arguments.
          String bucket = "your-bucket";
          String inputFormat = "text";
          String textDelimiter = "\n";
          String matchGlob = "**.txt";
          String minimumObjectCreateTime = "YYYY-MM-DDThh:mm:ssZ";
      
          createTopicWithCloudStorageIngestionExample(
              projectId, topicId, bucket, inputFormat, textDelimiter, matchGlob, minimumObjectCreateTime);
        }
      
        public static void createTopicWithCloudStorageIngestionExample(
            String projectId,
            String topicId,
            String bucket,
            String inputFormat,
            String textDelimiter,
            String matchGlob,
            String minimumObjectCreateTime)
            throws IOException {
          try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
            IngestionDataSourceSettings.CloudStorage.Builder cloudStorageBuilder =
                IngestionDataSourceSettings.CloudStorage.newBuilder().setBucket(bucket);
            switch (inputFormat) {
              case "text":
                cloudStorageBuilder.setTextFormat(
                    IngestionDataSourceSettings.CloudStorage.TextFormat.newBuilder()
                        .setDelimiter(textDelimiter)
                        .build());
                break;
              case "avro":
                cloudStorageBuilder.setAvroFormat(
                    IngestionDataSourceSettings.CloudStorage.AvroFormat.getDefaultInstance());
                break;
              case "pubsub_avro":
                cloudStorageBuilder.setPubsubAvroFormat(
                    IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat.getDefaultInstance());
                break;
              default:
                throw new IllegalArgumentException(
                    "inputFormat must be in ('text', 'avro', 'pubsub_avro'); got value: " + inputFormat);
            }
      
            if (matchGlob != null && !matchGlob.isEmpty()) {
              cloudStorageBuilder.setMatchGlob(matchGlob);
            }
      
            if (minimumObjectCreateTime != null && !minimumObjectCreateTime.isEmpty()) {
              try {
                cloudStorageBuilder.setMinimumObjectCreateTime(Timestamps.parse(minimumObjectCreateTime));
              } catch (ParseException e) {
                System.err.println("Unable to parse timestamp: " + minimumObjectCreateTime);
              }
            }
      
            IngestionDataSourceSettings ingestionDataSourceSettings =
                IngestionDataSourceSettings.newBuilder()
                    .setCloudStorage(cloudStorageBuilder.build())
                    .build();
      
            TopicName topicName = TopicName.of(projectId, topicId);
      
            Topic topic =
                topicAdminClient.createTopic(
                    Topic.newBuilder()
                        .setName(topicName.toString())
                        .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                        .build());
      
            System.out.println(
                "Created topic with Cloud Storage ingestion settings: " + topic.getAllFields());
          }
        }
      }

      Node.js

      Antes de probar este ejemplo, sigue las instrucciones de configuración de Node.js que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Node.js de Pub/Sub.

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const bucket = 'YOUR_BUCKET_NAME';
      // const inputFormat = 'text';
      // const textDelimiter = '\n';
      // const matchGlob = '**.txt';
      // const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;
      
      // Imports the Google Cloud client library
      const {PubSub} = require('@google-cloud/pubsub');
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithCloudStorageIngestion(
        topicNameOrId,
        bucket,
        inputFormat,
        textDelimiter,
        matchGlob,
        minimumObjectCreateTime,
      ) {
        const minimumDate = Date.parse(minimumObjectCreateTime);
        const topicMetadata = {
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            cloudStorage: {
              bucket,
              minimumObjectCreateTime: {
                seconds: minimumDate / 1000,
                nanos: (minimumDate % 1000) * 1000,
              },
              matchGlob,
            },
          },
        };
      
        // Make a format appropriately.
        switch (inputFormat) {
          case 'text':
            topicMetadata.ingestionDataSourceSettings.cloudStorage.textFormat = {
              delimiter: textDelimiter,
            };
            break;
          case 'avro':
            topicMetadata.ingestionDataSourceSettings.cloudStorage.avroFormat = {};
            break;
          case 'pubsub_avro':
            topicMetadata.ingestionDataSourceSettings.cloudStorage.pubsubAvroFormat =
              {};
            break;
          default:
            console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
            return;
        }
      
        // Creates a new topic with Cloud Storage ingestion.
        await pubSubClient.createTopic(topicMetadata);
        console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
      }

      Node.ts

      Antes de probar este ejemplo, sigue las instrucciones de configuración de Node.js que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Node.js de Pub/Sub.

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const bucket = 'YOUR_BUCKET_NAME';
      // const inputFormat = 'text';
      // const textDelimiter = '\n';
      // const matchGlob = '**.txt';
      // const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;
      
      // Imports the Google Cloud client library
      import {PubSub, TopicMetadata} from '@google-cloud/pubsub';
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithCloudStorageIngestion(
        topicNameOrId: string,
        bucket: string,
        inputFormat: string,
        textDelimiter: string,
        matchGlob: string,
        minimumObjectCreateTime: string,
      ) {
        const minimumDate = Date.parse(minimumObjectCreateTime);
        const topicMetadata: TopicMetadata = {
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            cloudStorage: {
              bucket,
              minimumObjectCreateTime: {
                seconds: minimumDate / 1000,
                nanos: (minimumDate % 1000) * 1000,
              },
              matchGlob,
            },
          },
        };
      
        // Make a format appropriately.
        switch (inputFormat) {
          case 'text':
            topicMetadata.ingestionDataSourceSettings!.cloudStorage!.textFormat = {
              delimiter: textDelimiter,
            };
            break;
          case 'avro':
            topicMetadata.ingestionDataSourceSettings!.cloudStorage!.avroFormat = {};
            break;
          case 'pubsub_avro':
            topicMetadata.ingestionDataSourceSettings!.cloudStorage!.pubsubAvroFormat =
              {};
            break;
          default:
            console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
            return;
        }
      
        // Creates a new topic with Cloud Storage ingestion.
        await pubSubClient.createTopic(topicMetadata);
        console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
      }

      Python

      Antes de probar este ejemplo, sigue las instrucciones de configuración de Python que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Python de Pub/Sub.

      from google.cloud import pubsub_v1
      from google.protobuf import timestamp_pb2
      from google.pubsub_v1.types import Topic
      from google.pubsub_v1.types import IngestionDataSourceSettings
      
      # TODO(developer)
      # project_id = "your-project-id"
      # topic_id = "your-topic-id"
      # bucket = "your-bucket"
      # input_format = "text"  (can be one of "text", "avro", "pubsub_avro")
      # text_delimiter = "\n"
      # match_glob = "**.txt"
      # minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ"
      
      publisher = pubsub_v1.PublisherClient()
      topic_path = publisher.topic_path(project_id, topic_id)
      
      cloud_storage_settings = IngestionDataSourceSettings.CloudStorage(
          bucket=bucket,
      )
      if input_format == "text":
          cloud_storage_settings.text_format = (
              IngestionDataSourceSettings.CloudStorage.TextFormat(
                  delimiter=text_delimiter
              )
          )
      elif input_format == "avro":
          cloud_storage_settings.avro_format = (
              IngestionDataSourceSettings.CloudStorage.AvroFormat()
          )
      elif input_format == "pubsub_avro":
          cloud_storage_settings.pubsub_avro_format = (
              IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat()
          )
      else:
          print(
              "Invalid input_format: "
              + input_format
              + "; must be in ('text', 'avro', 'pubsub_avro')"
          )
          return
      
      if match_glob:
          cloud_storage_settings.match_glob = match_glob
      
      if minimum_object_create_time:
          try:
              minimum_object_create_time_timestamp = timestamp_pb2.Timestamp()
              minimum_object_create_time_timestamp.FromJsonString(
                  minimum_object_create_time
              )
              cloud_storage_settings.minimum_object_create_time = (
                  minimum_object_create_time_timestamp
              )
          except ValueError:
              print("Invalid minimum_object_create_time: " + minimum_object_create_time)
              return
      
      request = Topic(
          name=topic_path,
          ingestion_data_source_settings=IngestionDataSourceSettings(
              cloud_storage=cloud_storage_settings,
          ),
      )
      
      topic = publisher.create_topic(request=request)
      
      print(f"Created topic: {topic.name} with Cloud Storage Ingestion Settings")

    Si tienes problemas, consulta el artículo Solucionar problemas con la importación de Cloud Storage.

    Editar un tema de importación de Cloud Storage

    Puedes editar un tema de importación de Cloud Storage para actualizar sus propiedades.

    Por ejemplo, para reiniciar la ingestión, puedes cambiar el bucket o actualizar el tiempo mínimo de creación de objetos.

    Para editar un tema de importación de Cloud Storage, siga estos pasos:

    Consola

    1. En la Google Cloud consola, ve a la página Temas.

      Ir a Temas

    2. Haga clic en el tema de importación de Cloud Storage.

    3. En la página de detalles del tema, haz clic en Editar.

    4. Actualiza los campos que quieras cambiar.

    5. Haz clic en Actualizar.

    gcloud

    1. In the Google Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    2. Para no perder la configuración del tema de importación, asegúrese de incluirla toda cada vez que actualice el tema. Si omites algo, Pub/Sub restablecerá el ajuste a su valor predeterminado original.

      Ejecuta el comando gcloud pubsub topics update con todas las marcas mencionadas en el siguiente ejemplo:

      gcloud pubsub topics update TOPIC_ID \
          --cloud-storage-ingestion-bucket=BUCKET_NAME\
          --cloud-storage-ingestion-input-format=INPUT_FORMAT\
          --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
          --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
          --cloud-storage-ingestion-match-glob=MATCH_GLOB

      Haz los cambios siguientes:

      • TOPIC_ID es el ID o el nombre del tema. Este campo no se puede actualizar.

      • BUCKET_NAME: especifica el nombre de un segmento. Por ejemplo, prod_bucket. El nombre del contenedor no debe incluir el ID del proyecto. Para crear un segmento, consulta el artículo Crear segmentos.

      • INPUT_FORMAT: especifica el formato de los objetos que se ingieren. Puede ser text, avro o pubsub_avro. Consulta Formato de entrada para obtener más información sobre estas opciones.

      • TEXT_DELIMITER: especifica el delimitador con el que dividir los objetos de texto en mensajes de Pub/Sub. Debe ser un solo carácter y solo se debe definir cuando INPUT_FORMAT sea text. El valor predeterminado es el carácter de nueva línea (\n).

        Cuando uses gcloud CLI para especificar el delimitador, presta atención al tratamiento de caracteres especiales, como el salto de línea \n. Usa el formato '\n' para asegurarte de que el delimitador se interpreta correctamente. Si solo se usa \n sin comillas ni caracteres de escape, se obtiene un delimitador "n".

      • MINIMUM_OBJECT_CREATE_TIME: especifica el tiempo mínimo en el que se creó un objeto para que se ingiera. Debe estar en UTC y tener el formato YYYY-MM-DDThh:mm:ssZ. Por ejemplo, 2024-10-14T08:30:30Z.

        Cualquier fecha, pasada o futura, desde el 0001-01-01T00:00:00Z hasta el 9999-12-31T23:59:59Z (ambas incluidas) es válida.

      • MATCH_GLOB: especifica el patrón glob que debe coincidir para que se ingiera un objeto. Cuando usas la interfaz de línea de comandos de gcloud, un patrón de coincidencia con caracteres * debe tener el carácter * con formato de escape \*\*.txt o todo el patrón de coincidencia debe estar entre comillas "**.txt" o '**.txt'. Para obtener información sobre la sintaxis admitida de los patrones glob, consulta la documentación de Cloud Storage.

    Cuotas y límites de los temas de importación de Cloud Storage

    El rendimiento de publicación de los temas de importación está limitado por la cuota de publicación del tema. Para obtener más información, consulta las cuotas y los límites de Pub/Sub.

    Siguientes pasos