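Read outside of transactions

This page describes how to perform reads in Spanner outside the context of read-only and read-write transactions. If either of the following applies to you, read the Transactions page instead:

  • If you need to write based on the value of one or more reads, execute the read as part of a read-write transaction. For details, see Read-write transactions.

  • If you are making multiple read calls that require a consistent view of the data, execute the reads as part of a read-only transaction. For details, see Read-only transactions.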

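Read types

Spanner provides two types of reads that let you determine how current the data should be when you read it:

  • A strong read is a read at the current timestamp and is guaranteed to see all data that was committed before the read started. Spanner uses strong reads by default to serve read requests.
  • A stale read is a read at a timestamp in the past. If your application is latency sensitive but can tolerate stale data, stale reads can provide performance benefits.

To choose the type of read you want, set a timestamp bound on the read request. Use the following best practices when choosing a timestamp bound:

  • Choose strong reads whenever possible. These are the default timestamp bound for Spanner reads, including read-only transactions. Strong reads are guaranteed to observe the effects of all transactions that committed before the start of the operation, independent of which replica receives the read. Because of this, strong reads make application code simpler and applications more reliable. To learn more about Spanner's consistency properties, see TrueTime and external consistency.

  • If latency makes strong reads infeasible in some situations, use stale reads (bounded staleness or exact staleness) to improve performance in places where you don't need reads to be as recent as possible. As described on the Replication page, 15 seconds is a reasonable staleness value to use for good performance.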
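
The difference between the two read types can be pictured with a small toy model. The sketch below is not Spanner code and does not use any client library: `VersionedCell`, `strong_read`, and `exact_stale_read` are hypothetical names, and timestamps are plain integers. It only illustrates that a strong read sees everything committed up to "now", while a stale read sees the state as of an earlier timestamp.

```python
import bisect


class VersionedCell:
    """Toy multi-version value: keeps (commit_ts, value) pairs in timestamp order."""

    def __init__(self):
        self._timestamps = []
        self._values = []

    def commit(self, ts, value):
        # Assumption of this toy model: commits arrive in increasing timestamp order.
        self._timestamps.append(ts)
        self._values.append(value)

    def read_at(self, read_ts):
        # Return the latest value committed at or before read_ts, or None.
        i = bisect.bisect_right(self._timestamps, read_ts)
        return self._values[i - 1] if i else None


def strong_read(cell, now):
    """Read at the current timestamp: sees every commit up to `now`."""
    return cell.read_at(now)


def exact_stale_read(cell, now, staleness):
    """Read at a fixed timestamp in the past: now - staleness."""
    return cell.read_at(now - staleness)


cell = VersionedCell()
cell.commit(100, "v1")
cell.commit(110, "v2")

now = 120
print(strong_read(cell, now))           # v2
print(exact_stale_read(cell, now, 15))  # v1 (reads as of timestamp 105)
```

In this model the stale read returns an older value because the commit at timestamp 110 happened after the chosen read timestamp.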

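Read data with a database role

If you are a fine-grained access control user, you must select a database role to execute SQL statements and queries and to perform row operations on a database. Your role selection persists throughout your session until you change the role.

To learn how to perform a read with a database role, see Access a database with fine-grained access control.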

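Single read methods

Spanner supports single read methods (that is, reads outside the context of a transaction) on a database for:

  • Executing the read as a SQL query statement or by using Spanner's read API.
  • Performing a strong read from a single row or multiple rows in a table.
  • Performing a stale read from a single row or multiple rows in a table.
  • Reading from a single row or multiple rows in a secondary index.

To route single reads to a specific replica or region within a multi-region instance configuration or a custom regional configuration with optional read-only regions, see Directed reads.

The following sections describe how to use read methods with the Spanner client libraries.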

Execute a query

The following shows how to execute a SQL query statement against a database.

GoogleSQL

C++

Use ExecuteQuery() to execute a SQL query statement against a database.

void QueryData(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;

  spanner::SqlStatement select("SELECT SingerId, LastName FROM Singers");
  using RowType = std::tuple<std::int64_t, std::string>;
  auto rows = client.ExecuteQuery(std::move(select));
  for (auto& row : spanner::StreamOf<RowType>(rows)) {
    if (!row) throw std::move(row).status();
    std::cout << "SingerId: " << std::get<0>(*row) << "\t";
    std::cout << "LastName: " << std::get<1>(*row) << "\n";
  }

  std::cout << "Query completed for [spanner_query_data]\n";
}

C#

Use ExecuteReaderAsync() to query the database.


using Google.Cloud.Spanner.Data;
using System.Collections.Generic;
using System.Threading.Tasks;

public class QuerySampleDataAsyncSample
{
    public class Album
    {
        public int SingerId { get; set; }
        public int AlbumId { get; set; }
        public string AlbumTitle { get; set; }
    }

    public async Task<List<Album>> QuerySampleDataAsync(string projectId, string instanceId, string databaseId)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        var albums = new List<Album>();
        using var connection = new SpannerConnection(connectionString);
        using var cmd = connection.CreateSelectCommand("SELECT SingerId, AlbumId, AlbumTitle FROM Albums");

        using var reader = await cmd.ExecuteReaderAsync();
        while (await reader.ReadAsync())
        {
            albums.Add(new Album
            {
                AlbumId = reader.GetFieldValue<int>("AlbumId"),
                SingerId = reader.GetFieldValue<int>("SingerId"),
                AlbumTitle = reader.GetFieldValue<string>("AlbumTitle")
            });
        }
        return albums;
    }
}

Go

Use Client.Single().Query to query the database.


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"
)

func query(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	stmt := spanner.Statement{SQL: `SELECT SingerId, AlbumId, AlbumTitle FROM Albums`}
	iter := client.Single().Query(ctx, stmt)
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return err
		}
		var singerID, albumID int64
		var albumTitle string
		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
	}
}

Java

Use ReadContext.executeQuery to query the database.

static void query(DatabaseClient dbClient) {
  try (ResultSet resultSet =
      dbClient
          .singleUse() // Execute a single read or query against Cloud Spanner.
          .executeQuery(Statement.of("SELECT SingerId, AlbumId, AlbumTitle FROM Albums"))) {
    while (resultSet.next()) {
      System.out.printf(
          "%d %d %s\n", resultSet.getLong(0), resultSet.getLong(1), resultSet.getString(2));
    }
  }
}

Node.js

Use Database.run to query the database.

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

const query = {
  sql: 'SELECT SingerId, AlbumId, AlbumTitle FROM Albums',
};

// Queries rows from the Albums table
try {
  const [rows] = await database.run(query);

  rows.forEach(row => {
    const json = row.toJSON();
    console.log(
      `SingerId: ${json.SingerId}, AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`
    );
  });
} catch (err) {
  console.error('ERROR:', err);
} finally {
  // Close the database when finished.
  await database.close();
}

PHP

Use Database::execute to query the database.

use Google\Cloud\Spanner\SpannerClient;

/**
 * Queries sample data from the database using SQL.
 * Example:
 * ```
 * query_data($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function query_data(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $results = $database->execute(
        'SELECT SingerId, AlbumId, AlbumTitle FROM Albums'
    );

    foreach ($results as $row) {
        printf('SingerId: %s, AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['SingerId'], $row['AlbumId'], $row['AlbumTitle']);
    }
}

Python

Use Database.execute_sql to query the database.

def query_data(instance_id, database_id):
    """Queries sample data from the database using SQL."""
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.snapshot() as snapshot:
        results = snapshot.execute_sql(
            "SELECT SingerId, AlbumId, AlbumTitle FROM Albums"
        )

        for row in results:
            print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))

Ruby

Use Client#execute to query the database.

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

client.execute("SELECT SingerId, AlbumId, AlbumTitle FROM Albums").rows.each do |row|
  puts "#{row[:SingerId]} #{row[:AlbumId]} #{row[:AlbumTitle]}"
end

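When constructing a SQL statement, consult the SQL query syntax and the functions and operators reference.

Perform a strong read

The following shows how to perform a strong read of zero or more rows from a database.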

GoogleSQL

C++

The code to read data is the same as the previous sample, which queries Spanner by executing a SQL query.

void QueryData(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;

  spanner::SqlStatement select("SELECT SingerId, LastName FROM Singers");
  using RowType = std::tuple<std::int64_t, std::string>;
  auto rows = client.ExecuteQuery(std::move(select));
  for (auto& row : spanner::StreamOf<RowType>(rows)) {
    if (!row) throw std::move(row).status();
    std::cout << "SingerId: " << std::get<0>(*row) << "\t";
    std::cout << "LastName: " << std::get<1>(*row) << "\n";
  }

  std::cout << "Query completed for [spanner_query_data]\n";
}

C#

The code to read data is the same as the previous sample, which queries Spanner by executing a SQL query.


using Google.Cloud.Spanner.Data;
using System.Collections.Generic;
using System.Threading.Tasks;

public class QuerySampleDataAsyncSample
{
    public class Album
    {
        public int SingerId { get; set; }
        public int AlbumId { get; set; }
        public string AlbumTitle { get; set; }
    }

    public async Task<List<Album>> QuerySampleDataAsync(string projectId, string instanceId, string databaseId)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        var albums = new List<Album>();
        using var connection = new SpannerConnection(connectionString);
        using var cmd = connection.CreateSelectCommand("SELECT SingerId, AlbumId, AlbumTitle FROM Albums");

        using var reader = await cmd.ExecuteReaderAsync();
        while (await reader.ReadAsync())
        {
            albums.Add(new Album
            {
                AlbumId = reader.GetFieldValue<int>("AlbumId"),
                SingerId = reader.GetFieldValue<int>("SingerId"),
                AlbumTitle = reader.GetFieldValue<string>("AlbumTitle")
            });
        }
        return albums;
    }
}

Go

Use Client.Single().Read to read rows from the database.


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"
)

func read(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	iter := client.Single().Read(ctx, "Albums", spanner.AllKeys(),
		[]string{"SingerId", "AlbumId", "AlbumTitle"})
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return err
		}
		var singerID, albumID int64
		var albumTitle string
		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
	}
}

The example uses AllKeys to define the collection of keys or key ranges to read.

Java

Use ReadContext.read to read rows from the database.

static void read(DatabaseClient dbClient) {
  try (ResultSet resultSet =
      dbClient
          .singleUse()
          .read(
              "Albums",
              KeySet.all(), // Read all rows in a table.
              Arrays.asList("SingerId", "AlbumId", "AlbumTitle"))) {
    while (resultSet.next()) {
      System.out.printf(
          "%d %d %s\n", resultSet.getLong(0), resultSet.getLong(1), resultSet.getString(2));
    }
  }
}

The example uses KeySet to define the collection of keys or key ranges to read.

Node.js

Use Table.read to read rows from the database.

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

// Reads rows from the Albums table
const albumsTable = database.table('Albums');

const query = {
  columns: ['SingerId', 'AlbumId', 'AlbumTitle'],
  keySet: {
    all: true,
  },
};

try {
  const [rows] = await albumsTable.read(query);

  rows.forEach(row => {
    const json = row.toJSON();
    console.log(
      `SingerId: ${json.SingerId}, AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`
    );
  });
} catch (err) {
  console.error('ERROR:', err);
} finally {
  // Close the database when finished.
  await database.close();
}

The example uses keySet to define the collection of keys or key ranges to read.

PHP

Use Database::read to read rows from the database.

use Google\Cloud\Spanner\SpannerClient;

/**
 * Reads sample data from the database.
 * Example:
 * ```
 * read_data($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function read_data(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $keySet = $spanner->keySet(['all' => true]);
    $results = $database->read(
        'Albums',
        $keySet,
        ['SingerId', 'AlbumId', 'AlbumTitle']
    );

    foreach ($results->rows() as $row) {
        printf('SingerId: %s, AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['SingerId'], $row['AlbumId'], $row['AlbumTitle']);
    }
}

The example uses keySet to define the collection of keys or key ranges to read.

Python

Use Database.read to read rows from the database.

def read_data(instance_id, database_id):
    """Reads sample data from the database."""
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.snapshot() as snapshot:
        keyset = spanner.KeySet(all_=True)
        results = snapshot.read(
            table="Albums", columns=("SingerId", "AlbumId", "AlbumTitle"), keyset=keyset
        )

        for row in results:
            print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))

The example uses KeySet to define the collection of keys or key ranges to read.

Ruby

Use Client#read to read rows from the database.
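
To make the idea of "a collection of keys or key ranges" concrete, here is a minimal pure-Python sketch. It is a toy model and not the client library's KeySet class: `in_key_set` is a hypothetical helper, primary keys are modeled as tuples, and each range is assumed to be closed at the start and open at the end.

```python
def in_key_set(key, keys=(), ranges=(), all_=False):
    """Return True if `key` (a tuple) is selected by the toy key set.

    `ranges` holds (start_closed, end_open) pairs that are compared
    lexicographically, the same way Python compares tuples.
    """
    if all_:
        return True
    if key in keys:
        return True
    return any(start <= key < end for start, end in ranges)


# Hypothetical (SingerId, AlbumId) primary keys.
album_keys = [(1, 1), (1, 2), (2, 1), (2, 2), (2, 3)]

selected = [
    k
    for k in album_keys
    if in_key_set(k, keys=[(1, 1)], ranges=[((2, 1), (2, 3))])
]
print(selected)  # [(1, 1), (2, 1), (2, 2)]
```

Passing `all_=True` selects every key, which mirrors what the samples in this section do when they read the whole table.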

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

client.read("Albums", [:SingerId, :AlbumId, :AlbumTitle]).rows.each do |row|
  puts "#{row[:SingerId]} #{row[:AlbumId]} #{row[:AlbumTitle]}"
end

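Perform a stale read

The following sample code shows how to perform a stale read of zero or more rows from a database using an exact-staleness timestamp bound. For instructions on how to perform a stale read using a bounded-staleness timestamp bound, see the note after the sample code. For more information on the different types of timestamp bounds that are available, see Timestamp bounds.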

GoogleSQL

C++

Use ExecuteQuery() with MakeReadOnlyTransaction() and Transaction::ReadOnlyOptions() to perform a stale read.

void ReadStaleData(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;
  // The timestamp chosen using the `exact_staleness` parameter is bounded
  // below by the creation time of the database, so the visible state may only
  // include that generated by the `extra_statements` executed atomically with
  // the creation of the database. Here we at least know `Albums` exists.
  auto opts = spanner::Transaction::ReadOnlyOptions(std::chrono::seconds(15));
  auto read_only = spanner::MakeReadOnlyTransaction(std::move(opts));

  spanner::SqlStatement select(
      "SELECT SingerId, AlbumId, AlbumTitle FROM Albums");
  using RowType = std::tuple<std::int64_t, std::int64_t, std::string>;

  auto rows = client.ExecuteQuery(std::move(read_only), std::move(select));
  for (auto& row : spanner::StreamOf<RowType>(rows)) {
    if (!row) throw std::move(row).status();
    std::cout << "SingerId: " << std::get<0>(*row)
              << " AlbumId: " << std::get<1>(*row)
              << " AlbumTitle: " << std::get<2>(*row) << "\n";
  }
}

C#

Use the BeginReadOnlyTransactionAsync method on a connection with a specified TimestampBound.OfExactStaleness() value to query the database.


using Google.Cloud.Spanner.Data;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class ReadStaleDataAsyncSample
{
    public class Album
    {
        public int SingerId { get; set; }
        public int AlbumId { get; set; }
        public long? MarketingBudget { get; set; }
    }

    public async Task<List<Album>> ReadStaleDataAsync(string projectId, string instanceId, string databaseId)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        using var connection = new SpannerConnection(connectionString);
        await connection.OpenAsync();

        var staleness = TimestampBound.OfExactStaleness(TimeSpan.FromSeconds(15));
        using var transaction = await connection.BeginReadOnlyTransactionAsync(staleness);
        using var cmd = connection.CreateSelectCommand("SELECT SingerId, AlbumId, MarketingBudget FROM Albums");
        cmd.Transaction = transaction;

        var albums = new List<Album>();
        using var reader = await cmd.ExecuteReaderAsync();
        while (await reader.ReadAsync())
        {
            albums.Add(new Album
            {
                SingerId = reader.GetFieldValue<int>("SingerId"),
                AlbumId = reader.GetFieldValue<int>("AlbumId"),
                MarketingBudget = reader.IsDBNull(reader.GetOrdinal("MarketingBudget")) ? 0 : reader.GetFieldValue<long>("MarketingBudget")
            });
        }
        return albums;
    }
}

Go

Use Client.ReadOnlyTransaction().WithTimestampBound() and specify an ExactStaleness value to read rows from the database using an exact-staleness timestamp bound.


import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"
)

func readStaleData(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	ro := client.ReadOnlyTransaction().WithTimestampBound(spanner.ExactStaleness(15 * time.Second))
	defer ro.Close()

	iter := ro.Read(ctx, "Albums", spanner.AllKeys(), []string{"SingerId", "AlbumId", "AlbumTitle"})
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return err
		}
		var singerID int64
		var albumID int64
		var albumTitle string
		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
	}
}

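The example uses AllKeys to define the collection of keys or key ranges to read.

Java

Use the read method of a ReadContext that has a specified TimestampBound.ofExactStaleness() to read rows from the database using an exact-staleness timestamp bound.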

static void readStaleData(DatabaseClient dbClient) {
  try (ResultSet resultSet =
      dbClient
          .singleUse(TimestampBound.ofExactStaleness(15, TimeUnit.SECONDS))
          .read(
              "Albums", KeySet.all(), Arrays.asList("SingerId", "AlbumId", "MarketingBudget"))) {
    while (resultSet.next()) {
      System.out.printf(
          "%d %d %s\n",
          resultSet.getLong(0),
          resultSet.getLong(1),
          resultSet.isNull(2) ? "NULL" : resultSet.getLong("MarketingBudget"));
    }
  }
}

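The example uses KeySet to define the collection of keys or key ranges to read.

Node.js

Use Table.read with the exactStaleness option to read rows from the database using an exact-staleness timestamp bound.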

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

// Reads rows from the Albums table
const albumsTable = database.table('Albums');

const query = {
  columns: ['SingerId', 'AlbumId', 'AlbumTitle', 'MarketingBudget'],
  keySet: {
    all: true,
  },
};

const options = {
  // Guarantees that all writes committed more than 15 seconds ago are visible
  exactStaleness: 15,
};

try {
  const [rows] = await albumsTable.read(query, options);

  rows.forEach(row => {
    const json = row.toJSON();
    const id = json.SingerId;
    const album = json.AlbumId;
    const title = json.AlbumTitle;
    const budget = json.MarketingBudget ? json.MarketingBudget : '';
    console.log(
      `SingerId: ${id}, AlbumId: ${album}, AlbumTitle: ${title}, MarketingBudget: ${budget}`
    );
  });
} catch (err) {
  console.error('ERROR:', err);
} finally {
  // Close the database when finished.
  await database.close();
}

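The example uses keySet to define the collection of keys or key ranges to read.

PHP

Use Database::read with a specified exactStaleness value to read rows from the database using an exact-staleness timestamp bound.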

use Google\Cloud\Spanner\Duration;
use Google\Cloud\Spanner\SpannerClient;

/**
 * Reads sample data from the database.  The data is exactly 15 seconds stale.
 * Guarantees that all writes committed more than 15 seconds ago are visible.
 * Example:
 * ```
 * read_stale_data($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function read_stale_data(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);
    $keySet = $spanner->keySet(['all' => true]);
    $results = $database->read(
        'Albums',
        $keySet,
        ['SingerId', 'AlbumId', 'AlbumTitle'],
        ['exactStaleness' => new Duration(15)]
    );

    foreach ($results->rows() as $row) {
        printf('SingerId: %s, AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['SingerId'], $row['AlbumId'], $row['AlbumTitle']);
    }
}

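The example uses keySet to define the collection of keys or key ranges to read.

Python

Use the read method of a Database snapshot that has a specified exact_staleness value to read rows from the database using an exact-staleness timestamp bound.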

def read_stale_data(instance_id, database_id):
    """Reads sample data from the database. The data is exactly 15 seconds
    stale."""
    import datetime

    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)
    staleness = datetime.timedelta(seconds=15)

    with database.snapshot(exact_staleness=staleness) as snapshot:
        keyset = spanner.KeySet(all_=True)
        results = snapshot.read(
            table="Albums",
            columns=("SingerId", "AlbumId", "MarketingBudget"),
            keyset=keyset,
        )

        for row in results:
            print("SingerId: {}, AlbumId: {}, MarketingBudget: {}".format(*row))

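The example uses KeySet to define the collection of keys or key ranges to read.

Ruby

Use the read method of a snapshot Client that has a specified staleness value (in seconds) to read rows from the database using an exact-staleness timestamp bound.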

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"
require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

# Perform a read with a data staleness of 15 seconds
client.snapshot staleness: 15 do |snapshot|
  snapshot.read("Albums", [:SingerId, :AlbumId, :AlbumTitle]).rows.each do |row|
    puts "#{row[:SingerId]} #{row[:AlbumId]} #{row[:AlbumTitle]}"
  end
end
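
The samples in this section all use exact staleness. As a rough illustration of how exact and bounded staleness differ in the read timestamp they end up with, consider the following toy sketch. It is a simplified model, not Spanner's actual algorithm or API: the function names, the integer timestamps, and the `replica_safe_ts` parameter (the newest timestamp a replica could serve without waiting) are all assumptions made for the example.

```python
def exact_staleness_ts(now, staleness):
    """Exact staleness: the read timestamp is fixed at now - staleness."""
    return now - staleness


def bounded_staleness_ts(now, max_staleness, replica_safe_ts):
    """Bounded staleness (toy rule): use the newest timestamp the replica can
    already serve, but never one older than now - max_staleness."""
    oldest_allowed = now - max_staleness
    return max(oldest_allowed, min(replica_safe_ts, now))


now = 1000

# Exact staleness always reads 15 time units in the past.
print(exact_staleness_ts(now, 15))  # 985

# A replica that is only 5 units behind can serve fresher data within the bound.
print(bounded_staleness_ts(now, 15, replica_safe_ts=995))  # 995

# A replica that lags by more than the bound is clamped to the bound itself.
print(bounded_staleness_ts(now, 15, replica_safe_ts=970))  # 985
```

In this model, bounded staleness never returns data older than the bound, and it returns fresher data whenever the replica already has it.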

Perform a read using an index

The following shows how to read zero or more rows from a database using an index:

GoogleSQL

C++

Use the Read() function to perform a read using an index.

void ReadDataWithIndex(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;

  auto rows =
      client.Read("Albums", google::cloud::spanner::KeySet::All(),
                  {"AlbumId", "AlbumTitle"},
                  google::cloud::Options{}.set<spanner::ReadIndexNameOption>(
                      "AlbumsByAlbumTitle"));
  using RowType = std::tuple<std::int64_t, std::string>;
  for (auto& row : spanner::StreamOf<RowType>(rows)) {
    if (!row) throw std::move(row).status();
    std::cout << "AlbumId: " << std::get<0>(*row) << "\t";
    std::cout << "AlbumTitle: " << std::get<1>(*row) << "\n";
  }
  std::cout << "Read completed for [spanner_read_data_with_index]\n";
}

C#

Read data using an index by executing a query that explicitly specifies the index:


using Google.Cloud.Spanner.Data;
using System.Collections.Generic;
using System.Threading.Tasks;

public class QueryDataWithIndexAsyncSample
{
    public class Album
    {
        public int AlbumId { get; set; }
        public string AlbumTitle { get; set; }
        public long MarketingBudget { get; set; }
    }

    public async Task<List<Album>> QueryDataWithIndexAsync(string projectId, string instanceId, string databaseId,
        string startTitle, string endTitle)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";
        using var connection = new SpannerConnection(connectionString);
        using var cmd = connection.CreateSelectCommand(
            "SELECT AlbumId, AlbumTitle, MarketingBudget FROM Albums@ "
            + "{FORCE_INDEX=AlbumsByAlbumTitle} "
            + $"WHERE AlbumTitle >= @startTitle "
            + $"AND AlbumTitle < @endTitle",
            new SpannerParameterCollection
            {
                { "startTitle", SpannerDbType.String, startTitle },
                { "endTitle", SpannerDbType.String, endTitle }
            });

        var albums = new List<Album>();
        using var reader = await cmd.ExecuteReaderAsync();
        while (await reader.ReadAsync())
        {
            albums.Add(new Album
            {
                AlbumId = reader.GetFieldValue<int>("AlbumId"),
                AlbumTitle = reader.GetFieldValue<string>("AlbumTitle"),
                MarketingBudget = reader.IsDBNull(reader.GetOrdinal("MarketingBudget")) ? 0 : reader.GetFieldValue<long>("MarketingBudget")
            });
        }
        return albums;
    }
}

Go

Use Client.Single().ReadUsingIndex to read rows from the database using an index.


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"
)

func readUsingIndex(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	iter := client.Single().ReadUsingIndex(ctx, "Albums", "AlbumsByAlbumTitle", spanner.AllKeys(),
		[]string{"AlbumId", "AlbumTitle"})
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return err
		}
		var albumID int64
		var albumTitle string
		if err := row.Columns(&albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %s\n", albumID, albumTitle)
	}
}

Java

Use ReadContext.readUsingIndex to read rows from the database using an index.

static void readUsingIndex(DatabaseClient dbClient) {
  try (ResultSet resultSet =
      dbClient
          .singleUse()
          .readUsingIndex(
              "Albums",
              "AlbumsByAlbumTitle",
              KeySet.all(),
              Arrays.asList("AlbumId", "AlbumTitle"))) {
    while (resultSet.next()) {
      System.out.printf("%d %s\n", resultSet.getLong(0), resultSet.getString(1));
    }
  }
}

Node.js

Use Table.read and specify the index in the query to read rows from the database using an index.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const instanceId = 'my-instance';
// const databaseId = 'my-database';
// const projectId = 'my-project-id';

// Imports the Google Cloud Spanner client library
const {Spanner} = require('@google-cloud/spanner');

// Instantiates a client
const spanner = new Spanner({
  projectId: projectId,
});

async function readDataWithIndex() {
  // Gets a reference to a Cloud Spanner instance and database
  const instance = spanner.instance(instanceId);
  const database = instance.database(databaseId);

  const albumsTable = database.table('Albums');

  const query = {
    columns: ['AlbumId', 'AlbumTitle'],
    keySet: {
      all: true,
    },
    index: 'AlbumsByAlbumTitle',
  };

  // Reads the Albums table using an index
  try {
    const [rows] = await albumsTable.read(query);

    rows.forEach(row => {
      const json = row.toJSON();
      console.log(`AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`);
    });
  } catch (err) {
    console.error('ERROR:', err);
  } finally {
    // Close the database when finished.
    await database.close();
  }
}
readDataWithIndex();

PHP

Use Database::read and specify the index to read rows from the database using an index.

use Google\Cloud\Spanner\SpannerClient;

/**
 * Reads sample data from the database using an index.
 *
 * The index must exist before running this sample. You can add the index
 * by running the `add_index` sample or by running this DDL statement against
 * your database:
 *
 *     CREATE INDEX AlbumsByAlbumTitle ON Albums(AlbumTitle)
 *
 * Example:
 * ```
 * read_data_with_index($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function read_data_with_index(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $keySet = $spanner->keySet(['all' => true]);
    $results = $database->read(
        'Albums',
        $keySet,
        ['AlbumId', 'AlbumTitle'],
        ['index' => 'AlbumsByAlbumTitle']
    );

    foreach ($results->rows() as $row) {
        printf('AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['AlbumId'], $row['AlbumTitle']);
    }
}

Python

Use Database.read and specify the index to read rows from the database using an index.

def read_data_with_index(instance_id, database_id):
    """Reads sample data from the database using an index.

    The index must exist before running this sample. You can add the index
    by running the `add_index` sample or by running this DDL statement against
    your database:

        CREATE INDEX AlbumsByAlbumTitle ON Albums(AlbumTitle)

    """
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.snapshot() as snapshot:
        keyset = spanner.KeySet(all_=True)
        results = snapshot.read(
            table="Albums",
            columns=("AlbumId", "AlbumTitle"),
            keyset=keyset,
            index="AlbumsByAlbumTitle",
        )

        for row in results:
            print("AlbumId: {}, AlbumTitle: {}".format(*row))

Ruby

Use Client#read and specify the index to read rows from the database using an index.

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

result = client.read "Albums", [:AlbumId, :AlbumTitle],
                     index: "AlbumsByAlbumTitle"

result.rows.each do |row|
  puts "#{row[:AlbumId]} #{row[:AlbumTitle]}"
end
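
Conceptually, an index read like the ones in this section walks the index in index-key order instead of walking the base table in primary-key order. The sketch below is a pure-Python toy model rather than client-library code. The sample rows, the `albums_by_title` list, and the `read_using_index` helper are made up for illustration, and the index is assumed to hold the indexed column followed by the table's primary key columns.

```python
# Toy base table keyed by (SingerId, AlbumId).
albums = {
    (1, 1): {"AlbumTitle": "Total Junk"},
    (1, 2): {"AlbumTitle": "Go, Go, Go"},
    (2, 1): {"AlbumTitle": "Green"},
}

# Toy index: one (AlbumTitle, SingerId, AlbumId) entry per row, sorted by title.
albums_by_title = sorted(
    (row["AlbumTitle"], singer_id, album_id)
    for (singer_id, album_id), row in albums.items()
)


def read_using_index(index, start_title=None, end_title=None):
    """Yield (AlbumId, AlbumTitle) in index order, optionally limited to
    titles in the half-open range [start_title, end_title)."""
    for title, _singer_id, album_id in index:
        if start_title is not None and title < start_title:
            continue
        if end_title is not None and title >= end_title:
            continue
        yield album_id, title


print(list(read_using_index(albums_by_title)))
# [(2, 'Go, Go, Go'), (1, 'Green'), (1, 'Total Junk')]

print(list(read_using_index(albums_by_title, "G", "H")))
# [(2, 'Go, Go, Go'), (1, 'Green')]
```

Rows come back ordered by title because the index, not the base table, drives the scan.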

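Read data in parallel

When performing bulk read or query operations that involve very large amounts of data from Spanner, you can use the PartitionQuery API to get results faster. The API divides a query into batches, or partitions, and uses multiple machines to fetch the partitions in parallel. Be mindful that using the PartitionQuery API results in higher latency, because it is intended only for bulk operations such as exporting or scanning the whole database.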

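You can perform any read API operation in parallel using the Spanner client libraries. However, you can only partition SQL queries when the queries are root-partitionable. For a query to be root-partitionable, the query plan must satisfy one of the following conditions:

  • The first operator in the query execution plan is a distributed union, and the query execution plan contains only one distributed union (excluding Local Distributed Unions). Your query plan can't contain any other distributed operators, such as distributed cross apply.

  • There are no distributed operators in the query plan.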
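
The two conditions above can be read as a simple predicate over a query plan. The following sketch is only an illustration under stated assumptions: a plan is modeled as a flat list of operator names with the root first, any operator whose name starts with "Distributed" counts as a distributed operator, and "Local Distributed Union" is treated as excluded from that count. The `is_distributed` and `is_root_partitionable` helpers are hypothetical and are not part of any Spanner API.

```python
def is_distributed(operator_name):
    """Toy rule: names starting with 'Distributed' are distributed operators.
    'Local Distributed Union' does not match and is therefore excluded."""
    return operator_name.startswith("Distributed")


def is_root_partitionable(plan):
    """`plan` is a list of operator names, root operator first."""
    distributed = [op for op in plan if is_distributed(op)]
    if not distributed:
        # Condition 2: no distributed operators at all.
        return True
    # Condition 1: the root is a distributed union and it is the only
    # distributed operator in the plan.
    return plan[0] == "Distributed Union" and distributed == ["Distributed Union"]


print(is_root_partitionable(["Distributed Union", "Local Distributed Union", "Table Scan"]))  # True
print(is_root_partitionable(["Serialize Result", "Table Scan"]))  # True
print(is_root_partitionable(["Distributed Union", "Distributed Cross Apply", "Table Scan"]))  # False
print(is_root_partitionable(["Serialize Result", "Distributed Union", "Table Scan"]))  # False
```

Real query plans are trees with richer metadata, so treat this only as a way to reason about the rule, not as a way to validate actual plans.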

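The PartitionQuery API runs queries in batch mode. Spanner might choose a query execution plan that makes a query root-partitionable when it runs in batch mode. As a result, the PartitionQuery API and Spanner Studio might use different query execution plans for the same query, and you might not be able to get the query execution plan used by the PartitionQuery API in Spanner Studio.

For partitioned queries like this, you can choose to enable Spanner Data Boost. Data Boost lets you run large analytic queries with near-zero impact on existing workloads on the provisioned Spanner instance. The C++, Go, Java, Node.js, and Python code examples on this page show how to enable Data Boost.

For more information about Data Boost, see Data Boost overview.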
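
The general pattern behind partitioned reads (split the work into partitions, hand each partition to a worker, then combine the results) can be sketched without any Spanner dependency. In the toy example below, the in-memory `SINGERS` list stands in for the table, `partition_query` stands in for the step that generates partitions, and threads stand in for separate workers or machines. All of these names are hypothetical and none of them is a client-library call.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical in-memory stand-in for the Singers table.
SINGERS = [(i, f"First{i}", f"Last{i}") for i in range(1, 11)]


def partition_query(rows, max_partitions):
    """Split the rows into at most `max_partitions` contiguous chunks."""
    size = max(1, -(-len(rows) // max_partitions))  # ceiling division
    return [rows[i:i + size] for i in range(0, len(rows), size)]


def execute_partition(partition):
    """Worker: fetch the rows that belong to one partition."""
    return list(partition)


partitions = partition_query(SINGERS, max_partitions=3)

# Each partition is processed by its own worker thread.
with ThreadPoolExecutor(max_workers=len(partitions)) as pool:
    results = list(pool.map(execute_partition, partitions))

total_rows = sum(len(rows) for rows in results)
print(f"Number of partitions: {len(partitions)}")  # Number of partitions: 3
print(f"Number of rows: {total_rows}")             # Number of rows: 10
```

Because every row lands in exactly one partition, combining the per-partition results yields the same rows as a single full scan.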

GoogleSQL

C++

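This example fetches partitions of a SQL query of the Singers table and executes the query over each partition through the following steps:

  • Creating a Spanner batch transaction.
  • Generating partitions for the query, so that the partitions can be distributed to multiple workers.
  • Retrieving the query results for each partition.

void UsePartitionQuery(google::cloud::spanner::Client client) {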
  namespace spanner = ::google::cloud::spanner;
  auto txn = spanner::MakeReadOnlyTransaction();

  spanner::SqlStatement select(
      "SELECT SingerId, FirstName, LastName FROM Singers");
  using RowType = std::tuple<std::int64_t, std::string, std::string>;

  auto partitions = client.PartitionQuery(
      std::move(txn), std::move(select),
      google::cloud::Options{}.set<spanner::PartitionDataBoostOption>(true));
  if (!partitions) throw std::move(partitions).status();

  // You would probably choose to execute these partitioned queries in
  // separate threads/processes, or on a different machine.
  int number_of_rows = 0;
  for (auto const& partition : *partitions) {
    auto rows = client.ExecuteQuery(partition);
    for (auto& row : spanner::StreamOf<RowType>(rows)) {
      if (!row) throw std::move(row).status();
      number_of_rows++;
    }
  }
  std::cout << "Number of partitions: " << partitions->size() << "\n"
            << "Number of rows: " << number_of_rows << "\n";
  std::cout << "Read completed for [spanner_batch_client]\n";
}

C#

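This example fetches partitions of a SQL query of the Singers table and executes the query over each partition through the following steps:

  • Creating a Spanner batch transaction.
  • Generating partitions for the query, so that the partitions can be distributed to multiple workers.
  • Retrieving the query results for each partition.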

using Google.Cloud.Spanner.Data;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class BatchReadRecordsAsyncSample
{
    private int _rowsRead;
    private int _partitionCount;
    public async Task<(int RowsRead, int Partitions)> BatchReadRecordsAsync(string projectId, string instanceId, string databaseId)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";
        using var connection = new SpannerConnection(connectionString);
        await connection.OpenAsync();

        using var transaction = await connection.BeginReadOnlyTransactionAsync();
        transaction.DisposeBehavior = DisposeBehavior.CloseResources;
        using var cmd = connection.CreateSelectCommand("SELECT SingerId, FirstName, LastName FROM Singers");
        cmd.Transaction = transaction;

        // A CommandPartition object is serializable and can be used from a different process.
        // If data boost is enabled, partitioned read and query requests will be executed
        // using Spanner independent compute resources.
        var partitions = await cmd.GetReaderPartitionsAsync(PartitionOptions.Default.WithDataBoostEnabled(true));

        var transactionId = transaction.TransactionId;
        await Task.WhenAll(partitions.Select(x => DistributedReadWorkerAsync(x, transactionId)));
        Console.WriteLine($"Done reading!  Total rows read: {_rowsRead:N0} with {_partitionCount} partition(s)");
        return (RowsRead: _rowsRead, Partitions: _partitionCount);
    }

    private async Task DistributedReadWorkerAsync(CommandPartition readPartition, TransactionId id)
    {
        var localId = Interlocked.Increment(ref _partitionCount);
        using var connection = new SpannerConnection(id.ConnectionString);
        using var transaction = connection.BeginReadOnlyTransaction(id);
        using var cmd = connection.CreateCommandWithPartition(readPartition, transaction);
        using var reader = await cmd.ExecuteReaderAsync();
        while (await reader.ReadAsync())
        {
            Interlocked.Increment(ref _rowsRead);
            Console.WriteLine($"Partition ({localId}) "
                + $"{reader.GetFieldValue<int>("SingerId")}"
                + $" {reader.GetFieldValue<string>("FirstName")}"
                + $" {reader.GetFieldValue<string>("LastName")}");
        }
        Console.WriteLine($"Done with single reader {localId}.");
    }
}

Go

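This example fetches the SQL query partitions for the Singers table and runs the query over each partition using the following steps:

  • Create a Spanner client and a transaction.
  • Generate partitions for the query so that they can be distributed among multiple workers.
  • Retrieve the query results for each partition.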

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"
)

func readBatchData(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	txn, err := client.BatchReadOnlyTransaction(ctx, spanner.StrongRead())
	if err != nil {
		return err
	}
	defer txn.Close()

	// Singer represents a row in the Singers table.
	type Singer struct {
		SingerID   int64
		FirstName  string
		LastName   string
		SingerInfo []byte
	}
	stmt := spanner.Statement{SQL: "SELECT SingerId, FirstName, LastName FROM Singers;"}
	// A Partition object is serializable and can be used from a different process.
	// DataBoost option is an optional parameter which can also be used for partition read
	// and query to execute the request via spanner independent compute resources.
	partitions, err := txn.PartitionQueryWithOptions(ctx, stmt, spanner.PartitionOptions{}, spanner.QueryOptions{DataBoostEnabled: true})
	if err != nil {
		return err
	}
	recordCount := 0
	for i, p := range partitions {
		iter := txn.Execute(ctx, p)
		defer iter.Stop()
		for {
			row, err := iter.Next()
			if err == iterator.Done {
				break
			} else if err != nil {
				return err
			}
			var s Singer
			if err := row.ToStruct(&s); err != nil {
				return err
			}
			fmt.Fprintf(w, "Partition (%d) %v\n", i, s)
			recordCount++
		}
	}
	fmt.Fprintf(w, "Total partition count: %v\n", len(partitions))
	fmt.Fprintf(w, "Total record count: %v\n", recordCount)
	return nil
}

Java

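This example fetches the SQL query partitions for the Singers table and runs the query over each partition using the following steps:

  • Create a Spanner batch client and a transaction.
  • Generate partitions for the query so that they can be distributed among multiple workers.
  • Retrieve the query results for each partition.

int numThreads = Runtime.getRuntime().availableProcessors();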
ExecutorService executor = Executors.newFixedThreadPool(numThreads);

// Statistics
int totalPartitions;
AtomicInteger totalRecords = new AtomicInteger(0);

try {
  BatchClient batchClient =
      spanner.getBatchClient(DatabaseId.of(options.getProjectId(), instanceId, databaseId));

  final BatchReadOnlyTransaction txn =
      batchClient.batchReadOnlyTransaction(TimestampBound.strong());

  // A Partition object is serializable and can be used from a different process.
  // DataBoost option is an optional parameter which can be used for partition read
  // and query to execute the request via spanner independent compute resources.

  List<Partition> partitions =
      txn.partitionQuery(
          PartitionOptions.getDefaultInstance(),
          Statement.of("SELECT SingerId, FirstName, LastName FROM Singers"),
          // Option to enable data boost for a given request
          Options.dataBoostEnabled(true));

  totalPartitions = partitions.size();

  for (final Partition p : partitions) {
    executor.execute(
        () -> {
          try (ResultSet results = txn.execute(p)) {
            while (results.next()) {
              long singerId = results.getLong(0);
              String firstName = results.getString(1);
              String lastName = results.getString(2);
              System.out.println("[" + singerId + "] " + firstName + " " + lastName);
              totalRecords.getAndIncrement();
            }
          }
        });
  }
} finally {
  executor.shutdown();
  executor.awaitTermination(1, TimeUnit.HOURS);
  spanner.close();
}

double avgRecordsPerPartition = 0.0;
if (totalPartitions != 0) {
  avgRecordsPerPartition = (double) totalRecords.get() / totalPartitions;
}
System.out.println("totalPartitions=" + totalPartitions);
System.out.println("totalRecords=" + totalRecords);
System.out.println("avgRecordsPerPartition=" + avgRecordsPerPartition);

Node.js

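This example fetches the SQL query partitions for the Singers table and runs the query over each partition using the following steps:

  • Create a Spanner client and a batch transaction.
  • Generate partitions for the query so that they can be distributed among multiple workers.
  • Retrieve the query results for each partition.

// Imports the Google Cloud client library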
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);
const [transaction] = await database.createBatchTransaction();

const query = {
  sql: 'SELECT * FROM Singers',
  // DataBoost option is an optional parameter which can also be used for partition read
  // and query to execute the request via spanner independent compute resources.
  dataBoostEnabled: true,
};

// A Partition object is serializable and can be used from a different process.
const [partitions] = await transaction.createQueryPartitions(query);
console.log(`Successfully created ${partitions.length} query partitions.`);

let row_count = 0;
const promises = [];
partitions.forEach(partition => {
  promises.push(
    transaction.execute(partition).then(results => {
      const rows = results[0].map(row => row.toJSON());
      row_count += rows.length;
    })
  );
});
Promise.all(promises)
  .then(() => {
    console.log(
      `Successfully received ${row_count} from executed partitions.`
    );
    transaction.close();
  })
  .then(() => {
    database.close();
  });

PHP

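This example fetches the SQL query partitions for the Singers table and runs the query over each partition using the following steps:

  • Create a Spanner client and a batch client.
  • Generate partitions for the query so that they can be distributed among multiple workers.
  • Retrieve the query results for each partition.

use Google\Cloud\Spanner\SpannerClient;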

/**
 * Queries sample data from the database using SQL.
 * Example:
 * ```
 * batch_query_data($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function batch_query_data(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $batch = $spanner->batch($instanceId, $databaseId);
    $snapshot = $batch->snapshot();
    $queryString = 'SELECT SingerId, FirstName, LastName FROM Singers';
    $partitions = $snapshot->partitionQuery($queryString, [
        // This is an optional parameter which can be used for partition
        // read and query to execute the request via spanner independent
        // compute resources.
        'dataBoostEnabled' => true
    ]);
    $totalPartitions = count($partitions);
    $totalRecords = 0;
    foreach ($partitions as $partition) {
        $result = $snapshot->executePartition($partition);
        $rows = $result->rows();
        foreach ($rows as $row) {
            $singerId = $row['SingerId'];
            $firstName = $row['FirstName'];
            $lastName = $row['LastName'];
            printf('SingerId: %s, FirstName: %s, LastName: %s' . PHP_EOL, $singerId, $firstName, $lastName);
            $totalRecords++;
        }
    }
    printf('Total Partitions: %d' . PHP_EOL, $totalPartitions);
    printf('Total Records: %d' . PHP_EOL, $totalRecords);
    // Guard against division by zero when no partitions are returned.
    $averageRecordsPerPartition = $totalPartitions > 0 ? $totalRecords / $totalPartitions : 0.0;
    printf('Average Records Per Partition: %f' . PHP_EOL, $averageRecordsPerPartition);
}

Python

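This example generates partitions for a read of the Singers table and processes each partition using the following steps. Note that it partitions a table read with generate_read_batches rather than a SQL statement:

  • Create a Spanner client and a batch transaction.
  • Generate partitions for the read so that they can be distributed among multiple workers.
  • Retrieve the results for each partition.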

import concurrent.futures
import time

from google.cloud import spanner


def run_batch_query(instance_id, database_id):
    """Runs an example batch query."""

    # Expected Table Format:
    # CREATE TABLE Singers (
    #   SingerId   INT64 NOT NULL,
    #   FirstName  STRING(1024),
    #   LastName   STRING(1024),
    #   SingerInfo BYTES(MAX),
    # ) PRIMARY KEY (SingerId);

    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    # Create the batch transaction and generate partitions
    snapshot = database.batch_snapshot()
    partitions = snapshot.generate_read_batches(
        table="Singers",
        columns=("SingerId", "FirstName", "LastName"),
        keyset=spanner.KeySet(all_=True),
        # A Partition object is serializable and can be used from a different process.
        # DataBoost option is an optional parameter which can also be used for partition read
        # and query to execute the request via spanner independent compute resources.
        data_boost_enabled=True,
    )

    # Create a pool of workers for the tasks
    start = time.time()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(process, snapshot, p) for p in partitions]

        for future in concurrent.futures.as_completed(futures, timeout=3600):
            finish, row_ct = future.result()
            elapsed = finish - start
            print("Completed {} rows in {} seconds".format(row_ct, elapsed))

    # Clean up
    snapshot.close()

def process(snapshot, partition):
    """Processes the rows of one partition in a worker thread."""
    print("Started processing partition.")
    row_ct = 0
    for row in snapshot.process_read_batch(partition):
        print("SingerId: {}, FirstName: {}, LastName: {}".format(*row))
        row_ct += 1
    return time.time(), row_ct
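
The Python sample above partitions a table read. For comparison, the following is a minimal sketch of the same fan-out pattern for a SQL statement, assuming the batch snapshot exposes generate_query_batches and process_query_batch as in the google-cloud-spanner Python client. The helper name count_rows_with_query_batches and its max_workers parameter are hypothetical and are not part of the official sample.

```python
import concurrent.futures


def count_rows_with_query_batches(snapshot, sql, max_workers=4):
    """Partitions `sql`, runs each partition on a worker thread, and
    returns (number_of_partitions, total_rows).

    `snapshot` is assumed to be a batch snapshot, for example the object
    returned by database.batch_snapshot().
    """
    # Each batch is a serializable mapping that describes one partition.
    # data_boost_enabled=True asks Spanner to use Data Boost for the request.
    batches = list(
        snapshot.generate_query_batches(sql=sql, data_boost_enabled=True)
    )

    def count_rows(batch):
        # Stream the rows that belong to this partition and count them.
        return sum(1 for _ in snapshot.process_query_batch(batch))

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        row_counts = list(executor.map(count_rows, batches))

    return len(batches), sum(row_counts)
```

A caller would typically create the snapshot with database.batch_snapshot(), pass it in together with a statement such as "SELECT SingerId, FirstName, LastName FROM Singers", and call snapshot.close() afterwards. Taking the snapshot as a parameter keeps the helper independent of how the client and database are configured.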

Ruby

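This example fetches the SQL query partitions for the Singers table and runs the query over each partition using the following steps:

  • Create a Spanner batch client.
  • Create partitions for the query so that they can be distributed among multiple workers.
  • Retrieve the query results for each partition.

# project_id  = "Your Google Cloud project ID"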
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

# Prepare a thread pool with number of processors
processor_count  = Concurrent.processor_count
thread_pool      = Concurrent::FixedThreadPool.new processor_count

# Prepare AtomicFixnum to count total records using multiple threads
total_records = Concurrent::AtomicFixnum.new

# Create a new Spanner batch client
spanner        = Google::Cloud::Spanner.new project: project_id
batch_client   = spanner.batch_client instance_id, database_id

# Get a strong timestamp bound batch_snapshot
batch_snapshot = batch_client.batch_snapshot strong: true

# Get partitions for specified query
# data_boost_enabled option is an optional parameter which can be used for partition read
# and query to execute the request via spanner independent compute resources.
partitions       = batch_snapshot.partition_query "SELECT SingerId, FirstName, LastName FROM Singers", data_boost_enabled: true
total_partitions = partitions.size

# Enqueue a new thread pool job
partitions.each_with_index do |partition, _partition_index|
  thread_pool.post do
    # Increment total_records per new row
    batch_snapshot.execute_partition(partition).rows.each do |_row|
      total_records.increment
    end
  end
end

# Wait for queued jobs to complete
thread_pool.shutdown
thread_pool.wait_for_termination

# Close the client connection and release resources.
batch_snapshot.close

# Collect statistics for batch query
average_records_per_partition = 0.0
if total_partitions != 0
  average_records_per_partition = total_records.value / total_partitions.to_f
end

puts "Total Partitions: #{total_partitions}"
puts "Total Records: #{total_records.value}"
puts "Average records per Partition: #{average_records_per_partition}"