使用 OpenCensus 捕获自定义客户端指标

本文档介绍了如何使用 OpenCensus 捕获自定义客户端指标。自定义客户端指标有助于查找系统中延迟时间的来源。如需了解详情,请参阅确定延迟时间点

Spanner 客户端库还使用 OpenCensus 可观测性框架提供统计信息和跟踪记录。默认情况下,该框架处于停用状态。

您需要熟悉与 OpenCensus 相关的自定义指标,并将 OpenCensus 指标库和 Google Cloud Observability 导出器提供给您的应用,然后才能捕获自定义指标。

捕获客户端往返延迟时间

客户端往返延迟时间是指客户端发送到数据库的 Spanner API 请求的第一个字节到客户端从数据库收到的响应的最后一个字节之间的时长(以毫秒为单位)。API 请求可以通过 Google Front End (GFE) 或 Cloud Spanner API 前端发送。

您可以使用以下代码捕获客户端往返延迟时间:

Java

static void captureGrpcMetric(DatabaseClient dbClient) {
  // Add io.grpc:grpc-census and io.opencensus:opencensus-exporter-stats-stackdriver
  //  dependencies to enable gRPC metrics.

  // Register basic gRPC views.
  RpcViews.registerClientGrpcBasicViews();

  // Enable OpenCensus exporters to export metrics to Stackdriver Monitoring.
  // Exporters use Application Default Credentials to authenticate.
  // See https://developers.google.com/identity/protocols/application-default-credentials
  // for more details.
  try {
    StackdriverStatsExporter.createAndRegister();
  } catch (IOException | IllegalStateException e) {
    System.out.println("Error during StackdriverStatsExporter");
  }

  try (ResultSet resultSet =
      dbClient
          .singleUse() // Execute a single read or query against Cloud Spanner.
          .executeQuery(Statement.of("SELECT SingerId, AlbumId, AlbumTitle FROM Albums"))) {
    while (resultSet.next()) {
      System.out.printf(
          "%d %d %s", resultSet.getLong(0), resultSet.getLong(1), resultSet.getString(2));
    }
  }
}

Go


import (
	"context"
	"fmt"
	"io"
	"regexp"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"

	"contrib.go.opencensus.io/exporter/stackdriver"
	"go.opencensus.io/plugin/ocgrpc"
	"go.opencensus.io/stats/view"
)

var validDatabasePattern = regexp.MustCompile("^projects/(?P<project>[^/]+)/instances/(?P<instance>[^/]+)/databases/(?P<database>[^/]+)$")

func queryWithGRPCMetric(w io.Writer, db string) error {
	projectID, _, _, err := parseDatabaseName(db)
	if err != nil {
		return err
	}

	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	// Register OpenCensus views.
	if err := view.Register(ocgrpc.DefaultClientViews...); err != nil {
		return err
	}

	// Create OpenCensus Stackdriver exporter.
	sd, err := stackdriver.NewExporter(stackdriver.Options{
		ProjectID: projectID,
	})
	if err != nil {
		return err
	}
	// It is imperative to invoke flush before your main function exits
	defer sd.Flush()

	// Start the metrics exporter
	sd.StartMetricsExporter()
	defer sd.StopMetricsExporter()

	stmt := spanner.Statement{SQL: `SELECT SingerId, AlbumId, AlbumTitle FROM Albums`}
	iter := client.Single().Query(ctx, stmt)
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return err
		}
		var singerID, albumID int64
		var albumTitle string
		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
	}
}

func parseDatabaseName(databaseUri string) (project, instance, database string, err error) {
	matches := validDatabasePattern.FindStringSubmatch(databaseUri)
	if len(matches) == 0 {
		return "", "", "", fmt.Errorf("failed to parse database name from %q according to pattern %q",
			databaseUri, validDatabasePattern.String())
	}
	return matches[1], matches[2], matches[3], nil
}

该代码示例会在将指标导出到 Cloud Monitoring 时,将字符串 roundtrip_latency 附加到指标名称。您可以在 Cloud Monitoring 中使用附加的字符串搜索此指标。

捕获 GFE 延迟时间

GFE 延迟时间是指 Google 网络收到来自客户端的远程过程调用与 GFE 收到响应的第一个字节之间的时长(以毫秒为单位)。

您可以使用以下代码捕获 GFE 延迟时间:

Java

static void captureGfeMetric(DatabaseClient dbClient) {
  // Capture GFE Latency.
  SpannerRpcViews.registerGfeLatencyView();

  // Capture GFE Latency and GFE Header missing count.
  // SpannerRpcViews.registerGfeLatencyAndHeaderMissingCountViews();

  // Capture only GFE Header missing count.
  // SpannerRpcViews.registerGfeHeaderMissingCountView();

  // Enable OpenCensus exporters to export metrics to Stackdriver Monitoring.
  // Exporters use Application Default Credentials to authenticate.
  // See https://developers.google.com/identity/protocols/application-default-credentials
  // for more details.
  try {
    StackdriverStatsExporter.createAndRegister();
  } catch (IOException | IllegalStateException e) {
    System.out.println("Error during StackdriverStatsExporter");
  }

  try (ResultSet resultSet =
      dbClient
          .singleUse() // Execute a single read or query against Cloud Spanner.
          .executeQuery(Statement.of("SELECT SingerId, AlbumId, AlbumTitle FROM Albums"))) {
    while (resultSet.next()) {
      System.out.printf(
          "%d %d %s", resultSet.getLong(0), resultSet.getLong(1), resultSet.getString(2));
    }
  }
}

Go


// We are in the process of adding support in the Cloud Spanner Go Client Library
// to capture the gfe_latency metric.

import (
	"context"
	"fmt"
	"io"
	"strconv"
	"strings"

	spanner "cloud.google.com/go/spanner/apiv1"
	sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
	gax "github.com/googleapis/gax-go/v2"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"

	"contrib.go.opencensus.io/exporter/stackdriver"
	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"
)

// OpenCensus Tag, Measure and View.
var (
	KeyMethod    = tag.MustNewKey("grpc_client_method")
	GFELatencyMs = stats.Int64("cloud.google.com/go/spanner/gfe_latency",
		"Latency between Google's network receives an RPC and reads back the first byte of the response", "ms")
	GFELatencyView = view.View{
		Name:        "cloud.google.com/go/spanner/gfe_latency",
		Measure:     GFELatencyMs,
		Description: "Latency between Google's network receives an RPC and reads back the first byte of the response",
		Aggregation: view.Distribution(0.0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 8.0, 10.0, 13.0,
			16.0, 20.0, 25.0, 30.0, 40.0, 50.0, 65.0, 80.0, 100.0, 130.0, 160.0, 200.0, 250.0,
			300.0, 400.0, 500.0, 650.0, 800.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
			100000.0),
		TagKeys: []tag.Key{KeyMethod}}
)

func queryWithGFELatency(w io.Writer, db string) error {
	projectID, _, _, err := parseDatabaseName(db)
	if err != nil {
		return err
	}

	ctx := context.Background()
	client, err := spanner.NewClient(ctx)
	if err != nil {
		return err
	}
	defer client.Close()

	// Register OpenCensus views.
	err = view.Register(&GFELatencyView)
	if err != nil {
		return err
	}

	// Create OpenCensus Stackdriver exporter.
	sd, err := stackdriver.NewExporter(stackdriver.Options{
		ProjectID: projectID,
	})
	if err != nil {
		return err
	}
	// It is imperative to invoke flush before your main function exits
	defer sd.Flush()

	// Start the metrics exporter
	sd.StartMetricsExporter()
	defer sd.StopMetricsExporter()

	// Create a session.
	req := &sppb.CreateSessionRequest{Database: db}
	session, err := client.CreateSession(ctx, req)
	if err != nil {
		return err
	}

	// Execute a SQL query and retrieve the GFE server-timing header in gRPC metadata.
	req2 := &sppb.ExecuteSqlRequest{
		Session: session.Name,
		Sql:     `SELECT SingerId, AlbumId, AlbumTitle FROM Albums`,
	}
	var md metadata.MD
	resultSet, err := client.ExecuteSql(ctx, req2, gax.WithGRPCOptions(grpc.Header(&md)))
	if err != nil {
		return err
	}
	for _, row := range resultSet.GetRows() {
		for _, value := range row.GetValues() {
			fmt.Fprintf(w, "%s ", value.GetStringValue())
		}
		fmt.Fprintf(w, "\n")
	}

	// The format is: "server-timing: gfet4t7; dur=[GFE latency in ms]"
	srvTiming := md.Get("server-timing")[0]
	gfeLtcy, err := strconv.Atoi(strings.TrimPrefix(srvTiming, "gfet4t7; dur="))
	if err != nil {
		return err
	}
	// Record GFE t4t7 latency with OpenCensus.
	ctx, err = tag.New(ctx, tag.Insert(KeyMethod, "ExecuteSql"))
	if err != nil {
		return err
	}
	stats.Record(ctx, GFELatencyMs.M(int64(gfeLtcy)))

	return nil
}

该代码示例会在将指标导出到 Cloud Monitoring 时,将字符串 spanner/gfe_latency 附加到指标名称。您可以在 Cloud Monitoring 中使用附加的字符串搜索此指标。

捕获 Cloud Spanner API 请求延迟时间

Cloud Spanner API 请求延迟时间是 Cloud Spanner API 前端收到客户端请求的第一个字节与 Cloud Spanner API 前端发送响应的最后一个字节之间的时间(以秒为单位)。

此延迟时间指标在 Cloud Monitoring 的 Spanner 指标中提供。

捕获查询延迟时间

查询延迟时间是在 Spanner 数据库中运行 SQL 查询所需的时长(以毫秒为单位)。

您可以使用以下代码捕获查询延迟时间:

Java

private static final String MILLISECOND = "ms";
static final List<Double> RPC_MILLIS_BUCKET_BOUNDARIES =
    Collections.unmodifiableList(
        Arrays.asList(
            0.0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 8.0, 10.0, 13.0,
            16.0, 20.0, 25.0, 30.0, 40.0, 50.0, 65.0, 80.0, 100.0, 130.0, 160.0, 200.0, 250.0,
            300.0, 400.0, 500.0, 650.0, 800.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
            100000.0));
static final Aggregation AGGREGATION_WITH_MILLIS_HISTOGRAM =
    Distribution.create(BucketBoundaries.create(RPC_MILLIS_BUCKET_BOUNDARIES));

static MeasureDouble QUERY_STATS_ELAPSED =
    MeasureDouble.create(
        "cloud.google.com/java/spanner/query_stats_elapsed",
        "The execution of the query",
        MILLISECOND);

// Register the view. It is imperative that this step exists,
// otherwise recorded metrics will be dropped and never exported.
static View QUERY_STATS_LATENCY_VIEW = View
    .create(Name.create("cloud.google.com/java/spanner/query_stats_elapsed"),
        "The execution of the query",
        QUERY_STATS_ELAPSED,
        AGGREGATION_WITH_MILLIS_HISTOGRAM,
        Collections.emptyList());

static ViewManager manager = Stats.getViewManager();
private static final StatsRecorder STATS_RECORDER = Stats.getStatsRecorder();

static void captureQueryStatsMetric(DatabaseClient dbClient) {
  manager.registerView(QUERY_STATS_LATENCY_VIEW);

  // Enable OpenCensus exporters to export metrics to Cloud Monitoring.
  // Exporters use Application Default Credentials to authenticate.
  // See https://developers.google.com/identity/protocols/application-default-credentials
  // for more details.
  try {
    StackdriverStatsExporter.createAndRegister();
  } catch (IOException | IllegalStateException e) {
    System.out.println("Error during StackdriverStatsExporter");
  }

  try (ResultSet resultSet = dbClient.singleUse()
      .analyzeQuery(Statement.of("SELECT SingerId, AlbumId, AlbumTitle FROM Albums"),
          QueryAnalyzeMode.PROFILE)) {

    while (resultSet.next()) {
      System.out.printf(
          "%d %d %s", resultSet.getLong(0), resultSet.getLong(1), resultSet.getString(2));
    }
    Value value = resultSet.getStats().getQueryStats()
        .getFieldsOrDefault("elapsed_time", Value.newBuilder().setStringValue("0 msecs").build());
    double elapasedTime = Double.parseDouble(value.getStringValue().replaceAll(" msecs", ""));
    STATS_RECORDER.newMeasureMap()
        .put(QUERY_STATS_ELAPSED, elapasedTime)
        .record();
  }
}

Go


import (
	"context"
	"fmt"
	"io"
	"strconv"
	"strings"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"

	"contrib.go.opencensus.io/exporter/stackdriver"
	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"
)

// OpenCensus Tag, Measure and View.
var (
	QueryStatsElapsed = stats.Float64("cloud.google.com/go/spanner/query_stats_elapsed",
		"The execution of the query", "ms")
	QueryStatsLatencyView = view.View{
		Name:        "cloud.google.com/go/spanner/query_stats_elapsed",
		Measure:     QueryStatsElapsed,
		Description: "The execution of the query",
		Aggregation: view.Distribution(0.0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 8.0, 10.0, 13.0,
			16.0, 20.0, 25.0, 30.0, 40.0, 50.0, 65.0, 80.0, 100.0, 130.0, 160.0, 200.0, 250.0,
			300.0, 400.0, 500.0, 650.0, 800.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0,
			100000.0),
		TagKeys: []tag.Key{}}
)

func queryWithQueryStats(w io.Writer, db string) error {
	projectID, _, _, err := parseDatabaseName(db)
	if err != nil {
		return err
	}

	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	// Register OpenCensus views.
	err = view.Register(&QueryStatsLatencyView)
	if err != nil {
		return err
	}

	// Create OpenCensus Stackdriver exporter.
	sd, err := stackdriver.NewExporter(stackdriver.Options{
		ProjectID: projectID,
	})
	if err != nil {
		return err
	}
	// It is imperative to invoke flush before your main function exits
	defer sd.Flush()

	// Start the metrics exporter
	sd.StartMetricsExporter()
	defer sd.StopMetricsExporter()

	// Execute a SQL query and get the query stats.
	stmt := spanner.Statement{SQL: `SELECT SingerId, AlbumId, AlbumTitle FROM Albums`}
	iter := client.Single().QueryWithStats(ctx, stmt)
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			// Record query execution time with OpenCensus.
			elapasedTime := iter.QueryStats["elapsed_time"].(string)
			elapasedTimeMs, err := strconv.ParseFloat(strings.TrimSuffix(elapasedTime, " msecs"), 64)
			if err != nil {
				return err
			}
			stats.Record(ctx, QueryStatsElapsed.M(elapasedTimeMs))
			return nil
		}
		if err != nil {
			return err
		}
		var singerID, albumID int64
		var albumTitle string
		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
	}
}

该代码示例会在将指标导出到 Cloud Monitoring 时,将字符串 spanner/query_stats_elapsed 附加到指标名称。您可以在 Cloud Monitoring 中使用附加的字符串搜索此指标。

在 Metrics Explorer 中查看指标

  1. 在 Google Cloud 控制台中,前往 Metrics Explorer 页面。

    进入 Metrics Explorer

  2. 选择您的项目。

  3. 点击选择指标

  4. 使用以下字符串搜索延迟时间指标:

    • roundtrip_latency:适用于客户端往返延迟时间指标。
    • spanner/gfe_latency:适用于 GFE 延迟时间指标。
    • spanner/query_stats_elapsed:适用于查询延迟时间指标。
  5. 选择相应指标,然后点击应用

如需详细了解如何对指标进行分组或汇总,请参阅使用菜单构建查询

后续步骤