Pub/Sub OpenTelemetry-Tracing

Mit OpenTelemetry-Tracing können Sie die Latenz verschiedener Vorgänge der Pub/Sub-Clientbibliotheken wie Batchverarbeitung, Freigabe und Ablaufsteuerung. Diese Informationen können dir bei der Fehlerbehebung von Problemen mit der Clientbibliothek helfen.

Hier sind einige mögliche Anwendungsfälle für OpenTelemetry-Tracing:

  • Die Veröffentlichungslatenz für Ihren Dienst ist höher als normal.
  • Es kommt zu einer hohen Anzahl von Nachrichtenwiederbestellungen.
  • Eine Änderung an der Callback-Funktion deines Abonnentenclients führt dazu, dass die Verarbeitung länger als gewöhnlich dauert.

Hinweis

Führen Sie die folgenden Aufgaben aus, bevor Sie OpenTelemetry konfigurieren:

Erforderliche Rollen

Damit das Dienstkonto die erforderlichen Berechtigungen zum Exportieren von Protokollen in Cloud Trace hat, bitten Sie Ihren Administrator, dem Dienstkonto die folgenden IAM-Rollen für Ihr Projekt zu erteilen:

Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Diese vordefinierten Rollen enthalten Berechtigungen zum Exportieren von Traces nach Cloud Trace Erweitern Sie den Abschnitt Erforderliche Berechtigungen, um die erforderlichen Berechtigungen anzuzeigen:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind erforderlich, um Traces nach Cloud Trace zu exportieren:

  • Alle: cloudtrace.traces.patch

Ihr Administrator kann dem Dienstkonto möglicherweise auch diese Berechtigungen mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erteilen.

OpenTelemetry-Tracing-Workflow

Zum Einrichten von OpenTelemetry-Tracing verwenden Sie die Pub/Sub-Clientbibliotheken und das OpenTelemetry SDK. Mit dem SDK müssen Sie einen Trace-Exporteur und einen Tracer-Anbieter einrichten, bevor Sie eine Verbindung zu den Pub/Sub-Bibliotheken herstellen können. In einigen Bibliotheken ist die Einrichtung eines Tracer-Anbieters optional.

  • Trace-Exporter Das OpenTelemetry SDK verwendet den Trace-Exporter, um zu bestimmen, wohin Traces gesendet werden.

  • Tracer-Anbieter Die Pub/Sub-Clientbibliotheken verwenden den Tracer. Anbieter, um Traces zu erstellen.

Mit den folgenden Schritten können Sie die Aufrufabfolge einrichten:

  1. Instanziieren Sie einen Cloud Trace OpenTelemetry-Exporter.
  2. Erstellen und registrieren Sie bei Bedarf einen Tracer-Anbieter mit dem OpenTelemetry SDK.
  3. Konfigurieren Sie den Client mit der Option zum Aktivieren des OpenTelemetry-Tracings.
  4. Verwende die Pub/Sub-Clientbibliotheken, um eine Nachricht zu veröffentlichen.

So funktioniert das Tracing

Für jede veröffentlichte Nachricht erstellt die Clientbibliothek eine neue Spur. Dieser Trace stellt den gesamten Lebenszyklus der Nachricht dar, ab dem Moment, in dem Sie eine an, wenn die Nachricht bestätigt wurde. Ein Trace enthält Informationen wie die Dauer von Vorgängen, übergeordnete und untergeordnete Spans sowie verknüpfte Spans.

Ein Trace besteht aus einem Stamm-Span und den zugehörigen untergeordneten Spans. Diese Spannen repräsentieren die Arbeit, die die Clientbibliothek bei der Verarbeitung einer Nachricht leistet. Jedes Nachrichten-Trace enthält Folgendes:

  • Veröffentlichung: Ablaufsteuerung, Schlüsselplanung, Batching und Länge des RPCs zur Veröffentlichung.
  • Für Abos Nebenläufigkeitskontrolle, das Bestellen von Schlüsselplanung und Leasingverwaltung.

Um Informationen von der Veröffentlichungsseite an die Aboseite weiterzuleiten, fügen die Clientbibliotheken ein spezifisches Attribut für die Nachverfolgung auf der Veröffentlichungsseite ein. Der Kontextübertragungsmechanismus ist nur aktiviert, wenn das Tracing aktiviert ist und das Präfix googclient_ vorangestellt ist.

Nachrichten mit Tracing veröffentlichen

Im folgenden Codebeispiel wird gezeigt, wie Sie die Tracing-Funktion mit der Pub/Sub-Clientbibliothek und dem OpenTelemetry SDK aktivieren. In diesem Beispiel werden die Ergebnisse der Aufrufabfolge nach Cloud Trace exportiert.

Hinweise

Beim Instanziieren des Tracer-Anbieters konfigurieren Sie eine Stichprobenrate. mit dem OpenTelemetry SDK. Anhand dieses Verhältnisses wird festgelegt, wie viele Traces das SDK erfassen soll. Eine niedrigere Rate für die Stichprobenerhebung kann dazu beitragen, die Abrechnungskosten zu senken und Ihr Dienst den Cloud Trace-Span überschreitet Kontingent.

Go

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
	"go.opentelemetry.io/otel"
	"google.golang.org/api/option"

	texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
)

// publishOpenTelemetryTracing publishes a single message with OpenTelemetry tracing
// enabled, exporting to Cloud Trace.
func publishOpenTelemetryTracing(w io.Writer, projectID, topicID string, sampling float64) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()

	exporter, err := texporter.New(texporter.WithProjectID(projectID),
		// Disable spans created by the exporter.
		texporter.WithTraceClientOptions(
			[]option.ClientOption{option.WithTelemetryDisabled()},
		),
	)
	if err != nil {
		return fmt.Errorf("error instantiating exporter: %w", err)
	}

	resources := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceNameKey.String("publisher"),
	)

	// Instantiate a tracer provider with the following settings
	tp := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(resources),
		sdktrace.WithSampler(
			sdktrace.ParentBased(sdktrace.TraceIDRatioBased(sampling)),
		),
	)

	defer tp.ForceFlush(ctx) // flushes any pending spans
	otel.SetTracerProvider(tp)

	// Create a new client with tracing enabled.
	client, err := pubsub.NewClientWithConfig(ctx, projectID, &pubsub.ClientConfig{
		EnableOpenTelemetryTracing: true,
	})
	if err != nil {
		return fmt.Errorf("pubsub: NewClient: %w", err)
	}
	defer client.Close()

	t := client.Topic(topicID)
	result := t.Publish(ctx, &pubsub.Message{
		Data: []byte("Publishing message with tracing"),
	})
	if _, err := result.Get(ctx); err != nil {
		return fmt.Errorf("pubsub: result.Get: %w", err)
	}
	fmt.Fprintln(w, "Published a traced message")
	return nil
}

C++

// Create a few namespace aliases to make the code easier to read.
namespace gc = ::google::cloud;
namespace otel = gc::otel;
namespace pubsub = gc::pubsub;

// This example uses a simple wrapper to export (upload) OTel tracing data
// to Google Cloud Trace. More complex applications may use different
// authentication, or configure their own OTel exporter.
auto project = gc::Project(project_id);
auto configuration = otel::ConfigureBasicTracing(project);

auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
    pubsub::Topic(project_id, topic_id),
    // Configure this publisher to enable OTel tracing. Some applications may
    // chose to disable tracing in some publishers or to dynamically enable
    // this option based on their own configuration.
    gc::Options{}.set<gc::OpenTelemetryTracingOption>(true)));

// After this point, use the Cloud Pub/Sub C++ client library as usual.
// In this example, we will send a few messages and configure a callback
// action for each one.
std::vector<gc::future<void>> ids;
for (int i = 0; i < 5; i++) {
  auto id = publisher.Publish(pubsub::MessageBuilder().SetData("Hi!").Build())
                .then([](gc::future<gc::StatusOr<std::string>> f) {
                  auto id = f.get();
                  if (!id) {
                    std::cout << "Error in publish: " << id.status() << "\n";
                    return;
                  }
                  std::cout << "Sent message with id: (" << *id << ")\n";
                });
  ids.push_back(std::move(id));
}
// Block until the messages are actually sent.
for (auto& id : ids) id.get();

Python

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Python in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Python API.


from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
)
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased

from google.cloud.pubsub_v1 import PublisherClient
from google.cloud.pubsub_v1.types import PublisherOptions

# TODO(developer)
# topic_project_id = "your-topic-project-id"
# trace_project_id = "your-trace-project-id"
# topic_id = "your-topic-id"

# In this sample, we use a Google Cloud Trace to export the OpenTelemetry
# traces: https://cloud.google.com/trace/docs/setup/python-ot
# Choose and configure the exporter for your set up accordingly.

sampler = ParentBased(root=TraceIdRatioBased(1))
trace.set_tracer_provider(TracerProvider(sampler=sampler))

# Export to Google Trace.
cloud_trace_exporter = CloudTraceSpanExporter(
    project_id=trace_project_id,
)
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(cloud_trace_exporter)
)

# Set the `enable_open_telemetry_tracing` option to True when creating
# the publisher client. This in itself is necessary and sufficient for
# the library to export OpenTelemetry traces. However, where the traces
# must be exported to needs to be configured based on your OpenTelemetry
# set up. Refer: https://opentelemetry.io/docs/languages/python/exporters/
publisher = PublisherClient(
    publisher_options=PublisherOptions(
        enable_open_telemetry_tracing=True,
    ),
)

# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(topic_project_id, topic_id)
# Publish messages.
for n in range(1, 10):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data)
    print(future.result())

print(f"Published messages to {topic_path}.")

TypeScript

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_OR_ID';
// const data = 'Hello, world!";

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Imports the OpenTelemetry API
import {NodeTracerProvider} from '@opentelemetry/sdk-trace-node';
import {diag, DiagConsoleLogger, DiagLogLevel} from '@opentelemetry/api';
import {SimpleSpanProcessor} from '@opentelemetry/sdk-trace-base';

// To output to the console for testing, use the ConsoleSpanExporter.
// import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base';

// To output to Cloud Trace, import the OpenTelemetry bridge library.
import {TraceExporter} from '@google-cloud/opentelemetry-cloud-trace-exporter';

import {Resource} from '@opentelemetry/resources';
import {SEMRESATTRS_SERVICE_NAME} from '@opentelemetry/semantic-conventions';

// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);

// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();

// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();

// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
  resource: new Resource({
    [SEMRESATTRS_SERVICE_NAME]: 'otel publisher example',
  }),
});
const processor = new SimpleSpanProcessor(exporter);
provider.addSpanProcessor(processor);
provider.register();

// Creates a client; cache this for further use.
const pubSubClient = new PubSub({enableOpenTelemetryTracing: true});

async function publishMessage(topicNameOrId: string, data: string) {
  // Publishes the message as a string, e.g. "Hello, world!"
  // or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);
  const publisher = pubSubClient.topic(topicNameOrId);
  const messageId = await publisher.publishMessage({data: dataBuffer});
  console.log(`Message ${messageId} published.`);

  // The rest of the sample is in service to making sure that any
  // buffered Pub/Sub messages and/or OpenTelemetry spans are properly
  // flushed to the server side. In normal usage, you'd only need to do
  // something like this on process shutdown.
  await publisher.flush();
  await processor.forceFlush();
  await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_OR_ID';
// const data = 'Hello, world!";

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Imports the OpenTelemetry API
const {NodeTracerProvider} = require('@opentelemetry/sdk-trace-node');
const {diag, DiagConsoleLogger, DiagLogLevel} = require('@opentelemetry/api');
const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base');

// To output to the console for testing, use the ConsoleSpanExporter.
// import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base';

// To output to Cloud Trace, import the OpenTelemetry bridge library.
const {
  TraceExporter,
} = require('@google-cloud/opentelemetry-cloud-trace-exporter');

const {Resource} = require('@opentelemetry/resources');
const {
  SEMRESATTRS_SERVICE_NAME,
} = require('@opentelemetry/semantic-conventions');

// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);

// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();

// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();

// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
  resource: new Resource({
    [SEMRESATTRS_SERVICE_NAME]: 'otel publisher example',
  }),
});
const processor = new SimpleSpanProcessor(exporter);
provider.addSpanProcessor(processor);
provider.register();

// Creates a client; cache this for further use.
const pubSubClient = new PubSub({enableOpenTelemetryTracing: true});

async function publishMessage(topicNameOrId, data) {
  // Publishes the message as a string, e.g. "Hello, world!"
  // or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);
  const publisher = pubSubClient.topic(topicNameOrId);
  const messageId = await publisher.publishMessage({data: dataBuffer});
  console.log(`Message ${messageId} published.`);

  // The rest of the sample is in service to making sure that any
  // buffered Pub/Sub messages and/or OpenTelemetry spans are properly
  // flushed to the server side. In normal usage, you'd only need to do
  // something like this on process shutdown.
  await publisher.flush();
  await processor.forceFlush();
  await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
}

Java


import com.google.api.core.ApiFuture;
import com.google.cloud.opentelemetry.trace.TraceConfiguration;
import com.google.cloud.opentelemetry.trace.TraceExporter;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.semconv.ResourceAttributes;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class OpenTelemetryPublisherExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    openTelemetryPublisherExample(projectId, topicId);
  }

  public static void openTelemetryPublisherExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    Resource resource =
        Resource.getDefault()
            .toBuilder()
            .put(ResourceAttributes.SERVICE_NAME, "publisher-example")
            .build();

    // Creates a Cloud Trace exporter.
    SpanExporter traceExporter =
        TraceExporter.createWithConfiguration(
            TraceConfiguration.builder().setProjectId(projectId).build());

    SdkTracerProvider sdkTracerProvider =
        SdkTracerProvider.builder()
            .setResource(resource)
            .addSpanProcessor(SimpleSpanProcessor.create(traceExporter))
            .setSampler(Sampler.alwaysOn())
            .build();

    OpenTelemetry openTelemetry =
        OpenTelemetrySdk.builder().setTracerProvider(sdkTracerProvider).buildAndRegisterGlobal();

    TopicName topicName = TopicName.of(projectId, topicId);

    Publisher publisher = null;
    try {
      // Create a publisher instance with the created OpenTelemetry object and enabling tracing.
      publisher =
          Publisher.newBuilder(topicName)
              .setOpenTelemetry(openTelemetry)
              .setEnableOpenTelemetryTracing(true)
              .build();

      String message = "Hello World!";
      ByteString data = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

      // Once published, returns a server-assigned message id (unique within the topic)
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published message ID: " + messageId);
    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Nachrichten mit Rückverfolgung empfangen

Go

import (
	"context"
	"fmt"
	"io"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub"
	texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
	"google.golang.org/api/option"
)

func subscribeOpenTelemetryTracing(w io.Writer, projectID, subID string, sampleRate float64) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// sampleRate := "1.0"
	ctx := context.Background()

	exporter, err := texporter.New(texporter.WithProjectID(projectID),
		// Disable spans created by the exporter.
		texporter.WithTraceClientOptions(
			[]option.ClientOption{option.WithTelemetryDisabled()},
		),
	)
	if err != nil {
		return fmt.Errorf("error instantiating exporter: %w", err)
	}

	resources := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceNameKey.String("subscriber"),
	)

	// Instantiate a tracer provider with the following settings
	tp := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(resources),
		sdktrace.WithSampler(
			sdktrace.ParentBased(sdktrace.TraceIDRatioBased(sampleRate)),
		),
	)

	defer tp.ForceFlush(ctx) // flushes any pending spans
	otel.SetTracerProvider(tp)

	// Create a new client with tracing enabled.
	client, err := pubsub.NewClientWithConfig(ctx, projectID, &pubsub.ClientConfig{
		EnableOpenTelemetryTracing: true,
	})
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var received int32
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		atomic.AddInt32(&received, 1)
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)
	}
	fmt.Fprintf(w, "Received %d messages\n", received)

	return nil
}

C++

#include "google/cloud/opentelemetry/configure_basic_tracing.h"
#include "google/cloud/opentelemetry_options.h"
#include "google/cloud/pubsub/message.h"
#include "google/cloud/pubsub/publisher.h"
#include "google/cloud/pubsub/subscriber.h"
#include "google/cloud/pubsub/subscription.h"
#include <iostream>

int main(int argc, char* argv[]) try {
  if (argc != 4) {
    std::cerr << "Usage: " << argv[0]
              << " <project-id> <topic-id> <subscription-id>\n";
    return 1;
  }

  std::string const project_id = argv[1];
  std::string const topic_id = argv[2];
  std::string const subscription_id = argv[3];

  // Create a few namespace aliases to make the code easier to read.
  namespace gc = ::google::cloud;
  namespace otel = gc::otel;
  namespace pubsub = gc::pubsub;

  auto constexpr kWaitTimeout = std::chrono::seconds(30);

  auto project = gc::Project(project_id);
  auto configuration = otel::ConfigureBasicTracing(project);

  // Publish a message with tracing enabled.
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      pubsub::Topic(project_id, topic_id),
      gc::Options{}.set<gc::OpenTelemetryTracingOption>(true)));
  // Block until the message is actually sent and throw on error.
  auto id = publisher.Publish(pubsub::MessageBuilder().SetData("Hi!").Build())
                .get()
                .value();
  std::cout << "Sent message with id: (" << id << ")\n";

  // Receive a message using streaming pull with tracing enabled.
  auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
      pubsub::Subscription(project_id, subscription_id),
      gc::Options{}.set<gc::OpenTelemetryTracingOption>(true)));

  auto session =
      subscriber.Subscribe([&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
      });

  std::cout << "Waiting for messages on " + subscription_id + "...\n";

  // Blocks until the timeout is reached.
  auto result = session.wait_for(kWaitTimeout);
  if (result == std::future_status::timeout) {
    std::cout << "timeout reached, ending session\n";
    session.cancel();
  }

  return 0;
} catch (google::cloud::Status const& status) {
  std::cerr << "google::cloud::Status thrown: " << status << "\n";
  return 1;
}

Python

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
)
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import SubscriberClient
from google.cloud.pubsub_v1.types import SubscriberOptions

# TODO(developer)
# subscription_project_id = "your-subscription-project-id"
# subscription_id = "your-subscription-id"
# cloud_trace_project_id = "your-cloud-trace-project-id"
# timeout = 300.0

# In this sample, we use a Google Cloud Trace to export the OpenTelemetry
# traces: https://cloud.google.com/trace/docs/setup/python-ot
# Choose and configure the exporter for your set up accordingly.

sampler = ParentBased(root=TraceIdRatioBased(1))
trace.set_tracer_provider(TracerProvider(sampler=sampler))

# Export to Google Trace
cloud_trace_exporter = CloudTraceSpanExporter(
    project_id=cloud_trace_project_id,
)
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(cloud_trace_exporter)
)
# Set the `enable_open_telemetry_tracing` option to True when creating
# the subscriber client. This in itself is necessary and sufficient for
# the library to export OpenTelemetry traces. However, where the traces
# must be exported to needs to be configured based on your OpenTelemetry
# set up. Refer: https://opentelemetry.io/docs/languages/python/exporters/
subscriber = SubscriberClient(
    subscriber_options=SubscriberOptions(enable_open_telemetry_tracing=True)
)

# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(
    subscription_project_id, subscription_id
)

# Define callback to be called when a message is received.
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    # Ack message after processing it.
    print(message.data)
    message.ack()

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # Optimistically subscribe to messages on the subscription.
        streaming_pull_future = subscriber.subscribe(
            subscription_path, callback=callback
        )
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        print("Successfully subscribed until the timeout passed.")
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

TypeScript

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_OR_ID';

// Imports the Google Cloud client library
import {Message, PubSub} from '@google-cloud/pubsub';

// Imports the OpenTelemetry API
import {NodeTracerProvider} from '@opentelemetry/sdk-trace-node';
import {diag, DiagConsoleLogger, DiagLogLevel} from '@opentelemetry/api';
import {SimpleSpanProcessor} from '@opentelemetry/sdk-trace-base';

// To output to the console for testing, use the ConsoleSpanExporter.
// import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base';

// To output to Cloud Trace, import the OpenTelemetry bridge library.
import {TraceExporter} from '@google-cloud/opentelemetry-cloud-trace-exporter';

import {Resource} from '@opentelemetry/resources';
import {SEMRESATTRS_SERVICE_NAME} from '@opentelemetry/semantic-conventions';

// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);

// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();

// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();

// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
  resource: new Resource({
    [SEMRESATTRS_SERVICE_NAME]: 'otel subscriber example',
  }),
});
const processor = new SimpleSpanProcessor(exporter);
provider.addSpanProcessor(processor);
provider.register();

// Creates a client; cache this for further use.
const pubSubClient = new PubSub({enableOpenTelemetryTracing: true});

async function subscriptionListen(subscriptionNameOrId: string) {
  const subscriber = pubSubClient.subscription(subscriptionNameOrId);

  // Message handler for subscriber
  const messageHandler = async (message: Message) => {
    console.log(`Message ${message.id} received.`);
    message.ack();
  };

  // Error handler for subscriber
  const errorHandler = async (error: Error) => {
    console.log('Received error:', error);
  };

  // Listens for new messages from the topic
  subscriber.on('message', messageHandler);
  subscriber.on('error', errorHandler);

  // Ensures that all spans got flushed by the exporter. This function
  // is in service to making sure that any buffered Pub/Sub messages
  // and/or OpenTelemetry spans are properly flushed to the server
  // side. In normal usage, you'd only need to do something like this
  // on process shutdown.
  async function shutdown() {
    await subscriber.close();
    await processor.forceFlush();
    await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
  }

  // Wait a bit for the subscription to receive messages, then shut down
  // gracefully. This is for the sample only; normally you would not need
  // this delay.
  await new Promise<void>(r =>
    setTimeout(async () => {
      subscriber.removeAllListeners();
      await shutdown();
      r();
    }, SUBSCRIBER_TIMEOUT * 1000)
  );
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Imports the OpenTelemetry API
const {NodeTracerProvider} = require('@opentelemetry/sdk-trace-node');
const {diag, DiagConsoleLogger, DiagLogLevel} = require('@opentelemetry/api');
const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base');

// To output to the console for testing, use the ConsoleSpanExporter.
// import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base';

// To output to Cloud Trace, import the OpenTelemetry bridge library.
const {
  TraceExporter,
} = require('@google-cloud/opentelemetry-cloud-trace-exporter');

const {Resource} = require('@opentelemetry/resources');
const {
  SEMRESATTRS_SERVICE_NAME,
} = require('@opentelemetry/semantic-conventions');

// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);

// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();

// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();

// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
  resource: new Resource({
    [SEMRESATTRS_SERVICE_NAME]: 'otel subscriber example',
  }),
});
const processor = new SimpleSpanProcessor(exporter);
provider.addSpanProcessor(processor);
provider.register();

// Creates a client; cache this for further use.
const pubSubClient = new PubSub({enableOpenTelemetryTracing: true});

async function subscriptionListen(subscriptionNameOrId) {
  const subscriber = pubSubClient.subscription(subscriptionNameOrId);

  // Message handler for subscriber
  const messageHandler = async message => {
    console.log(`Message ${message.id} received.`);
    message.ack();
  };

  // Error handler for subscriber
  const errorHandler = async error => {
    console.log('Received error:', error);
  };

  // Listens for new messages from the topic
  subscriber.on('message', messageHandler);
  subscriber.on('error', errorHandler);

  // Ensures that all spans got flushed by the exporter. This function
  // is in service to making sure that any buffered Pub/Sub messages
  // and/or OpenTelemetry spans are properly flushed to the server
  // side. In normal usage, you'd only need to do something like this
  // on process shutdown.
  async function shutdown() {
    await subscriber.close();
    await processor.forceFlush();
    await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
  }

  // Wait a bit for the subscription to receive messages, then shut down
  // gracefully. This is for the sample only; normally you would not need
  // this delay.
  await new Promise(r =>
    setTimeout(async () => {
      subscriber.removeAllListeners();
      await shutdown();
      r();
    }, SUBSCRIBER_TIMEOUT * 1000)
  );
}

Java


import com.google.cloud.opentelemetry.trace.TraceConfiguration;
import com.google.cloud.opentelemetry.trace.TraceExporter;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.semconv.ResourceAttributes;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OpenTelemetrySubscriberExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    openTelemetrySubscriberExample(projectId, subscriptionId);
  }

  public static void openTelemetrySubscriberExample(String projectId, String subscriptionId) {
    Resource resource =
        Resource.getDefault()
            .toBuilder()
            .put(ResourceAttributes.SERVICE_NAME, "subscriber-example")
            .build();

    // Creates a Cloud Trace exporter.
    SpanExporter traceExporter =
        TraceExporter.createWithConfiguration(
            TraceConfiguration.builder().setProjectId(projectId).build());

    SdkTracerProvider sdkTracerProvider =
        SdkTracerProvider.builder()
            .setResource(resource)
            .addSpanProcessor(SimpleSpanProcessor.create(traceExporter))
            .setSampler(Sampler.alwaysOn())
            .build();

    OpenTelemetry openTelemetry =
        OpenTelemetrySdk.builder().setTracerProvider(sdkTracerProvider).buildAndRegisterGlobal();

    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setOpenTelemetry(openTelemetry)
              .setEnableOpenTelemetryTracing(true)
              .build();

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Trace analysieren

In den folgenden Abschnitten finden Sie ausführliche Informationen dazu, wie Sie einen Trace in der Google Cloud Console erfassen und analysieren.

Hinweise

  • Beim Veröffentlichen eines Batches von Nachrichten wird der RPC-Span für die Veröffentlichung in einem separaten Trace.
  • Ein Publish-RPC hat mehrere Ursprungsbereiche, da mehrere Create-Aufrufe zu einem Publish-RPC führen können, wenn sie in einem Batch zusammengefasst werden.
  • Spans in OpenTelemetry können null oder einen übergeordneten Span haben.

    Spans, die Batch-Vorgänge wie Veröffentlichungs-Batch, (die logisch mehrere übergeordnete Elemente haben sollten) kann nicht mit keine übergeordneten Spans vorhanden sind.

Während des Nachrichtenlebenszyklus erstellte Spans verfolgen

Die folgende Abbildung zeigt ein Beispiel für Spans, die in einem einzelnen Trace erstellt werden. für eine einzelne Nachricht.

Spans im Tracing ansehen

Jeder Span kann zusätzliche Attribute haben, zusätzliche Informationen, wie z. B. die Größe der Nachrichtenbyte und die Reihenfolge der Schlüsselinformationen.

Spans im Tracing ansehen

Span-Attribute übermitteln zusätzliche Metadaten wie den Reihenfolgeschlüssel der Nachricht, Nachrichten-ID und Größe der Nachricht.

Die Haupt-Veröffentlichungs- und Abo-Spans werden durch Span-Ereignisse erweitert, gibt an, wann ein Netzwerkaufruf erfolgt und wann er abgeschlossen wurde.

Spans in der Aufzeichnung ansehen

Häufige Probleme beheben

Die folgenden Probleme können zu Problemen beim Tracing führen:

  • Das Dienstkonto, das Sie zum Exportieren von Traces verwenden, hat keine die erforderliche Rolle roles/cloudtrace.agent.
  • Das Kontingent für die maximale Anzahl der aufgenommenen Spans in Cloud Trace wurde erreicht.
  • Ihre Anwendung wird beendet, ohne den entsprechenden Leerungsvorgang aufzurufen. .

Nächste Schritte