Crea un tema Lite de una sola partición y configura su período de retención, el almacenamiento por partición y el factor de escala de la capacidad de procesamiento de publicación.
Explora más
Para obtener documentación en la que se incluye esta muestra de código, consulta lo siguiente:
Muestra de código
Go
Para autenticarte en Pub/Sub Lite, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsublite"
)
// createTopic creates a Pub/Sub Lite topic at
// projects/<projectID>/locations/<location>/topics/<topicID> using an admin
// client bound to region, attaches it to the given throughput reservation, and
// writes the value returned by CreateTopic to w.
//
// Example arguments:
//
//	projectID   = "my-project-id"
//	region      = "us-central1"
//	location    = "us-central1" (a region) or "us-central1-a" (a zone)
//	topicID     = "my-topic"
//	reservation = "projects/my-project-id/reservations/my-reservation"
//
// For a list of valid locations, see
// https://cloud.google.com/pubsub/lite/docs/locations.
//
// It returns a wrapped error if the admin client cannot be created or if the
// topic creation call fails.
func createTopic(w io.Writer, projectID, region, location, topicID, reservation string) error {
	const bytesPerGiB = 1 << 30

	ctx := context.Background()
	admin, err := pubsublite.NewAdminClient(ctx, region)
	if err != nil {
		return fmt.Errorf("pubsublite.NewAdminClient: %w", err)
	}
	defer admin.Close()

	// For ranges of fields in TopicConfig, see
	// https://pkg.go.dev/cloud.google.com/go/pubsublite/#TopicConfig
	cfg := pubsublite.TopicConfig{
		Name:                       fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, location, topicID),
		PartitionCount:             2, // Must be >= 1 and cannot decrease after creation.
		PublishCapacityMiBPerSec:   4,
		SubscribeCapacityMiBPerSec: 8,
		PerPartitionBytes:          30 * bytesPerGiB, // 30 GiB of storage per partition.
		RetentionDuration:          pubsublite.InfiniteRetention,
		ThroughputReservation:      reservation,
	}

	created, err := admin.CreateTopic(ctx, cfg)
	if err != nil {
		return fmt.Errorf("client.CreateTopic got err: %w", err)
	}

	fmt.Fprintf(w, "Created topic: %#v\n", created)
	return nil
}
Java
Para autenticarte en Pub/Sub Lite, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.
import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.ReservationName;
import com.google.cloud.pubsublite.ReservationPath;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.proto.Topic;
import com.google.cloud.pubsublite.proto.Topic.PartitionConfig;
import com.google.cloud.pubsublite.proto.Topic.PartitionConfig.Capacity;
import com.google.cloud.pubsublite.proto.Topic.ReservationConfig;
import com.google.cloud.pubsublite.proto.Topic.RetentionConfig;
import com.google.protobuf.util.Durations;
import java.util.concurrent.ExecutionException;
/**
 * Sample showing how to create a Pub/Sub Lite topic, in either a regional or a zonal location,
 * that draws its throughput from an existing reservation.
 */
public class CreateTopicExample {

  /** Runs the sample with placeholder values. */
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'a';
    String topicId = "your-topic-id";
    String reservationId = "your-reservation-id";
    long projectNumber = Long.parseLong("123456789");
    int partitions = 1;
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    createTopicExample(
        cloudRegion, zoneId, projectNumber, topicId, reservationId, partitions, regional);
  }

  /**
   * Builds a topic description and asks the admin client to create it.
   *
   * @param cloudRegion region used for the reservation, the admin client, and the topic location
   * @param zoneId zone letter within the region; only used when {@code regional} is false
   * @param projectNumber numeric project identifier
   * @param topicId name of the topic to create
   * @param reservationId name of the throughput reservation to attach
   * @param partitions number of partitions for the topic
   * @param regional true to create the topic in the region, false to create it in the zone
   * @throws Exception if building the request or waiting on the creation call fails with anything
   *     other than an {@link ExecutionException}
   */
  public static void createTopicExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String topicId,
      String reservationId,
      int partitions,
      boolean regional)
      throws Exception {

    ReservationPath reservationPath =
        ReservationPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(CloudRegion.of(cloudRegion))
            .setName(ReservationName.of(reservationId))
            .build();

    // A regional topic is located in the region itself; a zonal topic in one zone of it.
    CloudRegionOrZone location =
        regional
            ? CloudRegionOrZone.of(CloudRegion.of(cloudRegion))
            : CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    // Throughput capacity per partition in MiB/s.
    Capacity capacity =
        Capacity.newBuilder()
            // Must be 4-16 MiB/s.
            .setPublishMibPerSec(4)
            // Must be 4-32 MiB/s.
            .setSubscribeMibPerSec(8)
            .build();

    PartitionConfig partitionConfig =
        PartitionConfig.newBuilder().setCapacity(capacity).setCount(partitions).build();

    RetentionConfig retentionConfig =
        RetentionConfig.newBuilder()
            // How long messages are retained.
            .setPeriod(Durations.fromDays(1))
            // Storage per partition: 30 GiB. This must be 30 GiB-10 TiB. Once a partition
            // holds more bytes than this, older messages are dropped to make room for
            // newer ones, regardless of the value of `period`.
            .setPerPartitionBytes(30 * 1024 * 1024 * 1024L)
            .build();

    ReservationConfig reservationConfig =
        ReservationConfig.newBuilder()
            .setThroughputReservation(reservationPath.toString())
            .build();

    Topic topic =
        Topic.newBuilder()
            .setPartitionConfig(partitionConfig)
            .setRetentionConfig(retentionConfig)
            .setReservationConfig(reservationConfig)
            .setName(topicPath.toString())
            .build();

    AdminClientSettings adminClientSettings =
        AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();

    try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
      Topic created = adminClient.createTopic(topic).get();
      String suffix =
          regional
              ? " (regional topic) created successfully."
              : " (zonal topic) created successfully.";
      System.out.println(created.getAllFields() + suffix);
    } catch (ExecutionException e) {
      // Rethrow the underlying cause so it can be dispatched on by type.
      try {
        throw e.getCause();
      } catch (AlreadyExistsException alreadyExists) {
        System.out.println("This topic already exists.");
      } catch (Throwable throwable) {
        throwable.printStackTrace();
      }
    }
  }
}
Python
Para autenticarte en Pub/Sub Lite, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.
from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsublite import AdminClient, Topic
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
ReservationPath,
TopicPath,
)
from google.protobuf.duration_pb2 import Duration
# TODO(developer): Define these variables before running the sample.
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# reservation_id = "your-reservation-id"
# num_partitions = 1
# regional = True

cloud_region = CloudRegion(cloud_region)
reservation_path = ReservationPath(project_number, cloud_region, reservation_id)

# A regional topic is located in the region itself; a zonal topic in one zone of it.
location = cloud_region if regional else CloudZone(cloud_region, zone_id)
topic_path = TopicPath(project_number, location, topic_id)

partition_config = Topic.PartitionConfig(
    # A topic must have at least one partition.
    count=num_partitions,
    # Throughput capacity per partition, in MiB/s.
    capacity=Topic.PartitionConfig.Capacity(
        # Publish capacity per partition: 4 MiB/s. Must be >= 4 and <= 16.
        publish_mib_per_sec=4,
        # Subscribe capacity per partition: 8 MiB/s. Must be >= 4 and <= 32.
        subscribe_mib_per_sec=8,
    ),
)

retention_config = Topic.RetentionConfig(
    # Storage per partition: 30 GiB. This must be in the range 30 GiB-10 TiB.
    # Once a partition holds more bytes than this, older messages are dropped
    # to make room for newer ones, regardless of the value of `period`.
    per_partition_bytes=30 * 1024 * 1024 * 1024,
    # Allow messages to be retained for 7 days.
    period=Duration(seconds=60 * 60 * 24 * 7),
)

reservation_config = Topic.ReservationConfig(
    throughput_reservation=str(reservation_path),
)

topic = Topic(
    name=str(topic_path),
    partition_config=partition_config,
    retention_config=retention_config,
    reservation_config=reservation_config,
)

client = AdminClient(cloud_region)
try:
    response = client.create_topic(topic)
except AlreadyExists:
    print(f"{topic_path} already exists.")
else:
    if regional:
        print(f"{response.name} (regional topic) created successfully.")
    else:
        print(f"{response.name} (zonal topic) created successfully.")
¿Qué sigue?
Para buscar y filtrar muestras de código de otros productos de Google Cloud, consulta el navegador de muestra de Google Cloud.