이 페이지에서는 읽기 전용 및 읽기-쓰기 트랜잭션의 컨텍스트 외부에서 Spanner의 읽기 작업을 수행하는 방법을 설명합니다. 따라서 다음 중 하나에 해당하는 경우 트랜잭션 페이지를 대신 읽어야 합니다.
하나 이상의 읽기 값에 따라 쓰기를 실행해야 하는 경우, 읽기-쓰기 트랜잭션의 일부로 읽기를 실행해야 합니다. 읽기-쓰기 트랜잭션에 대해 자세히 알아보세요.
일관된 데이터 보기가 필요한 여러 읽기 호출을 수행하는 경우, 읽기 전용 트랜잭션의 일부로 읽기를 실행해야 합니다. 읽기 전용 트랜잭션에 대해 자세히 알아보세요.
단일 읽기 호출을 실행하거나 동시에 데이터를 읽어야 하지만 쓰지 않아도 되는 경우에는 이 페이지를 계속 읽으세요.
읽기 유형
Spanner는 다음 두 가지 읽기 유형을 제공하므로 데이터를 읽을 때 데이터의 현재 상태를 확인할 수 있습니다.
- 강력 읽기는 현재 타임스탬프에서의 읽기이며 이 읽기를 시작할 때까지 커밋한 모든 데이터를 볼 수 있습니다. Spanner는 기본적으로 강력 읽기를 사용하여 읽기 요청을 처리합니다.
- 비활성 읽기는 과거의 타임스탬프에서의 읽기입니다. 애플리케이션이 지연 시간에 민감하지만 비활성 데이터를 허용하는 경우, 비활성 읽기는 성능상의 이점이 있습니다.
타임스탬프 경계 선택
원하는 읽기 유형을 선택하려면 읽기 요청에서 타임스탬프 경계를 설정합니다. 타임스탬프 경계를 선택할 때 다음 권장사항을 따르세요.
가능하다면 강력 읽기를 선택합니다. 이 유형은 읽기 전용 트랜잭션을 포함하는 Spanner 읽기의 기본 타임스탬프 경계입니다. 강력 읽기에서는 읽기를 수신하는 복제본에 관계없이 작업을 시작하기 전에 커밋한 모든 트랜잭션의 영향을 관찰할 수 있습니다. 이러한 이유로 강력 읽기를 사용하면 애플리케이션 코드가 보다 간단해지고 애플리케이션의 신뢰성을 높일 수 있습니다. TrueTime 및 외부 일관성에서 Spanner의 일관성 속성을 자세히 알아보세요.
상황에 따라 지연 시간 때문에 강력 읽기를 실행할 수 없다면 비활성 읽기(제한된 비활성 또는 완전 비활성)를 사용하여 최신 상태의 읽기가 필요하지 않은 경우에 성능을 향상시킵니다. Cloud Spanner 복제에 설명된 대로 우수한 성능을 위해 사용할 수 있는 적정한 비활성 값은 15초입니다.
단일 읽기 메서드
Spanner에서는 다음의 경우 데이터베이스에서 단일 읽기 메서드(즉, 트랜잭션 컨텍스트 외부에서 읽기)를 지원합니다.
- SQL 쿼리 문으로 또는 Spanner의 읽기 API를 사용하여 읽기 실행
- 테이블의 단일 행 또는 여러 행에서 강력 읽기 수행
- 테이블의 단일 행 또는 여러 행에서 비활성 읽기 수행
- 보조 색인의 단일 행 또는 여러 행에서 읽기
아래 섹션에서는 Cloud Spanner API용 Cloud 클라이언트 라이브러리로 읽기 메서드를 사용하는 방법을 설명합니다.
쿼리 실행
다음은 데이터베이스에 SQL 쿼리 구문을 실행하는 방법을 보여줍니다.
Google 표준 SQL
C++
ExecuteQuery()
를 사용하여 데이터베이스에 SQL 쿼리 문을 실행합니다.
void QueryData(google::cloud::spanner::Client client) {
namespace spanner = ::google::cloud::spanner;
spanner::SqlStatement select("SELECT SingerId, LastName FROM Singers");
using RowType = std::tuple<std::int64_t, std::string>;
auto rows = client.ExecuteQuery(std::move(select));
for (auto& row : spanner::StreamOf<RowType>(rows)) {
if (!row) throw std::move(row).status();
std::cout << "SingerId: " << std::get<0>(*row) << "\t";
std::cout << "LastName: " << std::get<1>(*row) << "\n";
}
std::cout << "Query completed for [spanner_query_data]\n";
}
C#
ExecuteReaderAsync()
를 사용하여 데이터베이스를 쿼리합니다.
using Google.Cloud.Spanner.Data;
using System.Collections.Generic;
using System.Threading.Tasks;
public class QuerySampleDataAsyncSample
{
public class Album
{
public int SingerId { get; set; }
public int AlbumId { get; set; }
public string AlbumTitle { get; set; }
}
public async Task<List<Album>> QuerySampleDataAsync(string projectId, string instanceId, string databaseId)
{
string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";
var albums = new List<Album>();
using var connection = new SpannerConnection(connectionString);
using var cmd = connection.CreateSelectCommand("SELECT SingerId, AlbumId, AlbumTitle FROM Albums");
using var reader = await cmd.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
albums.Add(new Album
{
AlbumId = reader.GetFieldValue<int>("AlbumId"),
SingerId = reader.GetFieldValue<int>("SingerId"),
AlbumTitle = reader.GetFieldValue<string>("AlbumTitle")
});
}
return albums;
}
}
Go
Client.Single().Query
를 사용하여 데이터베이스를 쿼리합니다.
import (
"context"
"fmt"
"io"
"cloud.google.com/go/spanner"
"google.golang.org/api/iterator"
)
func query(w io.Writer, db string) error {
ctx := context.Background()
client, err := spanner.NewClient(ctx, db)
if err != nil {
return err
}
defer client.Close()
stmt := spanner.Statement{SQL: `SELECT SingerId, AlbumId, AlbumTitle FROM Albums`}
iter := client.Single().Query(ctx, stmt)
defer iter.Stop()
for {
row, err := iter.Next()
if err == iterator.Done {
return nil
}
if err != nil {
return err
}
var singerID, albumID int64
var albumTitle string
if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
return err
}
fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
}
}
자바
ReadContext.executeQuery
를 사용하여 데이터베이스를 쿼리합니다.
static void query(DatabaseClient dbClient) {
try (ResultSet resultSet =
dbClient
.singleUse() // Execute a single read or query against Cloud Spanner.
.executeQuery(Statement.of("SELECT SingerId, AlbumId, AlbumTitle FROM Albums"))) {
while (resultSet.next()) {
System.out.printf(
"%d %d %s\n", resultSet.getLong(0), resultSet.getLong(1), resultSet.getString(2));
}
}
}
Node.js
Database.run
를 사용하여 데이터베이스를 쿼리합니다.
// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');
/**
* TODO(developer): Uncomment the following lines before running the sample.
*/
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';
// Creates a client
const spanner = new Spanner({
projectId: projectId,
});
// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);
const query = {
sql: 'SELECT SingerId, AlbumId, AlbumTitle FROM Albums',
};
// Queries rows from the Albums table
try {
const [rows] = await database.run(query);
rows.forEach(row => {
const json = row.toJSON();
console.log(
`SingerId: ${json.SingerId}, AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`
);
});
} catch (err) {
console.error('ERROR:', err);
} finally {
// Close the database when finished.
await database.close();
}
PHP
Database::execute
를 사용하여 데이터베이스를 쿼리합니다.
use Google\Cloud\Spanner\SpannerClient;
/**
* Queries sample data from the database using SQL.
* Example:
* ```
* query_data($instanceId, $databaseId);
* ```
*
* @param string $instanceId The Spanner instance ID.
* @param string $databaseId The Spanner database ID.
*/
function query_data(string $instanceId, string $databaseId): void
{
$spanner = new SpannerClient();
$instance = $spanner->instance($instanceId);
$database = $instance->database($databaseId);
$results = $database->execute(
'SELECT SingerId, AlbumId, AlbumTitle FROM Albums'
);
foreach ($results as $row) {
printf('SingerId: %s, AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
$row['SingerId'], $row['AlbumId'], $row['AlbumTitle']);
}
}
Python
Database.execute_sql
를 사용하여 데이터베이스를 쿼리합니다.
def query_data(instance_id, database_id):
"""Queries sample data from the database using SQL."""
spanner_client = spanner.Client()
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)
with database.snapshot() as snapshot:
results = snapshot.execute_sql(
"SELECT SingerId, AlbumId, AlbumTitle FROM Albums"
)
for row in results:
print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))
Ruby
Client#execute
를 사용하여 데이터베이스를 쿼리합니다.
# project_id = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"
require "google/cloud/spanner"
spanner = Google::Cloud::Spanner.new project: project_id
client = spanner.client instance_id, database_id
client.execute("SELECT SingerId, AlbumId, AlbumTitle FROM Albums").rows.each do |row|
puts "#{row[:SingerId]} #{row[:AlbumId]} #{row[:AlbumTitle]}"
end
SQL 문을 작성할 때 SQL 쿼리 구문 및 함수 및 연산자 참조 자료를 찾아보세요.
강력 읽기 수행
다음은 데이터베이스에서 강력 읽기로 0개 이상의 행을 읽는 방법을 보여줍니다.
Google 표준 SQL
C++
데이터를 읽는 코드는 SQL 쿼리를 실행하여 Spanner를 쿼리하는 앞선 샘플과 같습니다.
void QueryData(google::cloud::spanner::Client client) {
namespace spanner = ::google::cloud::spanner;
spanner::SqlStatement select("SELECT SingerId, LastName FROM Singers");
using RowType = std::tuple<std::int64_t, std::string>;
auto rows = client.ExecuteQuery(std::move(select));
for (auto& row : spanner::StreamOf<RowType>(rows)) {
if (!row) throw std::move(row).status();
std::cout << "SingerId: " << std::get<0>(*row) << "\t";
std::cout << "LastName: " << std::get<1>(*row) << "\n";
}
std::cout << "Query completed for [spanner_query_data]\n";
}
C#
데이터를 읽는 코드는 SQL 쿼리를 실행하여 Spanner를 쿼리하는 앞선 샘플과 같습니다.
using Google.Cloud.Spanner.Data;
using System.Collections.Generic;
using System.Threading.Tasks;
public class QuerySampleDataAsyncSample
{
public class Album
{
public int SingerId { get; set; }
public int AlbumId { get; set; }
public string AlbumTitle { get; set; }
}
public async Task<List<Album>> QuerySampleDataAsync(string projectId, string instanceId, string databaseId)
{
string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";
var albums = new List<Album>();
using var connection = new SpannerConnection(connectionString);
using var cmd = connection.CreateSelectCommand("SELECT SingerId, AlbumId, AlbumTitle FROM Albums");
using var reader = await cmd.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
albums.Add(new Album
{
AlbumId = reader.GetFieldValue<int>("AlbumId"),
SingerId = reader.GetFieldValue<int>("SingerId"),
AlbumTitle = reader.GetFieldValue<string>("AlbumTitle")
});
}
return albums;
}
}
Go
Client.Single().Read
를 사용하여 데이터베이스의 행을 읽습니다.
import (
"context"
"fmt"
"io"
"cloud.google.com/go/spanner"
"google.golang.org/api/iterator"
)
func read(w io.Writer, db string) error {
ctx := context.Background()
client, err := spanner.NewClient(ctx, db)
if err != nil {
return err
}
defer client.Close()
iter := client.Single().Read(ctx, "Albums", spanner.AllKeys(),
[]string{"SingerId", "AlbumId", "AlbumTitle"})
defer iter.Stop()
for {
row, err := iter.Next()
if err == iterator.Done {
return nil
}
if err != nil {
return err
}
var singerID, albumID int64
var albumTitle string
if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
return err
}
fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
}
}
이 예시에서는 AllKeys
를 사용하여 읽을 키 모음 또는 키 범위를 정의합니다.
자바
ReadContext.read
를 사용하여 데이터베이스의 행을 읽습니다.
static void read(DatabaseClient dbClient) {
try (ResultSet resultSet =
dbClient
.singleUse()
.read(
"Albums",
KeySet.all(), // Read all rows in a table.
Arrays.asList("SingerId", "AlbumId", "AlbumTitle"))) {
while (resultSet.next()) {
System.out.printf(
"%d %d %s\n", resultSet.getLong(0), resultSet.getLong(1), resultSet.getString(2));
}
}
}
이 예시에서는 KeySet
를 사용하여 읽을 키 모음 또는 키 범위를 정의합니다.
Node.js
Table.read
를 사용하여 데이터베이스의 행을 읽습니다.
// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');
/**
* TODO(developer): Uncomment the following lines before running the sample.
*/
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';
// Creates a client
const spanner = new Spanner({
projectId: projectId,
});
// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);
// Reads rows from the Albums table
const albumsTable = database.table('Albums');
const query = {
columns: ['SingerId', 'AlbumId', 'AlbumTitle'],
keySet: {
all: true,
},
};
try {
const [rows] = await albumsTable.read(query);
rows.forEach(row => {
const json = row.toJSON();
console.log(
`SingerId: ${json.SingerId}, AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`
);
});
} catch (err) {
console.error('ERROR:', err);
} finally {
// Close the database when finished.
await database.close();
}
이 예시에서는 keySet
를 사용하여 읽을 키 모음 또는 키 범위를 정의합니다.
PHP
Database::read
를 사용하여 데이터베이스의 행을 읽습니다.
use Google\Cloud\Spanner\SpannerClient;
/**
* Reads sample data from the database.
* Example:
* ```
* read_data($instanceId, $databaseId);
* ```
*
* @param string $instanceId The Spanner instance ID.
* @param string $databaseId The Spanner database ID.
*/
function read_data(string $instanceId, string $databaseId): void
{
$spanner = new SpannerClient();
$instance = $spanner->instance($instanceId);
$database = $instance->database($databaseId);
$keySet = $spanner->keySet(['all' => true]);
$results = $database->read(
'Albums',
$keySet,
['SingerId', 'AlbumId', 'AlbumTitle']
);
foreach ($results->rows() as $row) {
printf('SingerId: %s, AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
$row['SingerId'], $row['AlbumId'], $row['AlbumTitle']);
}
}
이 예시에서는 keySet
를 사용하여 읽을 키 모음 또는 키 범위를 정의합니다.
Python
Database.read
를 사용하여 데이터베이스의 행을 읽습니다.
def read_data(instance_id, database_id):
"""Reads sample data from the database."""
spanner_client = spanner.Client()
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)
with database.snapshot() as snapshot:
keyset = spanner.KeySet(all_=True)
results = snapshot.read(
table="Albums", columns=("SingerId", "AlbumId", "AlbumTitle"), keyset=keyset
)
for row in results:
print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))
이 예시에서는 KeySet
를 사용하여 읽을 키 모음 또는 키 범위를 정의합니다.
Ruby
Client#read
를 사용하여 데이터베이스의 행을 읽습니다.
# project_id = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"
require "google/cloud/spanner"
spanner = Google::Cloud::Spanner.new project: project_id
client = spanner.client instance_id, database_id
client.read("Albums", [:SingerId, :AlbumId, :AlbumTitle]).rows.each do |row|
puts "#{row[:SingerId]} #{row[:AlbumId]} #{row[:AlbumTitle]}"
end
비활성 읽기 수행
다음 샘플 코드는 완전 비활성 타임스탬프 경계를 사용하여 데이터베이스에서 비활성 읽기로 0개 이상의 행을 읽는 방법을 보여줍니다. 제한된 비활성 타임스탬프 경계를 사용하여 비활성 읽기를 수행하는 방법에 대한 자세한 내용은 샘플 코드 뒤에 나오는 참고 사항을 참조하세요. 사용할 수 있는 타임스탬프 경계의 여러 유형에 대한 자세한 내용은 타임스탬프 경계를 참조하세요.
Google 표준 SQL
C++
MakeReadOnlyTransaction()
및 Transaction::ReadOnlyOptions()
과 함께 ExecuteQuery()
를 사용하여 비활성 읽기를 수행합니다.
void ReadStaleData(google::cloud::spanner::Client client) {
namespace spanner = ::google::cloud::spanner;
// The timestamp chosen using the `exact_staleness` parameter is bounded
// below by the creation time of the database, so the visible state may only
// include that generated by the `extra_statements` executed atomically with
// the creation of the database. Here we at least know `Albums` exists.
auto opts = spanner::Transaction::ReadOnlyOptions(std::chrono::seconds(15));
auto read_only = spanner::MakeReadOnlyTransaction(std::move(opts));
spanner::SqlStatement select(
"SELECT SingerId, AlbumId, AlbumTitle FROM Albums");
using RowType = std::tuple<std::int64_t, std::int64_t, std::string>;
auto rows = client.ExecuteQuery(std::move(read_only), std::move(select));
for (auto& row : spanner::StreamOf<RowType>(rows)) {
if (!row) throw std::move(row).status();
std::cout << "SingerId: " << std::get<0>(*row)
<< " AlbumId: " << std::get<1>(*row)
<< " AlbumTitle: " << std::get<2>(*row) << "\n";
}
}
C#
지정된 TimestampBound.OfExactStaleness()
값이 있는 connection
에서 BeginReadOnlyTransactionAsync
메서드를 사용하여 데이터베이스를 쿼리합니다.
using Google.Cloud.Spanner.Data;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
public class ReadStaleDataAsyncSample
{
public class Album
{
public int SingerId { get; set; }
public int AlbumId { get; set; }
public long? MarketingBudget { get; set; }
}
public async Task<List<Album>> ReadStaleDataAsync(string projectId, string instanceId, string databaseId)
{
string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";
using var connection = new SpannerConnection(connectionString);
await connection.OpenAsync();
var staleness = TimestampBound.OfExactStaleness(TimeSpan.FromSeconds(15));
using var transaction = await connection.BeginReadOnlyTransactionAsync(staleness);
using var cmd = connection.CreateSelectCommand("SELECT SingerId, AlbumId, MarketingBudget FROM Albums");
cmd.Transaction = transaction;
var albums = new List<Album>();
using var reader = await cmd.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
albums.Add(new Album
{
SingerId = reader.GetFieldValue<int>("SingerId"),
AlbumId = reader.GetFieldValue<int>("AlbumId"),
MarketingBudget = reader.IsDBNull(reader.GetOrdinal("MarketingBudget")) ? 0 : reader.GetFieldValue<long>("MarketingBudget")
});
}
return albums;
}
}
Go
Client.ReadOnlyTransaction().WithTimestampBound()
를 사용하고 ExactStaleness
값을 지정하여 완전 비활성 타임스탬프 경계를 통해 데이터베이스에서 행 읽기를 수행합니다.
import (
"context"
"fmt"
"io"
"time"
"cloud.google.com/go/spanner"
"google.golang.org/api/iterator"
)
func readStaleData(w io.Writer, db string) error {
ctx := context.Background()
client, err := spanner.NewClient(ctx, db)
if err != nil {
return err
}
defer client.Close()
ro := client.ReadOnlyTransaction().WithTimestampBound(spanner.ExactStaleness(15 * time.Second))
defer ro.Close()
iter := ro.Read(ctx, "Albums", spanner.AllKeys(), []string{"SingerId", "AlbumId", "AlbumTitle"})
defer iter.Stop()
for {
row, err := iter.Next()
if err == iterator.Done {
return nil
}
if err != nil {
return err
}
var singerID int64
var albumID int64
var albumTitle string
if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
return err
}
fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
}
}
이 예시에서는 AllKeys
를 사용하여 읽을 키 모음 또는 키 범위를 정의합니다.
자바
완전 비활성 타임스탬프 경계를 사용하여 데이터베이스에서 행 읽기를 수행하려면 지정된 TimestampBound.ofExactStaleness()
를 포함하는 ReadContext
의 read
메서드를 사용합니다.
static void readStaleData(DatabaseClient dbClient) {
try (ResultSet resultSet =
dbClient
.singleUse(TimestampBound.ofExactStaleness(15, TimeUnit.SECONDS))
.read(
"Albums", KeySet.all(), Arrays.asList("SingerId", "AlbumId", "MarketingBudget"))) {
while (resultSet.next()) {
System.out.printf(
"%d %d %s\n",
resultSet.getLong(0),
resultSet.getLong(1),
resultSet.isNull(2) ? "NULL" : resultSet.getLong("MarketingBudget"));
}
}
}
이 예시에서는 KeySet
를 사용하여 읽을 키 모음 또는 키 범위를 정의합니다.
Node.js
완전 비활성 타임스탬프 경계를 사용하여 데이터베이스에서 행 읽기를 수행하려면 exactStaleness
옵션을 포함하는 Table.read
를 사용합니다.
// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');
/**
* TODO(developer): Uncomment the following lines before running the sample.
*/
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';
// Creates a client
const spanner = new Spanner({
projectId: projectId,
});
// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);
// Reads rows from the Albums table
const albumsTable = database.table('Albums');
const query = {
columns: ['SingerId', 'AlbumId', 'AlbumTitle', 'MarketingBudget'],
keySet: {
all: true,
},
};
const options = {
// Guarantees that all writes committed more than 15 seconds ago are visible
exactStaleness: 15,
};
try {
const [rows] = await albumsTable.read(query, options);
rows.forEach(row => {
const json = row.toJSON();
const id = json.SingerId;
const album = json.AlbumId;
const title = json.AlbumTitle;
const budget = json.MarketingBudget ? json.MarketingBudget : '';
console.log(
`SingerId: ${id}, AlbumId: ${album}, AlbumTitle: ${title}, MarketingBudget: ${budget}`
);
});
} catch (err) {
console.error('ERROR:', err);
} finally {
// Close the database when finished.
await database.close();
}
이 예시에서는 keySet
를 사용하여 읽을 키 모음 또는 키 범위를 정의합니다.
PHP
완전 비활성 타임스탬프 경계를 사용하여 데이터베이스에서 행 읽기를 수행하려면 지정된 exactStaleness
값이 있는 Database::read
를 사용합니다.
use Google\Cloud\Spanner\Duration;
use Google\Cloud\Spanner\SpannerClient;
/**
* Reads sample data from the database. The data is exactly 15 seconds stale.
* Guarantees that all writes committed more than 15 seconds ago are visible.
* Example:
* ```
* read_stale_data
*($instanceId, $databaseId);
* ```
*
* @param string $instanceId The Spanner instance ID.
* @param string $databaseId The Spanner database ID.
*/
function read_stale_data(string $instanceId, string $databaseId): void
{
$spanner = new SpannerClient();
$instance = $spanner->instance($instanceId);
$database = $instance->database($databaseId);
$keySet = $spanner->keySet(['all' => true]);
$results = $database->read(
'Albums',
$keySet,
['SingerId', 'AlbumId', 'AlbumTitle'],
['exactStaleness' => new Duration(15)]
);
foreach ($results->rows() as $row) {
printf('SingerId: %s, AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
$row['SingerId'], $row['AlbumId'], $row['AlbumTitle']);
}
}
이 예시에서는 keySet
를 사용하여 읽을 키 모음 또는 키 범위를 정의합니다.
Python
완전 비활성 타임스탬프 경계를 사용하여 데이터베이스에서 행 읽기를 수행하려면 지정된 exact_staleness
값을 포함하는 Database
snapshot
의 read
메서드를 사용합니다.
def read_stale_data(instance_id, database_id):
"""Reads sample data from the database. The data is exactly 15 seconds
stale."""
import datetime
spanner_client = spanner.Client()
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)
staleness = datetime.timedelta(seconds=15)
with database.snapshot(exact_staleness=staleness) as snapshot:
keyset = spanner.KeySet(all_=True)
results = snapshot.read(
table="Albums",
columns=("SingerId", "AlbumId", "MarketingBudget"),
keyset=keyset,
)
for row in results:
print("SingerId: {}, AlbumId: {}, MarketingBudget: {}".format(*row))
이 예시에서는 KeySet
를 사용하여 읽을 키 모음 또는 키 범위를 정의합니다.
Ruby
완전 비활성 타임스탬프 경계를 사용하여 데이터베이스에서 행 읽기를 수행하려면 지정된 staleness
값(초 단위)이 있는 스냅샷 Client
의 read
메서드를 사용합니다.
# project_id = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"
require "google/cloud/spanner"
spanner = Google::Cloud::Spanner.new project: project_id
client = spanner.client instance_id, database_id
# Perform a read with a data staleness of 15 seconds
client.snapshot staleness: 15 do |snapshot|
snapshot.read("Albums", [:SingerId, :AlbumId, :AlbumTitle]).rows.each do |row|
puts "#{row[:SingerId]} #{row[:AlbumId]} #{row[:AlbumTitle]}"
end
end
색인을 사용하여 읽기 수행
다음은 색인을 사용하여 데이터베이스에서 0개 이상의 행을 읽는 방법을 보여줍니다.
Google 표준 SQL
C++
Read()
함수를 사용하여 색인을 사용한 읽기를 수행합니다.
void ReadDataWithIndex(google::cloud::spanner::Client client) {
namespace spanner = ::google::cloud::spanner;
auto rows =
client.Read("Albums", google::cloud::spanner::KeySet::All(),
{"AlbumId", "AlbumTitle"},
google::cloud::Options{}.set<spanner::ReadIndexNameOption>(
"AlbumsByAlbumTitle"));
using RowType = std::tuple<std::int64_t, std::string>;
for (auto& row : spanner::StreamOf<RowType>(rows)) {
if (!row) throw std::move(row).status();
std::cout << "AlbumId: " << std::get<0>(*row) << "\t";
std::cout << "AlbumTitle: " << std::get<1>(*row) << "\n";
}
std::cout << "Read completed for [spanner_read_data_with_index]\n";
}
C#
색인을 사용하여 데이터를 읽으려면 명시적으로 색인을 지정하는 쿼리를 실행합니다.
using Google.Cloud.Spanner.Data;
using System.Collections.Generic;
using System.Threading.Tasks;
public class QueryDataWithIndexAsyncSample
{
public class Album
{
public int AlbumId { get; set; }
public string AlbumTitle { get; set; }
public long MarketingBudget { get; set; }
}
public async Task<List<Album>> QueryDataWithIndexAsync(string projectId, string instanceId, string databaseId,
string startTitle, string endTitle)
{
string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";
using var connection = new SpannerConnection(connectionString);
using var cmd = connection.CreateSelectCommand(
"SELECT AlbumId, AlbumTitle, MarketingBudget FROM Albums@ "
+ "{FORCE_INDEX=AlbumsByAlbumTitle} "
+ $"WHERE AlbumTitle >= @startTitle "
+ $"AND AlbumTitle < @endTitle",
new SpannerParameterCollection
{
{ "startTitle", SpannerDbType.String, startTitle },
{ "endTitle", SpannerDbType.String, endTitle }
});
var albums = new List<Album>();
using var reader = await cmd.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
albums.Add(new Album
{
AlbumId = reader.GetFieldValue<int>("AlbumId"),
AlbumTitle = reader.GetFieldValue<string>("AlbumTitle"),
MarketingBudget = reader.IsDBNull(reader.GetOrdinal("MarketingBudget")) ? 0 : reader.GetFieldValue<long>("MarketingBudget")
});
}
return albums;
}
}
Go
색인을 사용하여 데이터베이스에서 행을 읽으려면 Client.Single().ReadUsingIndex
를 사용합니다.
import (
"context"
"fmt"
"io"
"cloud.google.com/go/spanner"
"google.golang.org/api/iterator"
)
func readUsingIndex(w io.Writer, db string) error {
ctx := context.Background()
client, err := spanner.NewClient(ctx, db)
if err != nil {
return err
}
defer client.Close()
iter := client.Single().ReadUsingIndex(ctx, "Albums", "AlbumsByAlbumTitle", spanner.AllKeys(),
[]string{"AlbumId", "AlbumTitle"})
defer iter.Stop()
for {
row, err := iter.Next()
if err == iterator.Done {
return nil
}
if err != nil {
return err
}
var albumID int64
var albumTitle string
if err := row.Columns(&albumID, &albumTitle); err != nil {
return err
}
fmt.Fprintf(w, "%d %s\n", albumID, albumTitle)
}
}
자바
색인을 사용하여 데이터베이스에서 행을 읽으려면 ReadContext.readUsingIndex
를 사용합니다.
static void readUsingIndex(DatabaseClient dbClient) {
try (ResultSet resultSet =
dbClient
.singleUse()
.readUsingIndex(
"Albums",
"AlbumsByAlbumTitle",
KeySet.all(),
Arrays.asList("AlbumId", "AlbumTitle"))) {
while (resultSet.next()) {
System.out.printf("%d %s\n", resultSet.getLong(0), resultSet.getString(1));
}
}
}
Node.js
색인을 사용하여 데이터베이스에서 행을 읽으려면 Table.read
를 사용하여 쿼리에서 색인을 지정합니다.
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const instanceId = 'my-instance';
// const databaseId = 'my-database';
// const projectId = 'my-project-id';
// Imports the Google Cloud Spanner client library
const {Spanner} = require('@google-cloud/spanner');
// Instantiates a client
const spanner = new Spanner({
projectId: projectId,
});
async function readDataWithIndex() {
// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);
const albumsTable = database.table('Albums');
const query = {
columns: ['AlbumId', 'AlbumTitle'],
keySet: {
all: true,
},
index: 'AlbumsByAlbumTitle',
};
// Reads the Albums table using an index
try {
const [rows] = await albumsTable.read(query);
rows.forEach(row => {
const json = row.toJSON();
console.log(`AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`);
});
} catch (err) {
console.error('ERROR:', err);
} finally {
// Close the database when finished.
database.close();
}
}
readDataWithIndex();
PHP
색인을 사용하여 데이터베이스에서 행을 읽으려면 Database::read
를 사용하여 색인을 지정합니다.
use Google\Cloud\Spanner\SpannerClient;
/**
* Reads sample data from the database using an index.
*
* The index must exist before running this sample. You can add the index
* by running the `add_index` sample or by running this DDL statement against
* your database:
*
* CREATE INDEX AlbumsByAlbumTitle ON Albums(AlbumTitle)
*
* Example:
* ```
* read_data_with_index($instanceId, $databaseId);
* ```
*
* @param string $instanceId The Spanner instance ID.
* @param string $databaseId The Spanner database ID.
*/
function read_data_with_index(string $instanceId, string $databaseId): void
{
$spanner = new SpannerClient();
$instance = $spanner->instance($instanceId);
$database = $instance->database($databaseId);
$keySet = $spanner->keySet(['all' => true]);
$results = $database->read(
'Albums',
$keySet,
['AlbumId', 'AlbumTitle'],
['index' => 'AlbumsByAlbumTitle']
);
foreach ($results->rows() as $row) {
printf('AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
$row['AlbumId'], $row['AlbumTitle']);
}
}
Python
색인을 사용하여 데이터베이스에서 행을 읽으려면 Database.read
를 사용하여 색인을 지정합니다.
def read_data_with_index(instance_id, database_id):
"""Reads sample data from the database using an index.
The index must exist before running this sample. You can add the index
by running the `add_index` sample or by running this DDL statement against
your database:
CREATE INDEX AlbumsByAlbumTitle ON Albums(AlbumTitle)
"""
spanner_client = spanner.Client()
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)
with database.snapshot() as snapshot:
keyset = spanner.KeySet(all_=True)
results = snapshot.read(
table="Albums",
columns=("AlbumId", "AlbumTitle"),
keyset=keyset,
index="AlbumsByAlbumTitle",
)
for row in results:
print("AlbumId: {}, AlbumTitle: {}".format(*row))
Ruby
색인을 사용하여 데이터베이스에서 행을 읽으려면 Client#read
를 사용하여 색인을 지정합니다.
# project_id = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"
require "google/cloud/spanner"
spanner = Google::Cloud::Spanner.new project: project_id
client = spanner.client instance_id, database_id
result = client.read "Albums", [:AlbumId, :AlbumTitle],
index: "AlbumsByAlbumTitle"
result.rows.each do |row|
puts "#{row[:AlbumId]} #{row[:AlbumTitle]}"
end
동시에 데이터 읽기
Spanner에서 대규모 데이터가 포함된 대량 읽기 작업이나 쿼리 작업을 수행할 때 PartitionQuery API를 사용하면 결과를 빨리 얻을 수 있습니다. API는 파티션을 병렬로 가져오기 위해 여러 머신을 사용해서 쿼리를 파티션이라고 부르는 작은 부분으로 분할합니다. PartitionQuery API는 전체 데이터베이스 내보내기 또는 스캔과 같은 대량 작업 전용으로 사용되기 때문에 이 API를 사용하면 대기 시간이 더 높습니다.
Spanner 클라이언트 라이브러리를 사용하여 모든 읽기 API 작업을 동시에 수행할 수 있습니다. 단, 쿼리 실행 계획의 첫 번째 연산자가 분산 통합인 SQL 쿼리만 파티션을 나눌 수 있습니다. 특정 SQL 쿼리의 쿼리 실행 계획을 보려면 SQL 권장사항의 안내에 따릅니다.
쿼리 실행 계획이 표시되면 이 계획의 첫 번째 연산자가 분산 통합인지 확인합니다.
Google 표준 SQL
C++
이 예시에서는 Singers
테이블에서 SQL 쿼리의 파티션을 가져오고 다음 단계를 통해 각 파티션에서 쿼리를 실행합니다.
- Spanner 일괄 트랜잭션 만들기
- 파티션을 여러 작업자에게 분산할 수 있도록 쿼리의 파티션 생성
- 각 파티션의 쿼리 결과 검색
void UsePartitionQuery(google::cloud::spanner::Client client) {
namespace spanner = ::google::cloud::spanner;
auto txn = spanner::MakeReadOnlyTransaction();
spanner::SqlStatement select(
"SELECT SingerId, FirstName, LastName FROM Singers");
using RowType = std::tuple<std::int64_t, std::string, std::string>;
auto partitions =
client.PartitionQuery(std::move(txn), std::move(select), {});
if (!partitions) throw std::move(partitions).status();
int number_of_rows = 0;
for (auto const& partition : *partitions) {
auto rows = client.ExecuteQuery(partition);
for (auto& row : spanner::StreamOf<RowType>(rows)) {
if (!row) throw std::move(row).status();
number_of_rows++;
}
}
std::cout << "Number of partitions: " << partitions->size() << "\n"
<< "Number of rows: " << number_of_rows << "\n";
std::cout << "Read completed for [spanner_batch_client]\n";
}
C#
이 예시에서는 Singers
테이블에서 SQL 쿼리의 파티션을 가져오고 다음 단계를 통해 각 파티션에서 쿼리를 실행합니다.
- Spanner 일괄 트랜잭션 만들기
- 파티션을 여러 작업자에게 분산할 수 있도록 쿼리의 파티션 생성
- 각 파티션의 쿼리 결과 검색
using Google.Cloud.Spanner.Data;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class BatchReadRecordsAsyncSample
{
private int _rowsRead;
private int _partitionCount;
public async Task<(int RowsRead, int Partitions)> BatchReadRecordsAsync(string projectId, string instanceId, string databaseId)
{
string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";
using var connection = new SpannerConnection(connectionString);
await connection.OpenAsync();
using var transaction = await connection.BeginReadOnlyTransactionAsync();
transaction.DisposeBehavior = DisposeBehavior.CloseResources;
using var cmd = connection.CreateSelectCommand("SELECT SingerId, FirstName, LastName FROM Singers");
cmd.Transaction = transaction;
var partitions = await cmd.GetReaderPartitionsAsync();
var transactionId = transaction.TransactionId;
await Task.WhenAll(partitions.Select(x => DistributedReadWorkerAsync(x, transactionId)));
Console.WriteLine($"Done reading! Total rows read: {_rowsRead:N0} with {_partitionCount} partition(s)");
return (RowsRead: _rowsRead, Partitions: _partitionCount);
}
private async Task DistributedReadWorkerAsync(CommandPartition readPartition, TransactionId id)
{
var localId = Interlocked.Increment(ref _partitionCount);
using var connection = new SpannerConnection(id.ConnectionString);
using var transaction = connection.BeginReadOnlyTransaction(id);
using var cmd = connection.CreateCommandWithPartition(readPartition, transaction);
using var reader = await cmd.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
Interlocked.Increment(ref _rowsRead);
Console.WriteLine($"Partition ({localId}) "
+ $"{reader.GetFieldValue<int>("SingerId")}"
+ $" {reader.GetFieldValue<string>("FirstName")}"
+ $" {reader.GetFieldValue<string>("LastName")}");
}
Console.WriteLine($"Done with single reader {localId}.");
}
}
Go
이 예시에서는 Singers
테이블에서 SQL 쿼리의 파티션을 가져오고 다음 단계를 통해 각 파티션에서 쿼리를 실행합니다.
- Spanner 클라이언트 및 트랜잭션 만들기
- 파티션을 여러 작업자에게 분산할 수 있도록 쿼리의 파티션 생성
- 각 파티션의 쿼리 결과 검색
import (
"context"
"fmt"
"io"
"cloud.google.com/go/spanner"
"google.golang.org/api/iterator"
)
func readBatchData(w io.Writer, db string) error {
ctx := context.Background()
client, err := spanner.NewClient(ctx, db)
if err != nil {
return err
}
defer client.Close()
txn, err := client.BatchReadOnlyTransaction(ctx, spanner.StrongRead())
if err != nil {
return err
}
defer txn.Close()
// Singer represents a row in the Singers table.
type Singer struct {
SingerID int64
FirstName string
LastName string
SingerInfo []byte
}
stmt := spanner.Statement{SQL: "SELECT SingerId, FirstName, LastName FROM Singers;"}
partitions, err := txn.PartitionQuery(ctx, stmt, spanner.PartitionOptions{})
if err != nil {
return err
}
recordCount := 0
for i, p := range partitions {
iter := txn.Execute(ctx, p)
defer iter.Stop()
for {
row, err := iter.Next()
if err == iterator.Done {
break
} else if err != nil {
return err
}
var s Singer
if err := row.ToStruct(&s); err != nil {
return err
}
fmt.Fprintf(w, "Partition (%d) %v\n", i, s)
recordCount++
}
}
fmt.Fprintf(w, "Total partition count: %v\n", len(partitions))
fmt.Fprintf(w, "Total record count: %v\n", recordCount)
return nil
}
자바
이 예시에서는 Singers
테이블에서 SQL 쿼리의 파티션을 가져오고 다음 단계를 통해 각 파티션에서 쿼리를 실행합니다.
- Spanner 일괄 클라이언트 및 트랜잭션 만들기
- 파티션을 여러 작업자에게 분산할 수 있도록 쿼리의 파티션 생성
- 각 파티션의 쿼리 결과 검색
int numThreads = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
// Statistics
int totalPartitions;
AtomicInteger totalRecords = new AtomicInteger(0);
try {
BatchClient batchClient =
spanner.getBatchClient(DatabaseId.of(options.getProjectId(), instanceId, databaseId));
final BatchReadOnlyTransaction txn =
batchClient.batchReadOnlyTransaction(TimestampBound.strong());
// A Partition object is serializable and can be used from a different process.
List<Partition> partitions =
txn.partitionQuery(
PartitionOptions.getDefaultInstance(),
Statement.of("SELECT SingerId, FirstName, LastName FROM Singers"));
totalPartitions = partitions.size();
for (final Partition p : partitions) {
executor.execute(
() -> {
try (ResultSet results = txn.execute(p)) {
while (results.next()) {
long singerId = results.getLong(0);
String firstName = results.getString(1);
String lastName = results.getString(2);
System.out.println("[" + singerId + "] " + firstName + " " + lastName);
totalRecords.getAndIncrement();
}
}
});
}
} finally {
executor.shutdown();
executor.awaitTermination(1, TimeUnit.HOURS);
spanner.close();
}
double avgRecordsPerPartition = 0.0;
if (totalPartitions != 0) {
avgRecordsPerPartition = (double) totalRecords.get() / totalPartitions;
}
System.out.println("totalPartitions=" + totalPartitions);
System.out.println("totalRecords=" + totalRecords);
System.out.println("avgRecordsPerPartition=" + avgRecordsPerPartition);
Node.js
이 예시에서는 Singers
테이블에서 SQL 쿼리의 파티션을 가져오고 다음 단계를 통해 각 파티션에서 쿼리를 실행합니다.
- Spanner 클라이언트 및 배치 만들기
- 파티션을 여러 작업자에게 분산할 수 있도록 쿼리의 파티션 생성
- 각 파티션의 쿼리 결과 검색
// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');
/**
* TODO(developer): Uncomment the following lines before running the sample.
*/
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';
// Creates a client
const spanner = new Spanner({
projectId: projectId,
});
// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);
const [transaction] = await database.createBatchTransaction();
const query = 'SELECT * FROM Singers';
const [partitions] = await transaction.createQueryPartitions(query);
console.log(`Successfully created ${partitions.length} query partitions.`);
let row_count = 0;
const promises = [];
partitions.forEach(partition => {
promises.push(
transaction.execute(partition).then(results => {
const rows = results[0].map(row => row.toJSON());
row_count += rows.length;
})
);
});
Promise.all(promises)
.then(() => {
console.log(
`Successfully received ${row_count} from executed partitions.`
);
transaction.close();
})
.then(() => {
database.close();
});
PHP
이 예시에서는 Singers
테이블에서 SQL 쿼리의 파티션을 가져오고 다음 단계를 통해 각 파티션에서 쿼리를 실행합니다.
- Spanner 클라이언트 및 배치 만들기
- 파티션을 여러 작업자에게 분산할 수 있도록 쿼리의 파티션 생성
- 각 파티션의 쿼리 결과 검색
use Google\Cloud\Spanner\SpannerClient;
/**
* Queries sample data from the database using SQL.
* Example:
* ```
* batch_query_data($instanceId, $databaseId);
* ```
*
* @param string $instanceId The Spanner instance ID.
* @param string $databaseId The Spanner database ID.
*/
function batch_query_data(string $instanceId, string $databaseId): void
{
$spanner = new SpannerClient();
$batch = $spanner->batch($instanceId, $databaseId);
$snapshot = $batch->snapshot();
$queryString = 'SELECT SingerId, FirstName, LastName FROM Singers';
$partitions = $snapshot->partitionQuery($queryString);
$totalPartitions = count($partitions);
$totalRecords = 0;
foreach ($partitions as $partition) {
$result = $snapshot->executePartition($partition);
$rows = $result->rows();
foreach ($rows as $row) {
$singerId = $row['SingerId'];
$firstName = $row['FirstName'];
$lastName = $row['LastName'];
printf('SingerId: %s, FirstName: %s, LastName: %s' . PHP_EOL, $singerId, $firstName, $lastName);
$totalRecords++;
}
}
printf('Total Partitions: %d' . PHP_EOL, $totalPartitions);
printf('Total Records: %d' . PHP_EOL, $totalRecords);
$averageRecordsPerPartition = $totalRecords / $totalPartitions;
printf('Average Records Per Partition: %f' . PHP_EOL, $averageRecordsPerPartition);
}
Python
이 예시에서는 Singers
테이블에서 SQL 쿼리의 파티션을 가져오고 다음 단계를 통해 각 파티션에서 쿼리를 실행합니다.
- Spanner 클라이언트 및 일괄 트랜잭션 만들기
- 파티션을 여러 작업자에게 분산할 수 있도록 쿼리의 파티션 생성
- 각 파티션의 쿼리 결과 검색
def run_batch_query(instance_id, database_id):
"""Runs an example batch query."""
# Expected Table Format:
# CREATE TABLE Singers (
# SingerId INT64 NOT NULL,
# FirstName STRING(1024),
# LastName STRING(1024),
# SingerInfo BYTES(MAX),
# ) PRIMARY KEY (SingerId);
spanner_client = spanner.Client()
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)
# Create the batch transaction and generate partitions
snapshot = database.batch_snapshot()
partitions = snapshot.generate_read_batches(
table="Singers",
columns=("SingerId", "FirstName", "LastName"),
keyset=spanner.KeySet(all_=True),
)
# Create a pool of workers for the tasks
start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(process, snapshot, p) for p in partitions]
for future in concurrent.futures.as_completed(futures, timeout=3600):
finish, row_ct = future.result()
elapsed = finish - start
print("Completed {} rows in {} seconds".format(row_ct, elapsed))
# Clean up
snapshot.close()
def process(snapshot, partition):
"""Processes the requests of a query in an separate process."""
print("Started processing partition.")
row_ct = 0
for row in snapshot.process_read_batch(partition):
print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))
row_ct += 1
return time.time(), row_ct
Ruby
이 예시에서는 Singers
테이블에서 SQL 쿼리의 파티션을 가져오고 다음 단계를 통해 각 파티션에서 쿼리를 실행합니다.
- Spanner 일괄 클라이언트 만들기
- 파티션을 여러 작업자에게 분산할 수 있도록 쿼리의 파티션 만들기
- 각 파티션의 쿼리 결과 검색
# project_id = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"
require "google/cloud/spanner"
# Prepare a thread pool with number of processors
processor_count = Concurrent.processor_count
thread_pool = Concurrent::FixedThreadPool.new processor_count
# Prepare AtomicFixnum to count total records using multiple threads
total_records = Concurrent::AtomicFixnum.new
# Create a new Spanner batch client
spanner = Google::Cloud::Spanner.new project: project_id
batch_client = spanner.batch_client instance_id, database_id
# Get a strong timestamp bound batch_snapshot
batch_snapshot = batch_client.batch_snapshot strong: true
# Get partitions for specified query
partitions = batch_snapshot.partition_query "SELECT SingerId, FirstName, LastName FROM Singers"
total_partitions = partitions.size
# Enqueue a new thread pool job
partitions.each_with_index do |partition, _partition_index|
thread_pool.post do
# Increment total_records per new row
batch_snapshot.execute_partition(partition).rows.each do |_row|
total_records.increment
end
end
end
# Wait for queued jobs to complete
thread_pool.shutdown
thread_pool.wait_for_termination
# Close the client connection and release resources.
batch_snapshot.close
# Collect statistics for batch query
average_records_per_partition = 0.0
if total_partitions != 0
average_records_per_partition = total_records.value / total_partitions.to_f
end
puts "Total Partitions: #{total_partitions}"
puts "Total Records: #{total_records.value}"
puts "Average records per Partition: #{average_records_per_partition}"