Lakukan pencarian (seek) ke suatu lokasi di langganan Pub/Sub Lite agar subscriber mulai menerima pesan dari lokasi tersebut.
Jelajahi lebih lanjut
Untuk dokumentasi mendetail yang menyertakan contoh kode ini, lihat artikel berikut:
Contoh kode
Go
Untuk melakukan autentikasi ke Pub/Sub Lite, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsublite"
)
// seekSubscription starts an out-of-band seek on a Pub/Sub Lite subscription
// and reports progress to w. When waitForOperation is true it also blocks
// until the seek operation finishes and prints the operation metadata.
//
// region selects the admin client endpoint, while zone is the location used
// in the subscription path.
func seekSubscription(w io.Writer, projectID, region, zone, subID string, seekTarget pubsublite.SeekTarget, waitForOperation bool) error {
	// Example arguments:
	//   projectID := "my-project-id"
	//   region := "us-central1"
	//   zone := "us-central1-a"
	//   subID := "my-subscription"
	//   seekTarget := pubsublite.Beginning
	//   waitForOperation := false
	//
	// Accepted seekTarget values:
	//   - pubsublite.Beginning: replay from the start of all retained messages.
	//   - pubsublite.End: skip past every message published so far.
	//   - pubsublite.PublishTime(<time>): deliver messages whose publish time
	//     is greater than or equal to the given timestamp.
	//   - pubsublite.EventTime(<time>): seek to the first message whose event
	//     time is greater than or equal to the given timestamp.
	//
	// Waiting is optional. Completion of the operation signals that the
	// subscribers for all partitions are receiving messages from the seek
	// target; if subscribers are offline, it completes once they come online.
	ctx := context.Background()

	admin, err := pubsublite.NewAdminClient(ctx, region)
	if err != nil {
		return fmt.Errorf("pubsublite.NewAdminClient: %w", err)
	}
	defer admin.Close()

	// Register the seek. A returned operation means the request was accepted
	// and will eventually propagate to subscribers.
	op, err := admin.SeekSubscription(
		ctx,
		fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", projectID, zone, subID),
		seekTarget,
	)
	if err != nil {
		return fmt.Errorf("client.SeekSubscription got err: %w", err)
	}
	fmt.Fprintf(w, "Seek operation initiated: %s\n", op.Name())

	// Nothing more to do unless the caller asked to block on completion.
	if !waitForOperation {
		return nil
	}

	// The result value of Wait is not needed; only its error matters here.
	if _, err := op.Wait(ctx); err != nil {
		return fmt.Errorf("seekOp.Wait got err: %w", err)
	}
	md, err := op.Metadata()
	if err != nil {
		return fmt.Errorf("seekOp.Metadata got err: %w", err)
	}
	fmt.Fprintf(w, "Seek operation completed with metadata: %v\n", md)
	return nil
}
Java
Untuk melakukan autentikasi ke Pub/Sub Lite, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.BacklogLocation;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SeekTarget;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.OperationMetadata;
import com.google.cloud.pubsublite.proto.SeekSubscriptionResponse;
/**
 * Sample showing how to seek a Pub/Sub Lite subscription to a target location in the message
 * backlog, optionally waiting for the seek operation to finish.
 */
public class SeekSubscriptionExample {

  /** Runs the sample with placeholder values; edit them before running. */
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Must name a subscription that already exists.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");
    // Set to true for a regional location, false for a zonal one.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    // Where in the message backlog the subscription should be moved to.
    // Supported SeekTarget values:
    // - SeekTarget.of(BacklogLocation.BEGINNING): replay from the start of all retained
    //   messages.
    // - SeekTarget.of(BacklogLocation.END): skip past every message published so far.
    // - SeekTarget.ofPublishTime(<timestamp>): deliver messages whose publish time is greater
    //   than or equal to the given timestamp.
    // - SeekTarget.ofEventTime(<timestamp>): seek to the first message whose event time is
    //   greater than or equal to the given timestamp.
    SeekTarget target = SeekTarget.of(BacklogLocation.BEGINNING);

    // Optional: block until the seek operation completes. Completion signals that subscribers
    // for all partitions are receiving messages from the seek target; if subscribers are
    // offline, the operation completes once they come online.
    boolean waitForOperation = false;

    seekSubscriptionExample(
        cloudRegion, zoneId, projectNumber, subscriptionId, target, waitForOperation, regional);
  }

  /**
   * Initiates a seek on the given subscription and, if requested, waits for it to complete.
   *
   * @param cloudRegion region of the subscription, also used for the admin client
   * @param zoneId zone letter within the region; only used when {@code regional} is false
   * @param projectNumber numeric project identifier
   * @param subscriptionId id of the subscription to seek
   * @param target backlog location to seek to
   * @param waitForOperation whether to block until the seek operation completes
   * @param regional true to address a regional location, false for a zonal one
   */
  public static void seekSubscriptionExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String subscriptionId,
      SeekTarget target,
      boolean waitForOperation,
      boolean regional)
      throws Exception {
    CloudRegion region = CloudRegion.of(cloudRegion);
    CloudRegionOrZone location =
        regional
            ? CloudRegionOrZone.of(region)
            : CloudRegionOrZone.of(CloudZone.of(region, zoneId));

    SubscriptionPath path =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    AdminClientSettings settings = AdminClientSettings.newBuilder().setRegion(region).build();

    try (AdminClient admin = AdminClient.create(settings)) {
      // Register the out-of-band seek. Getting an operation back means the request was
      // accepted and will eventually propagate to subscribers.
      OperationFuture<SeekSubscriptionResponse, OperationMetadata> operation =
          admin.seekSubscription(path, target);
      System.out.println("Seek operation " + operation.getName() + " initiated successfully.");

      if (!waitForOperation) {
        return;
      }

      System.out.println("Waiting for operation to complete...");
      operation.get();
      System.out.println("Operation completed. Metadata:\n" + operation.getMetadata().get());
    }
  }
}
Python
Untuk melakukan autentikasi ke Pub/Sub Lite, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
from google.api_core.exceptions import NotFound
from google.cloud.pubsublite import AdminClient
from google.cloud.pubsublite.types import CloudRegion, CloudZone, SubscriptionPath
# Seeks a Pub/Sub Lite subscription to `seek_target` and optionally waits for
# the resulting operation to finish.
# NOTE(review): the enclosing `def` line is not part of this snippet; the bare
# `return` below implies this is a function body -- confirm against the full
# sample.
# NOTE(review): BacklogLocation, PublishTime and EventTime are referenced in
# the comments below but are not imported above; presumably they come from
# google.cloud.pubsublite.types -- confirm.
# TODO(developer): supply these values before running.
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# seek_target = BacklogLocation.BEGINNING
# wait_for_operation = False
# regional = True
# Possible values for seek_target:
# - BacklogLocation.BEGINNING: replays from the beginning of all retained
# messages.
# - BacklogLocation.END: skips past all current published messages.
# - PublishTime(<datetime>): delivers messages with publish time greater
# than or equal to the specified timestamp.
# - EventTime(<datetime>): seeks to the first message with event time
# greater than or equal to the specified timestamp.
# Waiting for the seek operation to complete is optional. It indicates when
# subscribers for all partitions are receiving messages from the seek
# target. If subscribers are offline, the operation will complete once they
# are online.
# Build the location: a bare region when `regional` is set, otherwise a zone
# inside that region.
if regional:
location = CloudRegion(cloud_region)
else:
location = CloudZone(CloudRegion(cloud_region), zone_id)
subscription_path = SubscriptionPath(project_number, location, subscription_id)
# The admin client is created for the region, regardless of whether the
# subscription path is regional or zonal.
client = AdminClient(cloud_region)
try:
# Initiate an out-of-band seek for a subscription to the specified
# target. If an operation is returned, the seek has been successfully
# registered and will eventually propagate to subscribers.
seek_operation = client.seek_subscription(subscription_path, seek_target)
print(f"Seek operation: {seek_operation.operation.name}")
except NotFound:
# A missing subscription is reported and the function exits early instead
# of raising.
print(f"{subscription_path} not found.")
return
# Only block on the operation when the caller asked for it.
if wait_for_operation:
print("Waiting for operation to complete...")
seek_operation.result()
print(f"Operation completed. Metadata:\n{seek_operation.metadata}")
Langkah selanjutnya
Untuk menelusuri dan memfilter contoh kode untuk produk Google Cloud lainnya, lihat browser contoh Google Cloud.