Pub/Sub OpenTelemetry tracing

OpenTelemetry tracing lets you identify and trace the latency of various Pub/Sub client library operations, such as batching, lease management, and flow control. Collecting this information can help you debug client library issues.

Some potential use cases for OpenTelemetry tracing include the following:

  • Your service has higher publish latency than normal.
  • You are experiencing a high number of message redeliveries.
  • A change to the callback function of your subscriber client causes processing to take longer than usual.

Before you begin

Before you configure OpenTelemetry, complete the following tasks:

Required roles

To ensure that the service account has the necessary permissions to export traces to Cloud Trace, ask your administrator to grant the service account the following IAM roles on your project:

For more information about granting roles, see Manage access to projects, folders, and organizations.

These predefined roles contain the permissions required to export traces to Cloud Trace. To see the exact permissions that are required, see the Required permissions section:

Required permissions

The following permissions are required to export traces to Cloud Trace:

  • All: cloudtrace.traces.patch

Your administrator might also be able to grant the service account these permissions with custom roles or other predefined roles.

OpenTelemetry tracing workflow

To set up OpenTelemetry tracing, you use the Pub/Sub client libraries and the OpenTelemetry SDK. With the SDK, you must set up a trace exporter and a tracer provider before connecting to the Pub/Sub libraries. In some libraries, setting up a tracer provider is optional.

  • Trace exporter. The OpenTelemetry SDK uses the trace exporter to determine where to send traces.

  • Tracer provider. The Pub/Sub client libraries use the tracer provider to create traces.

The following steps outline how to set up tracing:

  1. Instantiate a Cloud Trace OpenTelemetry exporter.
  2. If required, instantiate a tracer provider and register it with the OpenTelemetry SDK.
  3. Configure your client with the option that enables OpenTelemetry tracing.
  4. Use the Pub/Sub client libraries to publish a message.

How tracing works

For every message published, the client library creates a new trace. This trace represents the entire lifecycle of the message, from the moment you publish it until it is acknowledged. A trace encapsulates information such as the duration of operations, parent and child spans, and linked spans.

A trace consists of a root span and its corresponding child spans. These spans represent the work that the client library does when it processes a message. Each message trace contains the following:

  • For publishing. Flow control, ordering key scheduling, batching, and the length of the publish RPC.
  • For subscriptions. Concurrency control, ordering key scheduling, and lease management.
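
As a rough illustration of the root span and child span structure described above, the following Python sketch models a publish-side trace with plain data classes and reports where the time went. It does not use the OpenTelemetry SDK; the `Span` class, the span names, and the timings are all hypothetical and chosen only for illustration.

```python
from dataclasses import dataclass, field
from typing import Iterator, List, Optional, Tuple


@dataclass
class Span:
    """A named, timed operation. Times are in milliseconds (illustrative)."""

    name: str
    start_ms: float
    end_ms: float
    children: List["Span"] = field(default_factory=list)

    @property
    def duration_ms(self) -> float:
        return self.end_ms - self.start_ms


def flatten(span: Span, depth: int = 0) -> Iterator[Tuple[int, Span]]:
    """Yield (depth, span) pairs in depth-first order."""
    yield depth, span
    for child in span.children:
        yield from flatten(child, depth + 1)


def slowest_child(root: Span) -> Optional[Span]:
    """Return the direct child span with the longest duration, if any."""
    return max(root.children, key=lambda s: s.duration_ms, default=None)


# Hypothetical publish-side trace: names and timings are made up.
publish_trace = Span("publish", 0, 120, [
    Span("flow control", 0, 5),
    Span("ordering key scheduling", 5, 8),
    Span("batching", 8, 58),
    Span("publish RPC", 58, 120),
])

for depth, span in flatten(publish_trace):
    print(f"{'  ' * depth}{span.name}: {span.duration_ms:.0f} ms")

print("slowest child:", slowest_child(publish_trace).name)
```

Reading a real trace in Cloud Trace follows the same idea: compare the durations of the child spans under the root span to see which stage dominates the latency.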

To propagate information from the publish side to the subscribe side, the client libraries inject a tracing-specific attribute into the message on the publish side. The context propagation mechanism is enabled only when tracing is turned on, and the attribute key is prepended with the googclient_ prefix.
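
The propagation idea can be sketched in a few lines of Python. This is a conceptual sketch under stated assumptions, not the client libraries' implementation: it assumes the trace context is serialized in the W3C Trace Context `traceparent` format and stored under the hypothetical key `googclient_traceparent` (only the `googclient_` prefix comes from the text above). The `inject` and `extract` helpers are illustrative names.

```python
import re
import secrets
from typing import Dict, Optional, Tuple

# Assumed attribute key: the "googclient_" prefix plus the W3C header name.
ATTRIBUTE_KEY = "googclient_traceparent"

# W3C traceparent: version, 32-hex trace ID, 16-hex span ID, 2-hex flags.
_TRACEPARENT = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def inject(attributes: Dict[str, str], trace_id: str, span_id: str,
           sampled: bool) -> None:
    """Publish side: write the trace context into the message attributes."""
    flags = "01" if sampled else "00"
    attributes[ATTRIBUTE_KEY] = f"00-{trace_id}-{span_id}-{flags}"


def extract(attributes: Dict[str, str]) -> Optional[Tuple[str, str, bool]]:
    """Subscribe side: read the trace context back, or None if absent."""
    match = _TRACEPARENT.match(attributes.get(ATTRIBUTE_KEY, ""))
    if match is None:
        return None
    trace_id, span_id, flags = match.groups()
    return trace_id, span_id, bool(int(flags, 16) & 0x01)


trace_id = secrets.token_hex(16)  # 32 hex characters
span_id = secrets.token_hex(8)    # 16 hex characters

attributes = {"origin": "example"}  # user attributes are left untouched
inject(attributes, trace_id, span_id, sampled=True)

print(attributes[ATTRIBUTE_KEY])
print(extract(attributes) == (trace_id, span_id, True))
```

Because the context travels as an ordinary message attribute, a subscriber that receives the message can continue the same trace instead of starting a new one.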

Publish messages with tracing

The following code sample shows how to enable tracing by using the Pub/Sub client library and the OpenTelemetry SDK. In this sample, the tracing results are exported to Cloud Trace.

Considerations

When you instantiate the tracer provider, you configure a sampling ratio with the OpenTelemetry SDK. This ratio determines how many traces the SDK samples. A lower sampling rate can help reduce billing costs and prevent your service from exceeding the Cloud Trace span quota.
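
To make the effect of the sampling ratio concrete, here is a small Python sketch of a ratio-based sampling decision. It is modeled loosely on how trace-ID-ratio samplers are commonly described (compare the low 64 bits of the trace ID against a bound derived from the ratio); treat it as an assumption-laden illustration, not the SDK's exact code. The `should_sample` and `estimate_kept` helpers are hypothetical names.

```python
import secrets

# Only the low 64 bits of the 128-bit trace ID take part in the decision.
TRACE_ID_LIMIT = (1 << 64) - 1


def should_sample(trace_id: int, ratio: float) -> bool:
    """Decide deterministically from the trace ID whether to keep a trace."""
    if not 0.0 <= ratio <= 1.0:
        raise ValueError("ratio must be between 0.0 and 1.0")
    bound = round(ratio * (TRACE_ID_LIMIT + 1))
    return (trace_id & TRACE_ID_LIMIT) < bound


def estimate_kept(ratio: float, traces: int = 100_000) -> float:
    """Simulate random trace IDs and return the fraction that is sampled."""
    kept = sum(should_sample(secrets.randbits(128), ratio) for _ in range(traces))
    return kept / traces


# With a ratio of 0.1, roughly one trace in ten is kept and exported.
print(f"fraction kept at ratio 0.1: {estimate_kept(0.1):.3f}")
```

Because the decision depends only on the trace ID, every participant that sees the same trace ID and uses the same ratio reaches the same decision. Wrapping the ratio sampler in a parent-based sampler, as the samples that follow do, additionally makes child spans follow the decision already made for their parent.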

Go

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
	"go.opentelemetry.io/otel"
	"google.golang.org/api/option"

	texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
)

// publishOpenTelemetryTracing publishes a single message with OpenTelemetry tracing
// enabled, exporting to Cloud Trace.
func publishOpenTelemetryTracing(w io.Writer, projectID, topicID string, sampling float64) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()

	exporter, err := texporter.New(texporter.WithProjectID(projectID),
		// Disable spans created by the exporter.
		texporter.WithTraceClientOptions(
			[]option.ClientOption{option.WithTelemetryDisabled()},
		),
	)
	if err != nil {
		return fmt.Errorf("error instantiating exporter: %w", err)
	}

	resources := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceNameKey.String("publisher"),
	)

	// Instantiate a tracer provider with the following settings
	tp := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(resources),
		sdktrace.WithSampler(
			sdktrace.ParentBased(sdktrace.TraceIDRatioBased(sampling)),
		),
	)

	defer tp.ForceFlush(ctx) // flushes any pending spans
	otel.SetTracerProvider(tp)

	// Create a new client with tracing enabled.
	client, err := pubsub.NewClientWithConfig(ctx, projectID, &pubsub.ClientConfig{
		EnableOpenTelemetryTracing: true,
	})
	if err != nil {
		return fmt.Errorf("pubsub: NewClient: %w", err)
	}
	defer client.Close()

	t := client.Topic(topicID)
	result := t.Publish(ctx, &pubsub.Message{
		Data: []byte("Publishing message with tracing"),
	})
	if _, err := result.Get(ctx); err != nil {
		return fmt.Errorf("pubsub: result.Get: %w", err)
	}
	fmt.Fprintln(w, "Published a traced message")
	return nil
}

C++

// Create a few namespace aliases to make the code easier to read.
namespace gc = ::google::cloud;
namespace otel = gc::otel;
namespace pubsub = gc::pubsub;

// This example uses a simple wrapper to export (upload) OTel tracing data
// to Google Cloud Trace. More complex applications may use different
// authentication, or configure their own OTel exporter.
auto project = gc::Project(project_id);
auto configuration = otel::ConfigureBasicTracing(project);

auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
    pubsub::Topic(project_id, topic_id),
    // Configure this publisher to enable OTel tracing. Some applications may
    // choose to disable tracing in some publishers or to dynamically enable
    // this option based on their own configuration.
    gc::Options{}.set<gc::OpenTelemetryTracingOption>(true)));

// After this point, use the Cloud Pub/Sub C++ client library as usual.
// In this example, we will send a few messages and configure a callback
// action for each one.
std::vector<gc::future<void>> ids;
for (int i = 0; i < 5; i++) {
  auto id = publisher.Publish(pubsub::MessageBuilder().SetData("Hi!").Build())
                .then([](gc::future<gc::StatusOr<std::string>> f) {
                  auto id = f.get();
                  if (!id) {
                    std::cout << "Error in publish: " << id.status() << "\n";
                    return;
                  }
                  std::cout << "Sent message with id: (" << *id << ")\n";
                });
  ids.push_back(std::move(id));
}
// Block until the messages are actually sent.
for (auto& id : ids) id.get();

Python

Before trying this sample, follow the Python setup instructions in the quickstart on using client libraries. For more information, see the Pub/Sub Python API reference documentation.


from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
)
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased

from google.cloud.pubsub_v1 import PublisherClient
from google.cloud.pubsub_v1.types import PublisherOptions

# TODO(developer)
# topic_project_id = "your-topic-project-id"
# trace_project_id = "your-trace-project-id"
# topic_id = "your-topic-id"

# In this sample, we use Google Cloud Trace to export the OpenTelemetry
# traces: https://cloud.google.com/trace/docs/setup/python-ot
# Choose and configure the exporter for your setup accordingly.

sampler = ParentBased(root=TraceIdRatioBased(1))
trace.set_tracer_provider(TracerProvider(sampler=sampler))

# Export to Google Trace.
cloud_trace_exporter = CloudTraceSpanExporter(
    project_id=trace_project_id,
)
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(cloud_trace_exporter)
)

# Set the `enable_open_telemetry_tracing` option to True when creating
# the publisher client. This option alone is enough for the library to
# create OpenTelemetry traces. Where the traces are exported to depends
# on how you configure your OpenTelemetry exporter.
# See: https://opentelemetry.io/docs/languages/python/exporters/
publisher = PublisherClient(
    publisher_options=PublisherOptions(
        enable_open_telemetry_tracing=True,
    ),
)

# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(topic_project_id, topic_id)
# Publish messages.
for n in range(1, 10):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data)
    print(future.result())

print(f"Published messages to {topic_path}.")

TypeScript

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_OR_ID';
// const data = 'Hello, world!';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Imports the OpenTelemetry API
import {NodeTracerProvider} from '@opentelemetry/sdk-trace-node';
import {diag, DiagConsoleLogger, DiagLogLevel} from '@opentelemetry/api';
import {SimpleSpanProcessor} from '@opentelemetry/sdk-trace-base';

// To output to the console for testing, use the ConsoleSpanExporter.
// import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base';

// To output to Cloud Trace, import the OpenTelemetry bridge library.
import {TraceExporter} from '@google-cloud/opentelemetry-cloud-trace-exporter';

import {Resource} from '@opentelemetry/resources';
import {SEMRESATTRS_SERVICE_NAME} from '@opentelemetry/semantic-conventions';

// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);

// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();

// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();

// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
  resource: new Resource({
    [SEMRESATTRS_SERVICE_NAME]: 'otel publisher example',
  }),
});
const processor = new SimpleSpanProcessor(exporter);
provider.addSpanProcessor(processor);
provider.register();

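// Seconds to wait for buffered spans to be exported before returning.
// The value is illustrative (an assumption for this sample); tune it as needed.
const OTEL_TIMEOUT = 2;

// Creates a client; cache this for further use.
const pubSubClient = new PubSub({enableOpenTelemetryTracing: true});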

async function publishMessage(topicNameOrId: string, data: string) {
  // Publishes the message as a string, e.g. "Hello, world!"
  // or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Cache topic objects (publishers) and reuse them.
  const publisher = pubSubClient.topic(topicNameOrId);

  const messageId = await publisher.publishMessage({data: dataBuffer});
  console.log(`Message ${messageId} published.`);

  // The rest of the sample is in service to making sure that any
  // buffered Pub/Sub messages and/or OpenTelemetry spans are properly
  // flushed to the server side. In normal usage, you'd only need to do
  // something like this on process shutdown.
  await publisher.flush();
  await processor.forceFlush();
  await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_OR_ID';
// const data = 'Hello, world!';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Imports the OpenTelemetry API
const {NodeTracerProvider} = require('@opentelemetry/sdk-trace-node');
const {diag, DiagConsoleLogger, DiagLogLevel} = require('@opentelemetry/api');
const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base');

// To output to the console for testing, use the ConsoleSpanExporter.
// const {ConsoleSpanExporter} = require('@opentelemetry/sdk-trace-base');

// To output to Cloud Trace, import the OpenTelemetry bridge library.
const {
  TraceExporter,
} = require('@google-cloud/opentelemetry-cloud-trace-exporter');

const {Resource} = require('@opentelemetry/resources');
const {
  SEMRESATTRS_SERVICE_NAME,
} = require('@opentelemetry/semantic-conventions');

// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);

// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();

// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();

// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
  resource: new Resource({
    [SEMRESATTRS_SERVICE_NAME]: 'otel publisher example',
  }),
});
const processor = new SimpleSpanProcessor(exporter);
provider.addSpanProcessor(processor);
provider.register();

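// Seconds to wait for buffered spans to be exported before returning.
// The value is illustrative (an assumption for this sample); tune it as needed.
const OTEL_TIMEOUT = 2;

// Creates a client; cache this for further use.
const pubSubClient = new PubSub({enableOpenTelemetryTracing: true});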

async function publishMessage(topicNameOrId, data) {
  // Publishes the message as a string, e.g. "Hello, world!"
  // or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Cache topic objects (publishers) and reuse them.
  const publisher = pubSubClient.topic(topicNameOrId);

  const messageId = await publisher.publishMessage({data: dataBuffer});
  console.log(`Message ${messageId} published.`);

  // The rest of the sample is in service to making sure that any
  // buffered Pub/Sub messages and/or OpenTelemetry spans are properly
  // flushed to the server side. In normal usage, you'd only need to do
  // something like this on process shutdown.
  await publisher.flush();
  await processor.forceFlush();
  await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
}

Java


import com.google.api.core.ApiFuture;
import com.google.cloud.opentelemetry.trace.TraceConfiguration;
import com.google.cloud.opentelemetry.trace.TraceExporter;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.semconv.ResourceAttributes;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class OpenTelemetryPublisherExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    openTelemetryPublisherExample(projectId, topicId);
  }

  public static void openTelemetryPublisherExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    Resource resource =
        Resource.getDefault()
            .toBuilder()
            .put(ResourceAttributes.SERVICE_NAME, "publisher-example")
            .build();

    // Creates a Cloud Trace exporter.
    SpanExporter traceExporter =
        TraceExporter.createWithConfiguration(
            TraceConfiguration.builder().setProjectId(projectId).build());

    SdkTracerProvider sdkTracerProvider =
        SdkTracerProvider.builder()
            .setResource(resource)
            .addSpanProcessor(SimpleSpanProcessor.create(traceExporter))
            .setSampler(Sampler.alwaysOn())
            .build();

    OpenTelemetry openTelemetry =
        OpenTelemetrySdk.builder().setTracerProvider(sdkTracerProvider).buildAndRegisterGlobal();

    TopicName topicName = TopicName.of(projectId, topicId);

    Publisher publisher = null;
    try {
      // Create a publisher instance with the created OpenTelemetry object and enabling tracing.
      publisher =
          Publisher.newBuilder(topicName)
              .setOpenTelemetry(openTelemetry)
              .setEnableOpenTelemetryTracing(true)
              .build();

      String message = "Hello World!";
      ByteString data = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

      // Once published, returns a server-assigned message id (unique within the topic)
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published message ID: " + messageId);
    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Receive messages with tracing

Go

import (
	"context"
	"fmt"
	"io"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub"
	texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
	"google.golang.org/api/option"
)

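// subscribeOpenTelemetryTracing receives messages for a short period with
// OpenTelemetry tracing enabled, exporting to Cloud Trace.
func subscribeOpenTelemetryTracing(w io.Writer, projectID, subID string, sampleRate float64) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// sampleRate := 1.0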
	ctx := context.Background()

	exporter, err := texporter.New(texporter.WithProjectID(projectID),
		// Disable spans created by the exporter.
		texporter.WithTraceClientOptions(
			[]option.ClientOption{option.WithTelemetryDisabled()},
		),
	)
	if err != nil {
		return fmt.Errorf("error instantiating exporter: %w", err)
	}

	resources := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceNameKey.String("subscriber"),
	)

	// Instantiate a tracer provider with the following settings
	tp := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(resources),
		sdktrace.WithSampler(
			sdktrace.ParentBased(sdktrace.TraceIDRatioBased(sampleRate)),
		),
	)

	defer tp.ForceFlush(ctx) // flushes any pending spans
	otel.SetTracerProvider(tp)

	// Create a new client with tracing enabled.
	client, err := pubsub.NewClientWithConfig(ctx, projectID, &pubsub.ClientConfig{
		EnableOpenTelemetryTracing: true,
	})
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var received int32
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		atomic.AddInt32(&received, 1)
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)
	}
	fmt.Fprintf(w, "Received %d messages\n", received)

	return nil
}

C++

#include "google/cloud/opentelemetry/configure_basic_tracing.h"
#include "google/cloud/opentelemetry_options.h"
#include "google/cloud/pubsub/message.h"
#include "google/cloud/pubsub/publisher.h"
#include "google/cloud/pubsub/subscriber.h"
#include "google/cloud/pubsub/subscription.h"
#include <chrono>
#include <future>
#include <iostream>
#include <string>

int main(int argc, char* argv[]) try {
  if (argc != 4) {
    std::cerr << "Usage: " << argv[0]
              << " <project-id> <topic-id> <subscription-id>\n";
    return 1;
  }

  std::string const project_id = argv[1];
  std::string const topic_id = argv[2];
  std::string const subscription_id = argv[3];

  // Create a few namespace aliases to make the code easier to read.
  namespace gc = ::google::cloud;
  namespace otel = gc::otel;
  namespace pubsub = gc::pubsub;

  auto constexpr kWaitTimeout = std::chrono::seconds(30);

  auto project = gc::Project(project_id);
  auto configuration = otel::ConfigureBasicTracing(project);

  // Publish a message with tracing enabled.
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      pubsub::Topic(project_id, topic_id),
      gc::Options{}.set<gc::OpenTelemetryTracingOption>(true)));
  // Block until the message is actually sent and throw on error.
  auto id = publisher.Publish(pubsub::MessageBuilder().SetData("Hi!").Build())
                .get()
                .value();
  std::cout << "Sent message with id: (" << id << ")\n";

  // Receive a message using streaming pull with tracing enabled.
  auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
      pubsub::Subscription(project_id, subscription_id),
      gc::Options{}.set<gc::OpenTelemetryTracingOption>(true)));

  auto session =
      subscriber.Subscribe([&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
      });

  std::cout << "Waiting for messages on " + subscription_id + "...\n";

  // Blocks until the timeout is reached.
  auto result = session.wait_for(kWaitTimeout);
  if (result == std::future_status::timeout) {
    std::cout << "timeout reached, ending session\n";
    session.cancel();
  }

  return 0;
} catch (google::cloud::Status const& status) {
  std::cerr << "google::cloud::Status thrown: " << status << "\n";
  return 1;
}

Python

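# The streaming pull future raises concurrent.futures.TimeoutError, which is
# only an alias of the built-in TimeoutError on Python 3.11 and later.
from concurrent.futures import TimeoutError

from opentelemetry import trace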
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
)
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import SubscriberClient
from google.cloud.pubsub_v1.types import SubscriberOptions

# TODO(developer)
# subscription_project_id = "your-subscription-project-id"
# subscription_id = "your-subscription-id"
# cloud_trace_project_id = "your-cloud-trace-project-id"
# timeout = 300.0

# In this sample, we use Google Cloud Trace to export the OpenTelemetry
# traces: https://cloud.google.com/trace/docs/setup/python-ot
# Choose and configure the exporter for your setup accordingly.

sampler = ParentBased(root=TraceIdRatioBased(1))
trace.set_tracer_provider(TracerProvider(sampler=sampler))

# Export to Google Trace
cloud_trace_exporter = CloudTraceSpanExporter(
    project_id=cloud_trace_project_id,
)
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(cloud_trace_exporter)
)
# Set the `enable_open_telemetry_tracing` option to True when creating
# the subscriber client. This option alone is enough for the library to
# create OpenTelemetry traces. Where the traces are exported to depends
# on how you configure your OpenTelemetry exporter.
# See: https://opentelemetry.io/docs/languages/python/exporters/
subscriber = SubscriberClient(
    subscriber_options=SubscriberOptions(enable_open_telemetry_tracing=True)
)

# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(
    subscription_project_id, subscription_id
)

# Define callback to be called when a message is received.
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    # Ack message after processing it.
    print(message.data)
    message.ack()

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # Optimistically subscribe to messages on the subscription.
        streaming_pull_future = subscriber.subscribe(
            subscription_path, callback=callback
        )
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        print("Successfully subscribed until the timeout passed.")
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

TypeScript

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_OR_ID';

// Imports the Google Cloud client library
import {Message, PubSub} from '@google-cloud/pubsub';

// Imports the OpenTelemetry API
import {NodeTracerProvider} from '@opentelemetry/sdk-trace-node';
import {diag, DiagConsoleLogger, DiagLogLevel} from '@opentelemetry/api';
import {SimpleSpanProcessor} from '@opentelemetry/sdk-trace-base';

// To output to the console for testing, use the ConsoleSpanExporter.
// import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base';

// To output to Cloud Trace, import the OpenTelemetry bridge library.
import {TraceExporter} from '@google-cloud/opentelemetry-cloud-trace-exporter';

import {Resource} from '@opentelemetry/resources';
import {SEMRESATTRS_SERVICE_NAME} from '@opentelemetry/semantic-conventions';

// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);

// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();

// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();

// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
  resource: new Resource({
    [SEMRESATTRS_SERVICE_NAME]: 'otel subscriber example',
  }),
});
const processor = new SimpleSpanProcessor(exporter);
provider.addSpanProcessor(processor);
provider.register();

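// Seconds to listen for messages, and seconds to wait for buffered spans
// to be exported on shutdown. Both values are illustrative (assumptions
// for this sample); tune them as needed.
const SUBSCRIBER_TIMEOUT = 10;
const OTEL_TIMEOUT = 2;

// Creates a client; cache this for further use.
const pubSubClient = new PubSub({enableOpenTelemetryTracing: true});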

async function subscriptionListen(subscriptionNameOrId: string) {
  const subscriber = pubSubClient.subscription(subscriptionNameOrId);

  // Message handler for subscriber
  const messageHandler = async (message: Message) => {
    console.log(`Message ${message.id} received.`);
    message.ack();
  };

  // Error handler for subscriber
  const errorHandler = async (error: Error) => {
    console.log('Received error:', error);
  };

  // Listens for new messages from the topic
  subscriber.on('message', messageHandler);
  subscriber.on('error', errorHandler);

  // Ensures that all spans got flushed by the exporter. This function
  // is in service to making sure that any buffered Pub/Sub messages
  // and/or OpenTelemetry spans are properly flushed to the server
  // side. In normal usage, you'd only need to do something like this
  // on process shutdown.
  async function shutdown() {
    await subscriber.close();
    await processor.forceFlush();
    await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
  }

  // Wait a bit for the subscription to receive messages, then shut down
  // gracefully. This is for the sample only; normally you would not need
  // this delay.
  await new Promise<void>(r =>
    setTimeout(async () => {
      subscriber.removeAllListeners();
      await shutdown();
      r();
    }, SUBSCRIBER_TIMEOUT * 1000)
  );
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Imports the OpenTelemetry API
const {NodeTracerProvider} = require('@opentelemetry/sdk-trace-node');
const {diag, DiagConsoleLogger, DiagLogLevel} = require('@opentelemetry/api');
const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base');

// To output to the console for testing, use the ConsoleSpanExporter.
// const {ConsoleSpanExporter} = require('@opentelemetry/sdk-trace-base');

// To output to Cloud Trace, import the OpenTelemetry bridge library.
const {
  TraceExporter,
} = require('@google-cloud/opentelemetry-cloud-trace-exporter');

const {Resource} = require('@opentelemetry/resources');
const {
  SEMRESATTRS_SERVICE_NAME,
} = require('@opentelemetry/semantic-conventions');

// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);

// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();

// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();

// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
  resource: new Resource({
    [SEMRESATTRS_SERVICE_NAME]: 'otel subscriber example',
  }),
});
const processor = new SimpleSpanProcessor(exporter);
provider.addSpanProcessor(processor);
provider.register();

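// Seconds to listen for messages, and seconds to wait for buffered spans
// to be exported on shutdown. Both values are illustrative (assumptions
// for this sample); tune them as needed.
const SUBSCRIBER_TIMEOUT = 10;
const OTEL_TIMEOUT = 2;

// Creates a client; cache this for further use.
const pubSubClient = new PubSub({enableOpenTelemetryTracing: true});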

async function subscriptionListen(subscriptionNameOrId) {
  const subscriber = pubSubClient.subscription(subscriptionNameOrId);

  // Message handler for subscriber
  const messageHandler = async message => {
    console.log(`Message ${message.id} received.`);
    message.ack();
  };

  // Error handler for subscriber
  const errorHandler = async error => {
    console.log('Received error:', error);
  };

  // Listens for new messages from the topic
  subscriber.on('message', messageHandler);
  subscriber.on('error', errorHandler);

  // Ensures that all spans got flushed by the exporter. This function
  // is in service to making sure that any buffered Pub/Sub messages
  // and/or OpenTelemetry spans are properly flushed to the server
  // side. In normal usage, you'd only need to do something like this
  // on process shutdown.
  async function shutdown() {
    await subscriber.close();
    await processor.forceFlush();
    await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
  }

  // Wait a bit for the subscription to receive messages, then shut down
  // gracefully. This is for the sample only; normally you would not need
  // this delay.
  await new Promise(r =>
    setTimeout(async () => {
      subscriber.removeAllListeners();
      await shutdown();
      r();
    }, SUBSCRIBER_TIMEOUT * 1000)
  );
}

Java


import com.google.cloud.opentelemetry.trace.TraceConfiguration;
import com.google.cloud.opentelemetry.trace.TraceExporter;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.semconv.ResourceAttributes;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OpenTelemetrySubscriberExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    openTelemetrySubscriberExample(projectId, subscriptionId);
  }

  public static void openTelemetrySubscriberExample(String projectId, String subscriptionId) {
    Resource resource =
        Resource.getDefault()
            .toBuilder()
            .put(ResourceAttributes.SERVICE_NAME, "subscriber-example")
            .build();

    // Creates a Cloud Trace exporter.
    SpanExporter traceExporter =
        TraceExporter.createWithConfiguration(
            TraceConfiguration.builder().setProjectId(projectId).build());

    SdkTracerProvider sdkTracerProvider =
        SdkTracerProvider.builder()
            .setResource(resource)
            .addSpanProcessor(SimpleSpanProcessor.create(traceExporter))
            .setSampler(Sampler.alwaysOn())
            .build();

    OpenTelemetry openTelemetry =
        OpenTelemetrySdk.builder().setTracerProvider(sdkTracerProvider).buildAndRegisterGlobal();

    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setOpenTelemetry(openTelemetry)
              .setEnableOpenTelemetryTracing(true)
              .build();

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Analyze a trace

The following sections describe in detail how to track and analyze a trace in the Google Cloud console.

Considerations

  • When a batch of messages is published, the publish RPC span is captured in a separate trace.
  • A publish RPC has multiple origin spans, because several create calls can result in a single publish RPC when they are batched together.
  • Spans in OpenTelemetry can have zero or one parent span.

    Spans that represent batched operations, such as a publish batch (which logically should have multiple parents), can't be represented with zero or one parent span.
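The single-parent limitation described above can be illustrated with a small model. The following is a minimal sketch that uses plain JavaScript objects, not the OpenTelemetry API. The `createSpan` helper, the span names, and the trace IDs are all hypothetical. It assumes the batched publish RPC span records its origin spans as links (OpenTelemetry supports span links for this kind of many-to-one relationship) instead of as parents.

```javascript
// Plain-object model of spans for illustration only; this is NOT the
// OpenTelemetry API. All names and IDs below are hypothetical.
let nextSpanId = 1;

function createSpan(name, traceId, parent = null, links = []) {
  return {
    name,
    traceId,
    spanId: nextSpanId++,
    // A span has zero or one parent, never several.
    parentSpanId: parent ? parent.spanId : null,
    // Links can point at any number of spans, even in other traces.
    links: links.map(({traceId, spanId}) => ({traceId, spanId})),
  };
}

// Three messages, each with its own trace and its own create span.
const createSpans = ['a', 'b', 'c'].map(id =>
  createSpan(`create message ${id}`, `trace-${id}`)
);

// The batched publish RPC lives in a separate trace. It has no parent,
// but it links back to every create span that contributed to the batch.
const publishRpc = createSpan('publish RPC', 'trace-batch', null, createSpans);

console.log(publishRpc.parentSpanId); // null
console.log(publishRpc.links.map(link => link.traceId));
// [ 'trace-a', 'trace-b', 'trace-c' ]
```

In this model, following the links from the publish RPC span leads back to each message's own trace, which is why the publish RPC appears in a separate trace rather than under any single message.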

Track spans created during the message lifecycle

The following image shows an example of the spans that are created in a single trace for a single message.

[Image: viewing spans in the trace]

Each span can have attributes that provide additional information, such as the message size in bytes and ordering key information.

[Image: viewing spans in the trace]

Span attributes convey additional metadata, such as the message's ordering key, message ID, and size.

The main publish and subscribe spans are augmented with span events that mark when a network call is issued and when it completes.

[Image: viewing spans in the trace]

Troubleshoot common issues

The following issues can interfere with tracing:

  • The service account that you use to export traces doesn't have the required roles/cloudtrace.agent role.
  • The quota for the maximum number of ingested spans in Cloud Trace has been reached.
  • Your application exits without calling the appropriate flush function, so buffered spans might never be exported.
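For the last issue in the list, one option is to flush span processors from a shutdown handler. The following is a minimal sketch, not part of the Pub/Sub or OpenTelemetry libraries. `flushWithTimeout` is a hypothetical helper, and `fakeProcessor` is a stand-in for a real span processor such as the `SimpleSpanProcessor` in the Node.js sample. The sketch assumes only that each processor exposes a `forceFlush()` method that returns a promise, as the sample's `processor.forceFlush()` call does.

```javascript
// Hypothetical helper: flushes every processor, but stops waiting after
// timeoutMs so that a stuck exporter cannot block shutdown forever.
async function flushWithTimeout(processors, timeoutMs) {
  let timer;
  const timeout = new Promise(resolve => {
    timer = setTimeout(() => resolve('timeout'), timeoutMs);
  });
  const flush = Promise.all(processors.map(p => p.forceFlush())).then(
    () => 'flushed'
  );
  try {
    // Resolves to 'flushed' or 'timeout', whichever happens first.
    return await Promise.race([flush, timeout]);
  } finally {
    clearTimeout(timer);
  }
}

// Stand-in for a real span processor; only forceFlush() is assumed.
const fakeProcessor = {
  flushed: false,
  async forceFlush() {
    this.flushed = true;
  },
};

// Flush before exiting when the process is asked to terminate.
process.once('SIGTERM', async () => {
  const result = await flushWithTimeout([fakeProcessor], 5000);
  console.log(`Span flush result: ${result}`);
  process.exit(0);
});
```

In a real application, you would pass the processor that you registered with the tracer provider, and close the Pub/Sub subscriber before flushing, in the same order as the `shutdown()` function in the Node.js sample.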

What's next