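Pub/Sub OpenTelemetry tracing

OpenTelemetry tracing lets you identify and trace the latency of various Pub/Sub client library operations, such as batching, lease management, and flow control. Collecting this information can help you debug client library issues.

Some potential use cases for OpenTelemetry tracing include the following:

  • Your service is experiencing higher publish latency than usual.
  • You are seeing a high number of message redeliveries.
  • A change to the callback function of your subscriber client causes processing to take longer than usual.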

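Before you begin

Before you configure OpenTelemetry, complete the following tasks.

Required roles

To make sure that the service account has the permissions required to export traces to Cloud Trace, ask your administrator to grant the service account the Cloud Trace Agent (roles/cloudtrace.agent) IAM role on your project.

For more information about granting roles, see Manage access to projects, folders, and organizations.

This predefined role contains the permissions required to export traces to Cloud Trace. The exact permissions are listed under Required permissions.

Required permissions

The following permissions are required to export traces to Cloud Trace:

  • All: cloudtrace.traces.patch

Your administrator might also be able to grant the service account these permissions with custom roles or other predefined roles.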

OpenTelemetry tracing workflow

To set up OpenTelemetry tracing, you use the Pub/Sub client libraries and the OpenTelemetry SDK. With the SDK, you must set up a trace exporter and a tracer provider before you connect to the Pub/Sub libraries. In some libraries, setting up a tracer provider is optional.

  • Trace exporter. The OpenTelemetry SDK uses the trace exporter to determine where to send traces.

  • Tracer provider. The Pub/Sub client libraries use the tracer provider to create traces.

The following steps outline how to set up tracing:

  1. Instantiate a Cloud Trace OpenTelemetry exporter.
  2. If required, instantiate and register a tracer provider by using the OpenTelemetry SDK.
  3. Configure your client with the option that enables OpenTelemetry tracing.
  4. Use the Pub/Sub client libraries to publish a message.

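How tracing works

For every message published, the client library creates a new trace. This trace represents the entire lifecycle of the message, from the moment you publish it until it is acknowledged. A trace encapsulates information such as the duration of operations, parent and child spans, and linked spans.

A trace consists of a root span and its corresponding child spans. These spans represent the work that the client library does when it processes a message. Each message trace contains the following:

  • For publishing: flow control, ordering key scheduling, batching, and the length of the publish RPC.
  • For subscribing: concurrency control, ordering key scheduling, and lease management.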
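
As a rough illustration of the structure described above, the following Python sketch models a trace as a root span with child spans and reports which child took the longest. The `Span` class, the span names, and the timings are hypothetical and chosen only for this example; they are not the names or values that the client libraries emit.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Span:
    """Toy span: a named operation with start and end times in milliseconds."""

    name: str
    start_ms: float
    end_ms: float
    children: List["Span"] = field(default_factory=list)

    @property
    def duration_ms(self) -> float:
        return self.end_ms - self.start_ms


def slowest_child(root: Span) -> Optional[Span]:
    """Return the direct child span with the longest duration, if any."""
    return max(root.children, key=lambda s: s.duration_ms, default=None)


# Hypothetical publish-side trace for a single message.
publish_trace = Span(
    "publish message",
    0.0,
    48.0,
    children=[
        Span("flow control", 0.0, 2.0),
        Span("batching", 2.0, 12.0),
        Span("publish RPC", 12.0, 48.0),
    ],
)

slowest = slowest_child(publish_trace)
print(f"slowest step: {slowest.name} ({slowest.duration_ms} ms)")
```

Comparing child span durations in this way is the kind of analysis a trace viewer makes possible when publish latency is higher than usual.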

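To propagate information from the publish side to the subscribe side, the client library injects tracing-specific attributes into the message on the publish side. The context propagation mechanism is enabled only when tracing is turned on, and the injected attributes are prefixed with googclient_.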
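
To make the propagation idea concrete, here is a minimal, standard-library-only Python sketch of carrying trace context in message attributes. It assumes the context is encoded in the W3C `traceparent` format and stored under the key `googclient_traceparent`; the exact key and the `inject` and `extract` helpers are assumptions for illustration, not the client library's API.

```python
import re
from typing import Dict, Optional, Tuple

# Assumed attribute key: the documented "googclient_" prefix plus the W3C
# header name "traceparent". Treat the exact key as an assumption.
TRACEPARENT_KEY = "googclient_traceparent"
_TRACEPARENT_RE = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def inject(attributes: Dict[str, str], trace_id: int, span_id: int, sampled: bool) -> None:
    """Publish side: write the trace context into the message attributes."""
    flags = "01" if sampled else "00"
    attributes[TRACEPARENT_KEY] = f"00-{trace_id:032x}-{span_id:016x}-{flags}"


def extract(attributes: Dict[str, str]) -> Optional[Tuple[int, int, bool]]:
    """Subscribe side: read (trace_id, span_id, sampled), or None if absent."""
    match = _TRACEPARENT_RE.match(attributes.get(TRACEPARENT_KEY, ""))
    if match is None:
        return None
    trace_hex, span_hex, flags_hex = match.groups()
    return int(trace_hex, 16), int(span_hex, 16), bool(int(flags_hex, 16) & 0x01)


# Example: user attributes are left untouched; the context is added next to them.
attrs = {"origin": "orders-service"}
inject(
    attrs,
    trace_id=0x4BF92F3577B34DA6A3CE929D0E0E4736,
    span_id=0x00F067AA0BA902B7,
    sampled=True,
)
context = extract(attrs)
```

Because the context travels inside the message itself, the subscriber can continue the publisher's trace without any shared state between the two processes.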

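Publish messages with tracing

The following code samples show how to enable tracing by using the Pub/Sub client library and the OpenTelemetry SDK. In these samples, the tracing results are exported to Cloud Trace.

Considerations

When you instantiate the tracer provider, you configure a sampling rate with the OpenTelemetry SDK. This rate determines what fraction of traces the SDK samples. A lower sampling rate can help reduce billing costs and keep your service from exceeding the Cloud Trace span quota.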
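
The effect of the sampling rate can be sketched in a few lines of Python. The following example loosely mirrors the idea behind trace-ID ratio samplers: the decision is a deterministic function of the trace ID, so every span in a trace gets the same verdict. The function names are hypothetical, and real SDK implementations may differ in detail.

```python
import random

TRACE_ID_LIMIT = 1 << 64  # only the low 64 bits of the trace ID are compared


def should_sample(trace_id: int, rate: float) -> bool:
    """Decide deterministically whether a trace is kept at the given rate."""
    if not 0.0 <= rate <= 1.0:
        raise ValueError("rate must be between 0.0 and 1.0")
    bound = round(rate * TRACE_ID_LIMIT)
    return (trace_id & (TRACE_ID_LIMIT - 1)) < bound


def estimate_sampled(rate: float, trials: int = 100_000, seed: int = 7) -> int:
    """Count how many of `trials` random 128-bit trace IDs would be sampled."""
    rng = random.Random(seed)
    return sum(should_sample(rng.getrandbits(128), rate) for _ in range(trials))


kept = estimate_sampled(0.25)
print(f"kept {kept} of 100000 traces at rate 0.25")
```

With a rate of 0.25, roughly one trace in four is exported, so the number of spans sent to Cloud Trace drops by about the same proportion.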

Go

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
	"go.opentelemetry.io/otel"
	"google.golang.org/api/option"

	texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
)

// publishOpenTelemetryTracing publishes a single message with OpenTelemetry tracing
// enabled, exporting to Cloud Trace.
func publishOpenTelemetryTracing(w io.Writer, projectID, topicID string, sampling float64) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()

	exporter, err := texporter.New(texporter.WithProjectID(projectID),
		// Disable spans created by the exporter.
		texporter.WithTraceClientOptions(
			[]option.ClientOption{option.WithTelemetryDisabled()},
		),
	)
	if err != nil {
		return fmt.Errorf("error instantiating exporter: %w", err)
	}

	resources := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceNameKey.String("publisher"),
	)

	// Instantiate a tracer provider with the following settings
	tp := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(resources),
		sdktrace.WithSampler(
			sdktrace.ParentBased(sdktrace.TraceIDRatioBased(sampling)),
		),
	)

	defer tp.ForceFlush(ctx) // flushes any pending spans
	otel.SetTracerProvider(tp)

	// Create a new client with tracing enabled.
	client, err := pubsub.NewClientWithConfig(ctx, projectID, &pubsub.ClientConfig{
		EnableOpenTelemetryTracing: true,
	})
	if err != nil {
		return fmt.Errorf("pubsub: NewClient: %w", err)
	}
	defer client.Close()

	t := client.Topic(topicID)
	result := t.Publish(ctx, &pubsub.Message{
		Data: []byte("Publishing message with tracing"),
	})
	if _, err := result.Get(ctx); err != nil {
		return fmt.Errorf("pubsub: result.Get: %w", err)
	}
	fmt.Fprintln(w, "Published a traced message")
	return nil
}

C++

// Create a few namespace aliases to make the code easier to read.
namespace gc = ::google::cloud;
namespace otel = gc::otel;
namespace pubsub = gc::pubsub;

// This example uses a simple wrapper to export (upload) OTel tracing data
// to Google Cloud Trace. More complex applications may use different
// authentication, or configure their own OTel exporter.
auto project = gc::Project(project_id);
auto configuration = otel::ConfigureBasicTracing(project);

auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
    pubsub::Topic(project_id, topic_id),
    // Configure this publisher to enable OTel tracing. Some applications may
    // choose to disable tracing in some publishers or to dynamically enable
    // this option based on their own configuration.
    gc::Options{}.set<gc::OpenTelemetryTracingOption>(true)));

// After this point, use the Cloud Pub/Sub C++ client library as usual.
// In this example, we will send a few messages and configure a callback
// action for each one.
std::vector<gc::future<void>> ids;
for (int i = 0; i < 5; i++) {
  auto id = publisher.Publish(pubsub::MessageBuilder().SetData("Hi!").Build())
                .then([](gc::future<gc::StatusOr<std::string>> f) {
                  auto id = f.get();
                  if (!id) {
                    std::cout << "Error in publish: " << id.status() << "\n";
                    return;
                  }
                  std::cout << "Sent message with id: (" << *id << ")\n";
                });
  ids.push_back(std::move(id));
}
// Block until the messages are actually sent.
for (auto& id : ids) id.get();

Python

Before trying this sample, follow the Python setup instructions in Quickstart: Using client libraries. For more information, see the Pub/Sub Python API reference documentation.


from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
)
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased

from google.cloud.pubsub_v1 import PublisherClient
from google.cloud.pubsub_v1.types import PublisherOptions

# TODO(developer)
# topic_project_id = "your-topic-project-id"
# trace_project_id = "your-trace-project-id"
# topic_id = "your-topic-id"

# In this sample, we use a Google Cloud Trace to export the OpenTelemetry
# traces: https://cloud.google.com/trace/docs/setup/python-ot
# Choose and configure the exporter for your set up accordingly.

sampler = ParentBased(root=TraceIdRatioBased(1))
trace.set_tracer_provider(TracerProvider(sampler=sampler))

# Export to Google Trace.
cloud_trace_exporter = CloudTraceSpanExporter(
    project_id=trace_project_id,
)
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(cloud_trace_exporter)
)

# Set the `enable_open_telemetry_tracing` option to True when creating
# the publisher client. This in itself is necessary and sufficient for
# the library to export OpenTelemetry traces. However, where the traces
# must be exported to needs to be configured based on your OpenTelemetry
# set up. Refer: https://opentelemetry.io/docs/languages/python/exporters/
publisher = PublisherClient(
    publisher_options=PublisherOptions(
        enable_open_telemetry_tracing=True,
    ),
)

# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(topic_project_id, topic_id)
# Publish messages.
for n in range(1, 10):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data)
    print(future.result())

print(f"Published messages to {topic_path}.")

TypeScript

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_OR_ID';
// const data = 'Hello, world!';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Imports the OpenTelemetry API
import {NodeTracerProvider} from '@opentelemetry/sdk-trace-node';
import {diag, DiagConsoleLogger, DiagLogLevel} from '@opentelemetry/api';
import {SimpleSpanProcessor} from '@opentelemetry/sdk-trace-base';

// To output to the console for testing, use the ConsoleSpanExporter.
// import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base';

// To output to Cloud Trace, import the OpenTelemetry bridge library.
import {TraceExporter} from '@google-cloud/opentelemetry-cloud-trace-exporter';

import {Resource} from '@opentelemetry/resources';
import {SEMRESATTRS_SERVICE_NAME} from '@opentelemetry/semantic-conventions';

// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);

// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();

// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();

// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
  resource: new Resource({
    [SEMRESATTRS_SERVICE_NAME]: 'otel publisher example',
  }),
});
const processor = new SimpleSpanProcessor(exporter);
provider.addSpanProcessor(processor);
provider.register();

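// Seconds to wait after flushing so the exporter can finish sending spans.
// Sample value; tune it for your environment.
const OTEL_TIMEOUT = 2;

// Creates a client; cache this for further use.
const pubSubClient = new PubSub({enableOpenTelemetryTracing: true});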

async function publishMessage(topicNameOrId: string, data: string) {
  // Publishes the message as a string, e.g. "Hello, world!"
  // or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Cache topic objects (publishers) and reuse them.
  const publisher = pubSubClient.topic(topicNameOrId);

  const messageId = await publisher.publishMessage({data: dataBuffer});
  console.log(`Message ${messageId} published.`);

  // The rest of the sample is in service to making sure that any
  // buffered Pub/Sub messages and/or OpenTelemetry spans are properly
  // flushed to the server side. In normal usage, you'd only need to do
  // something like this on process shutdown.
  await publisher.flush();
  await processor.forceFlush();
  await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_OR_ID';
// const data = 'Hello, world!';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Imports the OpenTelemetry API
const {NodeTracerProvider} = require('@opentelemetry/sdk-trace-node');
const {diag, DiagConsoleLogger, DiagLogLevel} = require('@opentelemetry/api');
const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base');

// To output to the console for testing, use the ConsoleSpanExporter.
// const {ConsoleSpanExporter} = require('@opentelemetry/sdk-trace-base');

// To output to Cloud Trace, import the OpenTelemetry bridge library.
const {
  TraceExporter,
} = require('@google-cloud/opentelemetry-cloud-trace-exporter');

const {Resource} = require('@opentelemetry/resources');
const {
  SEMRESATTRS_SERVICE_NAME,
} = require('@opentelemetry/semantic-conventions');

// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);

// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();

// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();

// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
  resource: new Resource({
    [SEMRESATTRS_SERVICE_NAME]: 'otel publisher example',
  }),
});
const processor = new SimpleSpanProcessor(exporter);
provider.addSpanProcessor(processor);
provider.register();

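// Seconds to wait after flushing so the exporter can finish sending spans.
// Sample value; tune it for your environment.
const OTEL_TIMEOUT = 2;

// Creates a client; cache this for further use.
const pubSubClient = new PubSub({enableOpenTelemetryTracing: true});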

async function publishMessage(topicNameOrId, data) {
  // Publishes the message as a string, e.g. "Hello, world!"
  // or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Cache topic objects (publishers) and reuse them.
  const publisher = pubSubClient.topic(topicNameOrId);

  const messageId = await publisher.publishMessage({data: dataBuffer});
  console.log(`Message ${messageId} published.`);

  // The rest of the sample is in service to making sure that any
  // buffered Pub/Sub messages and/or OpenTelemetry spans are properly
  // flushed to the server side. In normal usage, you'd only need to do
  // something like this on process shutdown.
  await publisher.flush();
  await processor.forceFlush();
  await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
}

Java


import com.google.api.core.ApiFuture;
import com.google.cloud.opentelemetry.trace.TraceConfiguration;
import com.google.cloud.opentelemetry.trace.TraceExporter;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.semconv.ResourceAttributes;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class OpenTelemetryPublisherExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    openTelemetryPublisherExample(projectId, topicId);
  }

  public static void openTelemetryPublisherExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    Resource resource =
        Resource.getDefault()
            .toBuilder()
            .put(ResourceAttributes.SERVICE_NAME, "publisher-example")
            .build();

    // Creates a Cloud Trace exporter.
    SpanExporter traceExporter =
        TraceExporter.createWithConfiguration(
            TraceConfiguration.builder().setProjectId(projectId).build());

    SdkTracerProvider sdkTracerProvider =
        SdkTracerProvider.builder()
            .setResource(resource)
            .addSpanProcessor(SimpleSpanProcessor.create(traceExporter))
            .setSampler(Sampler.alwaysOn())
            .build();

    OpenTelemetry openTelemetry =
        OpenTelemetrySdk.builder().setTracerProvider(sdkTracerProvider).buildAndRegisterGlobal();

    TopicName topicName = TopicName.of(projectId, topicId);

    Publisher publisher = null;
    try {
      // Create a publisher instance with the created OpenTelemetry object and enabling tracing.
      publisher =
          Publisher.newBuilder(topicName)
              .setOpenTelemetry(openTelemetry)
              .setEnableOpenTelemetryTracing(true)
              .build();

      String message = "Hello World!";
      ByteString data = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

      // Once published, returns a server-assigned message id (unique within the topic)
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published message ID: " + messageId);
    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Receive messages with tracing

Go

import (
	"context"
	"fmt"
	"io"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub"
	texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
	"google.golang.org/api/option"
)

func subscribeOpenTelemetryTracing(w io.Writer, projectID, subID string, sampleRate float64) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// sampleRate := 1.0
	ctx := context.Background()

	exporter, err := texporter.New(texporter.WithProjectID(projectID),
		// Disable spans created by the exporter.
		texporter.WithTraceClientOptions(
			[]option.ClientOption{option.WithTelemetryDisabled()},
		),
	)
	if err != nil {
		return fmt.Errorf("error instantiating exporter: %w", err)
	}

	resources := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceNameKey.String("subscriber"),
	)

	// Instantiate a tracer provider with the following settings
	tp := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(resources),
		sdktrace.WithSampler(
			sdktrace.ParentBased(sdktrace.TraceIDRatioBased(sampleRate)),
		),
	)

	defer tp.ForceFlush(ctx) // flushes any pending spans
	otel.SetTracerProvider(tp)

	// Create a new client with tracing enabled.
	client, err := pubsub.NewClientWithConfig(ctx, projectID, &pubsub.ClientConfig{
		EnableOpenTelemetryTracing: true,
	})
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var received int32
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		atomic.AddInt32(&received, 1)
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)
	}
	fmt.Fprintf(w, "Received %d messages\n", received)

	return nil
}

C++

#include "google/cloud/opentelemetry/configure_basic_tracing.h"
#include "google/cloud/opentelemetry_options.h"
#include "google/cloud/pubsub/message.h"
#include "google/cloud/pubsub/publisher.h"
#include "google/cloud/pubsub/subscriber.h"
#include "google/cloud/pubsub/subscription.h"
#include <chrono>
#include <future>
#include <iostream>
#include <string>

int main(int argc, char* argv[]) try {
  if (argc != 4) {
    std::cerr << "Usage: " << argv[0]
              << " <project-id> <topic-id> <subscription-id>\n";
    return 1;
  }

  std::string const project_id = argv[1];
  std::string const topic_id = argv[2];
  std::string const subscription_id = argv[3];

  // Create a few namespace aliases to make the code easier to read.
  namespace gc = ::google::cloud;
  namespace otel = gc::otel;
  namespace pubsub = gc::pubsub;

  auto constexpr kWaitTimeout = std::chrono::seconds(30);

  auto project = gc::Project(project_id);
  auto configuration = otel::ConfigureBasicTracing(project);

  // Publish a message with tracing enabled.
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      pubsub::Topic(project_id, topic_id),
      gc::Options{}.set<gc::OpenTelemetryTracingOption>(true)));
  // Block until the message is actually sent and throw on error.
  auto id = publisher.Publish(pubsub::MessageBuilder().SetData("Hi!").Build())
                .get()
                .value();
  std::cout << "Sent message with id: (" << id << ")\n";

  // Receive a message using streaming pull with tracing enabled.
  auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
      pubsub::Subscription(project_id, subscription_id),
      gc::Options{}.set<gc::OpenTelemetryTracingOption>(true)));

  auto session =
      subscriber.Subscribe([&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
      });

  std::cout << "Waiting for messages on " + subscription_id + "...\n";

  // Blocks until the timeout is reached.
  auto result = session.wait_for(kWaitTimeout);
  if (result == std::future_status::timeout) {
    std::cout << "timeout reached, ending session\n";
    session.cancel();
  }

  return 0;
} catch (google::cloud::Status const& status) {
  std::cerr << "google::cloud::Status thrown: " << status << "\n";
  return 1;
}

Python

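# Before Python 3.11, the timeout raised by `result()` is not the built-in
# TimeoutError, so import the concurrent.futures variant explicitly.
from concurrent.futures import TimeoutError

from opentelemetry import trace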
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
)
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import SubscriberClient
from google.cloud.pubsub_v1.types import SubscriberOptions

# TODO(developer)
# subscription_project_id = "your-subscription-project-id"
# subscription_id = "your-subscription-id"
# cloud_trace_project_id = "your-cloud-trace-project-id"
# timeout = 300.0

# In this sample, we use a Google Cloud Trace to export the OpenTelemetry
# traces: https://cloud.google.com/trace/docs/setup/python-ot
# Choose and configure the exporter for your set up accordingly.

sampler = ParentBased(root=TraceIdRatioBased(1))
trace.set_tracer_provider(TracerProvider(sampler=sampler))

# Export to Google Trace
cloud_trace_exporter = CloudTraceSpanExporter(
    project_id=cloud_trace_project_id,
)
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(cloud_trace_exporter)
)
# Set the `enable_open_telemetry_tracing` option to True when creating
# the subscriber client. This in itself is necessary and sufficient for
# the library to export OpenTelemetry traces. However, where the traces
# must be exported to needs to be configured based on your OpenTelemetry
# set up. Refer: https://opentelemetry.io/docs/languages/python/exporters/
subscriber = SubscriberClient(
    subscriber_options=SubscriberOptions(enable_open_telemetry_tracing=True)
)

# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(
    subscription_project_id, subscription_id
)

# Define callback to be called when a message is received.
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    # Ack message after processing it.
    print(message.data)
    message.ack()

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # Optimistically subscribe to messages on the subscription.
        streaming_pull_future = subscriber.subscribe(
            subscription_path, callback=callback
        )
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        print("Successfully subscribed until the timeout passed.")
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

TypeScript

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_OR_ID';

// Imports the Google Cloud client library
import {Message, PubSub} from '@google-cloud/pubsub';

// Imports the OpenTelemetry API
import {NodeTracerProvider} from '@opentelemetry/sdk-trace-node';
import {diag, DiagConsoleLogger, DiagLogLevel} from '@opentelemetry/api';
import {SimpleSpanProcessor} from '@opentelemetry/sdk-trace-base';

// To output to the console for testing, use the ConsoleSpanExporter.
// import {ConsoleSpanExporter} from '@opentelemetry/sdk-trace-base';

// To output to Cloud Trace, import the OpenTelemetry bridge library.
import {TraceExporter} from '@google-cloud/opentelemetry-cloud-trace-exporter';

import {Resource} from '@opentelemetry/resources';
import {SEMRESATTRS_SERVICE_NAME} from '@opentelemetry/semantic-conventions';

// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);

// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();

// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();

// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
  resource: new Resource({
    [SEMRESATTRS_SERVICE_NAME]: 'otel subscriber example',
  }),
});
const processor = new SimpleSpanProcessor(exporter);
provider.addSpanProcessor(processor);
provider.register();

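// Seconds to listen for messages before shutting down (sample value).
const SUBSCRIBER_TIMEOUT = 10;

// Seconds to wait after flushing so the exporter can finish sending spans
// (sample value).
const OTEL_TIMEOUT = 2;

// Creates a client; cache this for further use.
const pubSubClient = new PubSub({enableOpenTelemetryTracing: true});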

async function subscriptionListen(subscriptionNameOrId: string) {
  const subscriber = pubSubClient.subscription(subscriptionNameOrId);

  // Message handler for subscriber
  const messageHandler = async (message: Message) => {
    console.log(`Message ${message.id} received.`);
    message.ack();
  };

  // Error handler for subscriber
  const errorHandler = async (error: Error) => {
    console.log('Received error:', error);
  };

  // Listens for new messages from the topic
  subscriber.on('message', messageHandler);
  subscriber.on('error', errorHandler);

  // Ensures that all spans got flushed by the exporter. This function
  // is in service to making sure that any buffered Pub/Sub messages
  // and/or OpenTelemetry spans are properly flushed to the server
  // side. In normal usage, you'd only need to do something like this
  // on process shutdown.
  async function shutdown() {
    await subscriber.close();
    await processor.forceFlush();
    await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
  }

  // Wait a bit for the subscription to receive messages, then shut down
  // gracefully. This is for the sample only; normally you would not need
  // this delay.
  await new Promise<void>(r =>
    setTimeout(async () => {
      subscriber.removeAllListeners();
      await shutdown();
      r();
    }, SUBSCRIBER_TIMEOUT * 1000)
  );
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Imports the OpenTelemetry API
const {NodeTracerProvider} = require('@opentelemetry/sdk-trace-node');
const {diag, DiagConsoleLogger, DiagLogLevel} = require('@opentelemetry/api');
const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base');

// To output to the console for testing, use the ConsoleSpanExporter.
// const {ConsoleSpanExporter} = require('@opentelemetry/sdk-trace-base');

// To output to Cloud Trace, import the OpenTelemetry bridge library.
const {
  TraceExporter,
} = require('@google-cloud/opentelemetry-cloud-trace-exporter');

const {Resource} = require('@opentelemetry/resources');
const {
  SEMRESATTRS_SERVICE_NAME,
} = require('@opentelemetry/semantic-conventions');

// Enable the diagnostic logger for OpenTelemetry
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);

// Log spans out to the console, for testing.
// const exporter = new ConsoleSpanExporter();

// Log spans out to Cloud Trace, for production.
const exporter = new TraceExporter();

// Build a tracer provider and a span processor to do
// something with the spans we're generating.
const provider = new NodeTracerProvider({
  resource: new Resource({
    [SEMRESATTRS_SERVICE_NAME]: 'otel subscriber example',
  }),
});
const processor = new SimpleSpanProcessor(exporter);
provider.addSpanProcessor(processor);
provider.register();

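// Seconds to listen for messages before shutting down (sample value).
const SUBSCRIBER_TIMEOUT = 10;

// Seconds to wait after flushing so the exporter can finish sending spans
// (sample value).
const OTEL_TIMEOUT = 2;

// Creates a client; cache this for further use.
const pubSubClient = new PubSub({enableOpenTelemetryTracing: true});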

async function subscriptionListen(subscriptionNameOrId) {
  const subscriber = pubSubClient.subscription(subscriptionNameOrId);

  // Message handler for subscriber
  const messageHandler = async message => {
    console.log(`Message ${message.id} received.`);
    message.ack();
  };

  // Error handler for subscriber
  const errorHandler = async error => {
    console.log('Received error:', error);
  };

  // Listens for new messages from the topic
  subscriber.on('message', messageHandler);
  subscriber.on('error', errorHandler);

  // Ensures that all spans got flushed by the exporter. This function
  // is in service to making sure that any buffered Pub/Sub messages
  // and/or OpenTelemetry spans are properly flushed to the server
  // side. In normal usage, you'd only need to do something like this
  // on process shutdown.
  async function shutdown() {
    await subscriber.close();
    await processor.forceFlush();
    await new Promise(r => setTimeout(r, OTEL_TIMEOUT * 1000));
  }

  // Wait a bit for the subscription to receive messages, then shut down
  // gracefully. This is for the sample only; normally you would not need
  // this delay.
  await new Promise(r =>
    setTimeout(async () => {
      subscriber.removeAllListeners();
      await shutdown();
      r();
    }, SUBSCRIBER_TIMEOUT * 1000)
  );
}

Java


import com.google.cloud.opentelemetry.trace.TraceConfiguration;
import com.google.cloud.opentelemetry.trace.TraceExporter;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.semconv.ResourceAttributes;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OpenTelemetrySubscriberExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    openTelemetrySubscriberExample(projectId, subscriptionId);
  }

  public static void openTelemetrySubscriberExample(String projectId, String subscriptionId) {
    Resource resource =
        Resource.getDefault()
            .toBuilder()
            .put(ResourceAttributes.SERVICE_NAME, "subscriber-example")
            .build();

    // Creates a Cloud Trace exporter.
    SpanExporter traceExporter =
        TraceExporter.createWithConfiguration(
            TraceConfiguration.builder().setProjectId(projectId).build());

    SdkTracerProvider sdkTracerProvider =
        SdkTracerProvider.builder()
            .setResource(resource)
            .addSpanProcessor(SimpleSpanProcessor.create(traceExporter))
            .setSampler(Sampler.alwaysOn())
            .build();

    OpenTelemetry openTelemetry =
        OpenTelemetrySdk.builder().setTracerProvider(sdkTracerProvider).buildAndRegisterGlobal();

    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setOpenTelemetry(openTelemetry)
              .setEnableOpenTelemetryTracing(true)
              .build();

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

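Analyze a trace

The following sections describe in detail how to track and analyze a trace in the Google Cloud console.

Considerations

  • When you publish a batch of messages, the publish RPC span is captured in a separate trace.
  • A publish RPC has multiple origin spans, because multiple create calls can result in a single publish RPC when they are batched together.
  • Spans in OpenTelemetry can have zero or one parent span.

    Spans that represent batched operations, such as a publish batch, logically should have multiple parents, so they can't be represented by using zero or one parent span.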
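
One way to picture this constraint is a span model with at most one parent but any number of links. The following Python sketch assumes that the batched publish RPC span lives in its own trace and refers back to each message's span through links; the `Span` class, the IDs, and the `batch_rpc_span` helper are hypothetical and only illustrate the data shape.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

SpanRef = Tuple[str, str]  # (trace_id, span_id)


@dataclass
class Span:
    """Toy span with at most one parent and any number of links."""

    name: str
    trace_id: str
    span_id: str
    parent: Optional[SpanRef] = None
    links: List[SpanRef] = field(default_factory=list)


def batch_rpc_span(message_spans: List[Span]) -> Span:
    """Create a parentless RPC span in its own trace, linked to every message span."""
    return Span(
        name="publish RPC",
        trace_id="batch-trace-1",  # hypothetical ID for the separate trace
        span_id="rpc-1",
        links=[(s.trace_id, s.span_id) for s in message_spans],
    )


# Three messages, each with its own trace, sent in a single batch.
messages = [Span("create", f"trace-{i}", f"span-{i}") for i in range(3)]
rpc = batch_rpc_span(messages)
print(f"{rpc.name} links to {len(rpc.links)} message spans")
```

Under this model, a trace viewer reaches the batched RPC from any one message by following a link rather than a parent-child edge.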

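Track spans created during the message lifecycle

The following image shows an example of the spans created in a single trace for a single message.

[Image: View spans in a trace]

Each span can have additional attributes that provide further information, such as the message byte size and ordering key information.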

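Span attributes convey additional metadata, such as the message's ordering key, message ID, and message size.

The main publish and subscribe spans are augmented with span events that correspond to when a network call is issued and when it completes.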

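Troubleshoot common issues

The following issues can cause problems with tracing:

  • The service account that you use to export traces doesn't have the required roles/cloudtrace.agent role.
  • You have reached the quota for the maximum number of spans ingested in Cloud Trace.
  • Your application terminates without calling the appropriate flush function.

What's next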