借助 OpenTelemetry 跟踪功能,您可以识别和跟踪各种 Pub/Sub 客户端库操作(例如批处理、租约管理和流控制)的延迟时间。收集这些信息有助于您调试客户端库问题。
OpenTelemetry 跟踪的一些可能用例包括:
- 您的服务的发布延迟时间比平常长。
- 您遇到大量消息重新提交的问题。
- 对订阅方客户端的回调函数所做的更改导致处理时间比平时更长。
在配置 OpenTelemetry 之前,请完成以下任务:
- 使用某个客户端库设置 Pub/Sub。
- 安装 OpenTelemetry SDK 并设置跟踪记录导出器和跟踪器提供程序。
- 启用 Cloud Trace API。
- 了解如何读取 Cloud Observability 轨迹。
如需确保服务账号拥有将轨迹导出到 Cloud Trace 所需的权限,请让您的管理员为服务账号授予项目的以下 IAM 角色:
Cloud Trace Agent (
这些预定义角色包含将轨迹导出到 Cloud Trace 所需的权限。如需查看所需的确切权限,请展开所需权限部分:
如需将轨迹导出到 Cloud Trace,需要具备以下权限:
OpenTelemetry 跟踪工作流
如需设置 OpenTelemetry 跟踪,您需要使用 Pub/Sub 客户端库和 OpenTelemetry SDK。使用该 SDK 时,您必须先设置轨迹导出器和轨迹跟踪提供程序,然后才能连接到 Pub/Sub 库。在某些库中,设置跟踪器提供程序是可选的。
Trace 导出器。OpenTelemetry SDK 使用轨迹导出器来确定将轨迹发送到何处。
Tracer 提供程序。Pub/Sub 客户端库使用跟踪器提供程序创建轨迹。
- 实例化 Cloud Trace OpenTelemetry 导出器。
- 根据需要,使用 OpenTelemetery SDK 实例化并注册 Tracer 提供程序。
- 使用“启用 OpenTelemetry 跟踪”选项配置客户端。
- 使用 Pub/Sub 客户端库发布消息。
对于发布的每条消息,客户端库都会创建一条新轨迹。此轨迹表示消息的整个生命周期,从您发布消息到消息被确认的时间。跟踪记录封装了操作时长、父级 span 和子 span 以及关联的 span 等信息。
轨迹由根 span 及其对应的子 span 组成。这些跨度表示客户端库在处理消息时执行的工作。每条消息轨迹都包含以下内容:
- 发布。流控制、有序键调度、批处理和发布 RPC 的长度。
- 对于订阅。并发控制、有序键调度和租约管理。
为了将信息从发布端传播到订阅端,客户端库会在发布端注入跟踪专用属性。仅当跟踪已启用且前面附加了 googclient_
以下代码示例展示了如何使用 Pub/Sub 客户端库和 OpenTelemetry SDK 启用跟踪。在此示例中,系统会将跟踪结果导出到 Cloud Trace。
实例化跟踪器提供程序时,您可以使用 OpenTelemetry SDK 配置采样率。此比率决定了 SDK 应采样的轨迹数量。采样率越低,账单费用就越低,并且可以防止您的服务超出 Cloud Trace 跨度配额。
import (
texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
// publishOpenTelemetryTracing publishes a single message with OpenTelemetry tracing
// enabled, exporting to Cloud Trace.
func publishOpenTelemetryTracing(w io.Writer, projectID, topicID string, sampling float64) error {
// projectID := "my-project-id"
// topicID := "my-topic"
ctx := context.Background()
exporter, err := texporter.New(texporter.WithProjectID(projectID),
// Disable spans created by the exporter.
if err != nil {
return fmt.Errorf("error instantiating exporter: %w", err)
resources := resource.NewWithAttributes(
// Instantiate a tracer provider with the following settings
tp := sdktrace.NewTracerProvider(
defer tp.ForceFlush(ctx) // flushes any pending spans
// Create a new client with tracing enabled.
client, err := pubsub.NewClientWithConfig(ctx, projectID, &pubsub.ClientConfig{
EnableOpenTelemetryTracing: true,
if err != nil {
return fmt.Errorf("pubsub: NewClient: %w", err)
defer client.Close()
t := client.Topic(topicID)
result := t.Publish(ctx, &pubsub.Message{
Data: []byte("Publishing message with tracing"),
if _, err := result.Get(ctx); err != nil {
return fmt.Errorf("pubsub: result.Get: %w", err)
fmt.Fprintln(w, "Published a traced message")
return nil
// Create a few namespace aliases to make the code easier to read.
namespace gc = ::google::cloud;
namespace otel = gc::otel;
namespace pubsub = gc::pubsub;
// This example uses a simple wrapper to export (upload) OTel tracing data
// to Google Cloud Trace. More complex applications may use different
// authentication, or configure their own OTel exporter.
auto project = gc::Project(project_id);
auto configuration = otel::ConfigureBasicTracing(project);
auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
pubsub::Topic(project_id, topic_id),
// Configure this publisher to enable OTel tracing. Some applications may
// chose to disable tracing in some publishers or to dynamically enable
// this option based on their own configuration.
// After this point, use the Cloud Pub/Sub C++ client library as usual.
// In this example, we will send a few messages and configure a callback
// action for each one.
std::vector<gc::future<void>> ids;
for (int i = 0; i < 5; i++) {
auto id = publisher.Publish(pubsub::MessageBuilder().SetData("Hi!").Build())
.then([](gc::future<gc::StatusOr<std::string>> f) {
auto id = f.get();
if (!id) {
std::cout << "Error in publish: " << id.status() << "\n";
std::cout << "Sent message with id: (" << *id << ")\n";
// Block until the messages are actually sent.
for (auto& id : ids) id.get();
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased
from google.cloud.pubsub_v1 import PublisherClient
from google.cloud.pubsub_v1.types import PublisherOptions
# TODO(developer)
# topic_project_id = "your-topic-project-id"
# trace_project_id = "your-trace-project-id"
# topic_id = "your-topic-id"
# In this sample, we use a Google Cloud Trace to export the OpenTelemetry
# traces: https://cloud.google.com/trace/docs/setup/python-ot
# Choose and configure the exporter for your set up accordingly.
sampler = ParentBased(root=TraceIdRatioBased(1))
# Export to Google Trace.
cloud_trace_exporter = CloudTraceSpanExporter(
# Set the `enable_open_telemetry_tracing` option to True when creating
# the publisher client. This in itself is necessary and sufficient for
# the library to export OpenTelemetry traces. However, where the traces
# must be exported to needs to be configured based on your OpenTelemetry
# set up. Refer: https://opentelemetry.io/docs/languages/python/exporters/
publisher = PublisherClient(
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(topic_project_id, topic_id)
# Publish messages.
for n in range(1, 10):
data_str = f"Message number {n}"
# Data must be a bytestring
data = data_str.encode("utf-8")
# When you publish a message, the client returns a future.
future = publisher.publish(topic_path, data)
print(f"Published messages to {topic_path}.")
* TODO(developer): Uncomment these variables before running the sample.
// const topicNameOrId = 'YOUR_TOPIC_OR_ID';
// const data = 'Hello, world!";
// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';
// Imports the OpenTelemetry API
import {NodeTracerProvider} from '@opentelemetry/sdk-trace-node';
import {diag, DiagConsoleLogger, DiagLogLevel} from '@opentelemetry/api';
import {SimpleSpanProcessor} from '@opentelemetry/sdk-trace-base';
// To output to the console for testing, use the ConsoleSpanExporter.
// import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base';
// To output to Cloud Trace, import the OpenTelemetry bridge library.
import {TraceExporter} from '@google-cloud/opentelemetry-cloud-trace-exporter';
import {Resource} from '@opentelemetry/resources';
import {SEMRESATTRS_SERVICE_NAME} from '@opentelemetry/semantic-conventions';
// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);
// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();
// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();
// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
resource: new Resource({
[SEMRESATTRS_SERVICE_NAME]: 'otel publisher example',
const processor = new SimpleSpanProcessor(exporter);
// Creates a client; cache this for further use.
const pubSubClient = new PubSub({enableOpenTelemetryTracing: true});
async function publishMessage(topicNameOrId: string, data: string) {
// Publishes the message as a string, e.g. "Hello, world!"
// or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
// Cache topic objects (publishers) and reuse them.
const publisher = pubSubClient.topic(topicNameOrId);
const messageId = await publisher.publishMessage({data: dataBuffer});
console.log(`Message ${messageId} published.`);
// The rest of the sample is in service to making sure that any
// buffered Pub/Sub messages and/or OpenTelemetry spans are properly
// flushed to the server side. In normal usage, you'd only need to do
// something like this on process shutdown.
await publisher.flush();
await processor.forceFlush();
await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
* TODO(developer): Uncomment these variables before running the sample.
// const topicNameOrId = 'YOUR_TOPIC_OR_ID';
// const data = 'Hello, world!";
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Imports the OpenTelemetry API
const {NodeTracerProvider} = require('@opentelemetry/sdk-trace-node');
const {diag, DiagConsoleLogger, DiagLogLevel} = require('@opentelemetry/api');
const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base');
// To output to the console for testing, use the ConsoleSpanExporter.
// import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base';
// To output to Cloud Trace, import the OpenTelemetry bridge library.
const {
} = require('@google-cloud/opentelemetry-cloud-trace-exporter');
const {Resource} = require('@opentelemetry/resources');
const {
} = require('@opentelemetry/semantic-conventions');
// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);
// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();
// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();
// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
resource: new Resource({
[SEMRESATTRS_SERVICE_NAME]: 'otel publisher example',
const processor = new SimpleSpanProcessor(exporter);
// Creates a client; cache this for further use.
const pubSubClient = new PubSub({enableOpenTelemetryTracing: true});
async function publishMessage(topicNameOrId, data) {
// Publishes the message as a string, e.g. "Hello, world!"
// or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
// Cache topic objects (publishers) and reuse them.
const publisher = pubSubClient.topic(topicNameOrId);
const messageId = await publisher.publishMessage({data: dataBuffer});
console.log(`Message ${messageId} published.`);
// The rest of the sample is in service to making sure that any
// buffered Pub/Sub messages and/or OpenTelemetry spans are properly
// flushed to the server side. In normal usage, you'd only need to do
// something like this on process shutdown.
await publisher.flush();
await processor.forceFlush();
await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
import com.google.api.core.ApiFuture;
import com.google.cloud.opentelemetry.trace.TraceConfiguration;
import com.google.cloud.opentelemetry.trace.TraceExporter;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.semconv.ResourceAttributes;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class OpenTelemetryPublisherExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
openTelemetryPublisherExample(projectId, topicId);
public static void openTelemetryPublisherExample(String projectId, String topicId)
throws IOException, ExecutionException, InterruptedException {
Resource resource =
.put(ResourceAttributes.SERVICE_NAME, "publisher-example")
// Creates a Cloud Trace exporter.
SpanExporter traceExporter =
SdkTracerProvider sdkTracerProvider =
OpenTelemetry openTelemetry =
TopicName topicName = TopicName.of(projectId, topicId);
Publisher publisher = null;
try {
// Create a publisher instance with the created OpenTelemetry object and enabling tracing.
publisher =
String message = "Hello World!";
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Once published, returns a server-assigned message id (unique within the topic)
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
String messageId = messageIdFuture.get();
System.out.println("Published message ID: " + messageId);
} finally {
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.awaitTermination(1, TimeUnit.MINUTES);
import (
texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
func subscribeOpenTelemetryTracing(w io.Writer, projectID, subID string, sampleRate float64) error {
// projectID := "my-project-id"
// subID := "my-sub"
// sampleRate := "1.0"
ctx := context.Background()
exporter, err := texporter.New(texporter.WithProjectID(projectID),
// Disable spans created by the exporter.
if err != nil {
return fmt.Errorf("error instantiating exporter: %w", err)
resources := resource.NewWithAttributes(
// Instantiate a tracer provider with the following settings
tp := sdktrace.NewTracerProvider(
defer tp.ForceFlush(ctx) // flushes any pending spans
// Create a new client with tracing enabled.
client, err := pubsub.NewClientWithConfig(ctx, projectID, &pubsub.ClientConfig{
EnableOpenTelemetryTracing: true,
if err != nil {
return fmt.Errorf("pubsub.NewClient: %w", err)
defer client.Close()
sub := client.Subscription(subID)
// Receive messages for 10 seconds, which simplifies testing.
// Comment this out in production, since `Receive` should
// be used as a long running operation.
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
var received int32
err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
atomic.AddInt32(&received, 1)
if err != nil {
return fmt.Errorf("sub.Receive: %w", err)
fmt.Fprintf(w, "Received %d messages\n", received)
return nil
#include "google/cloud/opentelemetry/configure_basic_tracing.h"
#include "google/cloud/opentelemetry_options.h"
#include "google/cloud/pubsub/message.h"
#include "google/cloud/pubsub/publisher.h"
#include "google/cloud/pubsub/subscriber.h"
#include "google/cloud/pubsub/subscription.h"
#include <iostream>
int main(int argc, char* argv[]) try {
if (argc != 4) {
std::cerr << "Usage: " << argv[0]
<< " <project-id> <topic-id> <subscription-id>\n";
return 1;
std::string const project_id = argv[1];
std::string const topic_id = argv[2];
std::string const subscription_id = argv[3];
// Create a few namespace aliases to make the code easier to read.
namespace gc = ::google::cloud;
namespace otel = gc::otel;
namespace pubsub = gc::pubsub;
auto constexpr kWaitTimeout = std::chrono::seconds(30);
auto project = gc::Project(project_id);
auto configuration = otel::ConfigureBasicTracing(project);
// Publish a message with tracing enabled.
auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
pubsub::Topic(project_id, topic_id),
// Block until the message is actually sent and throw on error.
auto id = publisher.Publish(pubsub::MessageBuilder().SetData("Hi!").Build())
std::cout << "Sent message with id: (" << id << ")\n";
// Receive a message using streaming pull with tracing enabled.
auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
pubsub::Subscription(project_id, subscription_id),
auto session =
subscriber.Subscribe([&](pubsub::Message const& m, pubsub::AckHandler h) {
std::cout << "Received message " << m << "\n";
std::cout << "Waiting for messages on " + subscription_id + "...\n";
// Blocks until the timeout is reached.
auto result = session.wait_for(kWaitTimeout);
if (result == std::future_status::timeout) {
std::cout << "timeout reached, ending session\n";
return 0;
} catch (google::cloud::Status const& status) {
std::cerr << "google::cloud::Status thrown: " << status << "\n";
return 1;
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import SubscriberClient
from google.cloud.pubsub_v1.types import SubscriberOptions
# TODO(developer)
# subscription_project_id = "your-subscription-project-id"
# subscription_id = "your-subscription-id"
# cloud_trace_project_id = "your-cloud-trace-project-id"
# timeout = 300.0
# In this sample, we use a Google Cloud Trace to export the OpenTelemetry
# traces: https://cloud.google.com/trace/docs/setup/python-ot
# Choose and configure the exporter for your set up accordingly.
sampler = ParentBased(root=TraceIdRatioBased(1))
# Export to Google Trace
cloud_trace_exporter = CloudTraceSpanExporter(
# Set the `enable_open_telemetry_tracing` option to True when creating
# the subscriber client. This in itself is necessary and sufficient for
# the library to export OpenTelemetry traces. However, where the traces
# must be exported to needs to be configured based on your OpenTelemetry
# set up. Refer: https://opentelemetry.io/docs/languages/python/exporters/
subscriber = SubscriberClient(
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(
subscription_project_id, subscription_id
# Define callback to be called when a message is received.
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
# Ack message after processing it.
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
# Optimistically subscribe to messages on the subscription.
streaming_pull_future = subscriber.subscribe(
subscription_path, callback=callback
except TimeoutError:
print("Successfully subscribed until the timeout passed.")
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
* TODO(developer): Uncomment these variables before running the sample.
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_OR_ID';
// Imports the Google Cloud client library
import {Message, PubSub} from '@google-cloud/pubsub';
// Imports the OpenTelemetry API
import {NodeTracerProvider} from '@opentelemetry/sdk-trace-node';
import {diag, DiagConsoleLogger, DiagLogLevel} from '@opentelemetry/api';
import {SimpleSpanProcessor} from '@opentelemetry/sdk-trace-base';
// To output to the console for testing, use the ConsoleSpanExporter.
// import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base';
// To output to Cloud Trace, import the OpenTelemetry bridge library.
import {TraceExporter} from '@google-cloud/opentelemetry-cloud-trace-exporter';
import {Resource} from '@opentelemetry/resources';
import {SEMRESATTRS_SERVICE_NAME} from '@opentelemetry/semantic-conventions';
// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);
// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();
// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();
// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
resource: new Resource({
[SEMRESATTRS_SERVICE_NAME]: 'otel subscriber example',
const processor = new SimpleSpanProcessor(exporter);
// Creates a client; cache this for further use.
const pubSubClient = new PubSub({enableOpenTelemetryTracing: true});
async function subscriptionListen(subscriptionNameOrId: string) {
const subscriber = pubSubClient.subscription(subscriptionNameOrId);
// Message handler for subscriber
const messageHandler = async (message: Message) => {
console.log(`Message ${message.id} received.`);
// Error handler for subscriber
const errorHandler = async (error: Error) => {
console.log('Received error:', error);
// Listens for new messages from the topic
subscriber.on('message', messageHandler);
subscriber.on('error', errorHandler);
// Ensures that all spans got flushed by the exporter. This function
// is in service to making sure that any buffered Pub/Sub messages
// and/or OpenTelemetry spans are properly flushed to the server
// side. In normal usage, you'd only need to do something like this
// on process shutdown.
async function shutdown() {
await subscriber.close();
await processor.forceFlush();
await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
// Wait a bit for the subscription to receive messages, then shut down
// gracefully. This is for the sample only; normally you would not need
// this delay.
await new Promise<void>(r =>
setTimeout(async () => {
await shutdown();
* TODO(developer): Uncomment these variables before running the sample.
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_OR_ID';
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Imports the OpenTelemetry API
const {NodeTracerProvider} = require('@opentelemetry/sdk-trace-node');
const {diag, DiagConsoleLogger, DiagLogLevel} = require('@opentelemetry/api');
const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base');
// To output to the console for testing, use the ConsoleSpanExporter.
// import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base';
// To output to Cloud Trace, import the OpenTelemetry bridge library.
const {
} = require('@google-cloud/opentelemetry-cloud-trace-exporter');
const {Resource} = require('@opentelemetry/resources');
const {
} = require('@opentelemetry/semantic-conventions');
// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);
// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();
// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();
// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
resource: new Resource({
[SEMRESATTRS_SERVICE_NAME]: 'otel subscriber example',
const processor = new SimpleSpanProcessor(exporter);
// Creates a client; cache this for further use.
const pubSubClient = new PubSub({enableOpenTelemetryTracing: true});
async function subscriptionListen(subscriptionNameOrId) {
const subscriber = pubSubClient.subscription(subscriptionNameOrId);
// Message handler for subscriber
const messageHandler = async message => {
console.log(`Message ${message.id} received.`);
// Error handler for subscriber
const errorHandler = async error => {
console.log('Received error:', error);
// Listens for new messages from the topic
subscriber.on('message', messageHandler);
subscriber.on('error', errorHandler);
// Ensures that all spans got flushed by the exporter. This function
// is in service to making sure that any buffered Pub/Sub messages
// and/or OpenTelemetry spans are properly flushed to the server
// side. In normal usage, you'd only need to do something like this
// on process shutdown.
async function shutdown() {
await subscriber.close();
await processor.forceFlush();
await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
// Wait a bit for the subscription to receive messages, then shut down
// gracefully. This is for the sample only; normally you would not need
// this delay.
await new Promise(r =>
setTimeout(async () => {
await shutdown();
import com.google.cloud.opentelemetry.trace.TraceConfiguration;
import com.google.cloud.opentelemetry.trace.TraceExporter;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.semconv.ResourceAttributes;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class OpenTelemetrySubscriberExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String subscriptionId = "your-subscription-id";
openTelemetrySubscriberExample(projectId, subscriptionId);
public static void openTelemetrySubscriberExample(String projectId, String subscriptionId) {
Resource resource =
.put(ResourceAttributes.SERVICE_NAME, "subscriber-example")
// Creates a Cloud Trace exporter.
SpanExporter traceExporter =
SdkTracerProvider sdkTracerProvider =
OpenTelemetry openTelemetry =
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
// Instantiate an asynchronous message receiver.
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
// Handle incoming message, then ack the received message.
System.out.println("Id: " + message.getMessageId());
System.out.println("Data: " + message.getData().toStringUtf8());
Subscriber subscriber = null;
try {
subscriber =
Subscriber.newBuilder(subscriptionName, receiver)
// Start the subscriber.
System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
// Allow the subscriber to run for 30s unless an unrecoverable error occurs.
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException timeoutException) {
// Shut down the subscriber after 30s. Stop receiving messages.
以下部分详细介绍了如何在 Google Cloud 控制台中跟踪和分析轨迹。
- 发布一批消息时,系统会在单独的轨迹中捕获发布 RPC 跨度。
- 发布 RPC 具有多个来源跨度,因为多个创建调用可以批量处理,从而导致发布 RPC。
OpenTelemetry 中的 span 可以有零个或一个父 span。
代表批量操作(例如发布批次,逻辑上应具有多个父级)的 span 不能使用零个或一个父 span 表示。
下图显示了在单个轨迹中针对单个消息创建的 span 示例。
span 属性用于传达其他元数据,例如消息的排序键、消息 ID 和消息大小。
- 您用于导出轨迹的服务账号没有所需的
角色。 - Cloud Trace 中提取的跨度数量已达到上限。
- 您的应用在未调用适当的刷新函数的情况下终止。