트랜잭션 외부에서 읽기

이 페이지에서는 읽기 전용 및 읽기-쓰기 트랜잭션의 컨텍스트 외부에서 Spanner의 읽기를 수행하는 방법을 설명합니다. 다음 중 하나에 해당하면 트랜잭션 페이지를 대신 읽어야 합니다.

  • 하나 이상의 읽기 값에 따라 쓰기를 실행해야 하는 경우, 읽기-쓰기 트랜잭션의 일부로 읽기를 실행해야 합니다. 자세한 내용은 읽기-쓰기 트랜잭션을 참조하세요.

  • 일관된 데이터 보기가 필요한 여러 읽기 호출을 수행하는 경우, 읽기 전용 트랜잭션의 일부로 읽기를 실행해야 합니다. 자세한 내용은 읽기 전용 트랜잭션을 참조하세요.

읽기 유형

Spanner는 두 가지 읽기 유형을 제공하므로 데이터를 읽을 때 데이터의 현재 상태를 확인할 수 있습니다.

  • 강력 읽기는 현재 타임스탬프에서의 읽기이며 이 읽기를 시작할 때까지 커밋한 모든 데이터를 볼 수 있습니다. Spanner는 기본적으로 강력 읽기를 사용하여 읽기 요청을 처리합니다.
  • 비활성 읽기는 과거의 타임스탬프에서의 읽기입니다. 애플리케이션이 지연 시간에 민감하지만 비활성 데이터를 허용하는 경우, 비활성 읽기는 성능상의 이점이 있습니다.

원하는 읽기 유형을 선택하려면 읽기 요청에서 타임스탬프 경계를 설정합니다. 타임스탬프 경계를 선택할 때 다음 권장사항을 따르세요.

  • 가능하다면 강력 읽기를 선택합니다. 이 유형은 읽기 전용 트랜잭션을 포함하는 Spanner 읽기의 기본 타임스탬프 경계입니다. 강력 읽기에서는 읽기를 수신하는 복제본에 관계없이 작업을 시작하기 전에 커밋한 모든 트랜잭션의 영향을 관찰할 수 있습니다. 이러한 이유로 강력 읽기를 사용하면 애플리케이션 코드가 보다 간단해지고 애플리케이션의 신뢰성을 높일 수 있습니다. TrueTime 및 외부 일관성에서 Spanner의 일관성 속성을 자세히 알아보세요.

  • 상황에 따라 지연 시간 때문에 강력 읽기를 실행할 수 없다면 비활성 읽기(제한된 비활성 또는 완전 비활성)를 사용하여 최신 상태의 읽기가 필요하지 않은 경우에 성능을 향상시킵니다. 복제 페이지의 설명대로 우수한 성능을 위해 사용할 수 있는 적정한 비활성 값은 15초입니다.

데이터베이스 역할로 데이터 읽기

세분화된 액세스 제어 사용자는 SQL 문과 쿼리를 실행하고 데이터베이스에서 행 작업을 수행할 수 있는 데이터베이스 역할을 선택해야 합니다. 선택한 역할은 역할을 변경할 때까지 세션 동안 유지됩니다.

데이터베이스 역할로 읽기를 수행하는 방법은 세분화된 액세스 제어로 데이터베이스에 액세스를 참조하세요.

단일 읽기 메서드

Spanner에서는 다음의 경우 데이터베이스에서 단일 읽기 메서드(즉, 트랜잭션 컨텍스트 외부에서 읽기)를 지원합니다.

  • SQL 쿼리 문으로 또는 Spanner의 읽기 API를 사용하여 읽기 실행
  • 테이블의 단일 행 또는 여러 행에서 강력 읽기 수행
  • 테이블의 단일 행 또는 여러 행에서 비활성 읽기 수행
  • 보조 색인의 단일 행 또는 여러 행에서 읽기

멀티 리전 인스턴스 구성 또는 선택적인 읽기 전용 리전이 있는 커스텀 리전 구성 내에서 특정 복제본 또는 리전으로 단일 읽기를 라우팅하려면 방향성 읽기를 참조하세요.

다음 섹션에서는 Spanner 클라이언트 라이브러리를 통해 읽기 메서드를 사용하는 방법을 설명합니다.

쿼리 실행

다음은 데이터베이스에 SQL 쿼리 구문을 실행하는 방법을 보여줍니다.

GoogleSQL

C++

ExecuteQuery()를 사용하여 데이터베이스에 SQL 쿼리 문을 실행합니다.

void QueryData(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;

  spanner::SqlStatement select("SELECT SingerId, LastName FROM Singers");
  using RowType = std::tuple<std::int64_t, std::string>;
  auto rows = client.ExecuteQuery(std::move(select));
  for (auto& row : spanner::StreamOf<RowType>(rows)) {
    if (!row) throw std::move(row).status();
    std::cout << "SingerId: " << std::get<0>(*row) << "\t";
    std::cout << "LastName: " << std::get<1>(*row) << "\n";
  }

  std::cout << "Query completed for [spanner_query_data]\n";
}

C#

ExecuteReaderAsync()를 사용하여 데이터베이스를 쿼리합니다.


using Google.Cloud.Spanner.Data;
using System.Collections.Generic;
using System.Threading.Tasks;

public class QuerySampleDataAsyncSample
{
    public class Album
    {
        public int SingerId { get; set; }
        public int AlbumId { get; set; }
        public string AlbumTitle { get; set; }
    }

    public async Task<List<Album>> QuerySampleDataAsync(string projectId, string instanceId, string databaseId)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        var albums = new List<Album>();
        using var connection = new SpannerConnection(connectionString);
        using var cmd = connection.CreateSelectCommand("SELECT SingerId, AlbumId, AlbumTitle FROM Albums");

        using var reader = await cmd.ExecuteReaderAsync();
        while (await reader.ReadAsync())
        {
            albums.Add(new Album
            {
                AlbumId = reader.GetFieldValue<int>("AlbumId"),
                SingerId = reader.GetFieldValue<int>("SingerId"),
                AlbumTitle = reader.GetFieldValue<string>("AlbumTitle")
            });
        }
        return albums;
    }
}

Go

Client.Single().Query를 사용하여 데이터베이스를 쿼리합니다.


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"
)

func query(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	stmt := spanner.Statement{SQL: `SELECT SingerId, AlbumId, AlbumTitle FROM Albums`}
	iter := client.Single().Query(ctx, stmt)
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return err
		}
		var singerID, albumID int64
		var albumTitle string
		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
	}
}

Java

ReadContext.executeQuery를 사용하여 데이터베이스를 쿼리합니다.

static void query(DatabaseClient dbClient) {
  try (ResultSet resultSet =
      dbClient
          .singleUse() // Execute a single read or query against Cloud Spanner.
          .executeQuery(Statement.of("SELECT SingerId, AlbumId, AlbumTitle FROM Albums"))) {
    while (resultSet.next()) {
      System.out.printf(
          "%d %d %s\n", resultSet.getLong(0), resultSet.getLong(1), resultSet.getString(2));
    }
  }
}

Node.js

Database.run를 사용하여 데이터베이스를 쿼리합니다.

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

const query = {
  sql: 'SELECT SingerId, AlbumId, AlbumTitle FROM Albums',
};

// Queries rows from the Albums table
try {
  const [rows] = await database.run(query);

  rows.forEach(row => {
    const json = row.toJSON();
    console.log(
      `SingerId: ${json.SingerId}, AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`
    );
  });
} catch (err) {
  console.error('ERROR:', err);
} finally {
  // Close the database when finished.
  await database.close();
}

PHP

Database::execute를 사용하여 데이터베이스를 쿼리합니다.

use Google\Cloud\Spanner\SpannerClient;

/**
 * Queries sample data from the database using SQL.
 * Example:
 * ```
 * query_data($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function query_data(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $results = $database->execute(
        'SELECT SingerId, AlbumId, AlbumTitle FROM Albums'
    );

    foreach ($results as $row) {
        printf('SingerId: %s, AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['SingerId'], $row['AlbumId'], $row['AlbumTitle']);
    }
}

Python

Database.execute_sql를 사용하여 데이터베이스를 쿼리합니다.

def query_data(instance_id, database_id):
    """Queries sample data from the database using SQL."""
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.snapshot() as snapshot:
        results = snapshot.execute_sql(
            "SELECT SingerId, AlbumId, AlbumTitle FROM Albums"
        )

        for row in results:
            print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))

Ruby

Client#execute를 사용하여 데이터베이스를 쿼리합니다.

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

client.execute("SELECT SingerId, AlbumId, AlbumTitle FROM Albums").rows.each do |row|
  puts "#{row[:SingerId]} #{row[:AlbumId]} #{row[:AlbumTitle]}"
end

SQL 문을 작성할 때 SQL 쿼리 문법함수 및 연산자 참조 자료를 찾아보세요.

강력 읽기 수행

다음은 데이터베이스에서 강력 읽기로 0개 이상의 행을 읽는 방법을 보여줍니다.

GoogleSQL

C++

데이터를 읽는 코드는 SQL 쿼리를 실행하여 Spanner를 쿼리하는 앞선 샘플과 같습니다.

void QueryData(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;

  spanner::SqlStatement select("SELECT SingerId, LastName FROM Singers");
  using RowType = std::tuple<std::int64_t, std::string>;
  auto rows = client.ExecuteQuery(std::move(select));
  for (auto& row : spanner::StreamOf<RowType>(rows)) {
    if (!row) throw std::move(row).status();
    std::cout << "SingerId: " << std::get<0>(*row) << "\t";
    std::cout << "LastName: " << std::get<1>(*row) << "\n";
  }

  std::cout << "Query completed for [spanner_query_data]\n";
}

C#

데이터를 읽는 코드는 SQL 쿼리를 실행하여 Spanner를 쿼리하는 앞선 샘플과 같습니다.


using Google.Cloud.Spanner.Data;
using System.Collections.Generic;
using System.Threading.Tasks;

public class QuerySampleDataAsyncSample
{
    public class Album
    {
        public int SingerId { get; set; }
        public int AlbumId { get; set; }
        public string AlbumTitle { get; set; }
    }

    public async Task<List<Album>> QuerySampleDataAsync(string projectId, string instanceId, string databaseId)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        var albums = new List<Album>();
        using var connection = new SpannerConnection(connectionString);
        using var cmd = connection.CreateSelectCommand("SELECT SingerId, AlbumId, AlbumTitle FROM Albums");

        using var reader = await cmd.ExecuteReaderAsync();
        while (await reader.ReadAsync())
        {
            albums.Add(new Album
            {
                AlbumId = reader.GetFieldValue<int>("AlbumId"),
                SingerId = reader.GetFieldValue<int>("SingerId"),
                AlbumTitle = reader.GetFieldValue<string>("AlbumTitle")
            });
        }
        return albums;
    }
}

Go

Client.Single().Read를 사용하여 데이터베이스의 행을 읽습니다.


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"
)

func read(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	iter := client.Single().Read(ctx, "Albums", spanner.AllKeys(),
		[]string{"SingerId", "AlbumId", "AlbumTitle"})
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return err
		}
		var singerID, albumID int64
		var albumTitle string
		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
	}
}

이 예시에서는 AllKeys를 사용하여 읽을 키 모음 또는 키 범위를 정의합니다.

Java

ReadContext.read를 사용하여 데이터베이스의 행을 읽습니다.

static void read(DatabaseClient dbClient) {
  try (ResultSet resultSet =
      dbClient
          .singleUse()
          .read(
              "Albums",
              KeySet.all(), // Read all rows in a table.
              Arrays.asList("SingerId", "AlbumId", "AlbumTitle"))) {
    while (resultSet.next()) {
      System.out.printf(
          "%d %d %s\n", resultSet.getLong(0), resultSet.getLong(1), resultSet.getString(2));
    }
  }
}

이 예시에서는 KeySet를 사용하여 읽을 키 모음 또는 키 범위를 정의합니다.

Node.js

Table.read를 사용하여 데이터베이스의 행을 읽습니다.

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

// Reads rows from the Albums table
const albumsTable = database.table('Albums');

const query = {
  columns: ['SingerId', 'AlbumId', 'AlbumTitle'],
  keySet: {
    all: true,
  },
};

try {
  const [rows] = await albumsTable.read(query);

  rows.forEach(row => {
    const json = row.toJSON();
    console.log(
      `SingerId: ${json.SingerId}, AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`
    );
  });
} catch (err) {
  console.error('ERROR:', err);
} finally {
  // Close the database when finished.
  await database.close();
}

이 예시에서는 keySet를 사용하여 읽을 키 모음 또는 키 범위를 정의합니다.

PHP

Database::read를 사용하여 데이터베이스의 행을 읽습니다.

use Google\Cloud\Spanner\SpannerClient;

/**
 * Reads sample data from the database.
 * Example:
 * ```
 * read_data($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function read_data(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $keySet = $spanner->keySet(['all' => true]);
    $results = $database->read(
        'Albums',
        $keySet,
        ['SingerId', 'AlbumId', 'AlbumTitle']
    );

    foreach ($results->rows() as $row) {
        printf('SingerId: %s, AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['SingerId'], $row['AlbumId'], $row['AlbumTitle']);
    }
}

이 예시에서는 keySet를 사용하여 읽을 키 모음 또는 키 범위를 정의합니다.

Python

Database.read를 사용하여 데이터베이스의 행을 읽습니다.

def read_data(instance_id, database_id):
    """Reads sample data from the database."""
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.snapshot() as snapshot:
        keyset = spanner.KeySet(all_=True)
        results = snapshot.read(
            table="Albums", columns=("SingerId", "AlbumId", "AlbumTitle"), keyset=keyset
        )

        for row in results:
            print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))

이 예시에서는 KeySet를 사용하여 읽을 키 모음 또는 키 범위를 정의합니다.

Ruby

Client#read를 사용하여 데이터베이스의 행을 읽습니다.

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

client.read("Albums", [:SingerId, :AlbumId, :AlbumTitle]).rows.each do |row|
  puts "#{row[:SingerId]} #{row[:AlbumId]} #{row[:AlbumTitle]}"
end

비활성 읽기 수행

다음 샘플 코드는 완전 비활성 타임스탬프 경계를 사용하여 데이터베이스에서 비활성 읽기로 0개 이상의 행을 읽는 방법을 보여줍니다. 제한된 비활성 타임스탬프 경계를 사용하여 비활성 읽기를 수행하는 방법에 대한 자세한 내용은 샘플 코드 뒤에 나오는 참고 사항을 참조하세요. 사용할 수 있는 타임스탬프 경계의 여러 유형에 대한 자세한 내용은 타임스탬프 경계를 참조하세요.

GoogleSQL

C++

MakeReadOnlyTransaction()Transaction::ReadOnlyOptions()과 함께 ExecuteQuery()를 사용하여 비활성 읽기를 수행합니다.

void ReadStaleData(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;
  // The timestamp chosen using the `exact_staleness` parameter is bounded
  // below by the creation time of the database, so the visible state may only
  // include that generated by the `extra_statements` executed atomically with
  // the creation of the database. Here we at least know `Albums` exists.
  auto opts = spanner::Transaction::ReadOnlyOptions(std::chrono::seconds(15));
  auto read_only = spanner::MakeReadOnlyTransaction(std::move(opts));

  spanner::SqlStatement select(
      "SELECT SingerId, AlbumId, AlbumTitle FROM Albums");
  using RowType = std::tuple<std::int64_t, std::int64_t, std::string>;

  auto rows = client.ExecuteQuery(std::move(read_only), std::move(select));
  for (auto& row : spanner::StreamOf<RowType>(rows)) {
    if (!row) throw std::move(row).status();
    std::cout << "SingerId: " << std::get<0>(*row)
              << " AlbumId: " << std::get<1>(*row)
              << " AlbumTitle: " << std::get<2>(*row) << "\n";
  }
}

C#

지정된 TimestampBound.OfExactStaleness() 값이 있는 connection에서 BeginReadOnlyTransactionAsync 메서드를 사용하여 데이터베이스를 쿼리합니다.


using Google.Cloud.Spanner.Data;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class ReadStaleDataAsyncSample
{
    public class Album
    {
        public int SingerId { get; set; }
        public int AlbumId { get; set; }
        public long? MarketingBudget { get; set; }
    }

    public async Task<List<Album>> ReadStaleDataAsync(string projectId, string instanceId, string databaseId)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";

        using var connection = new SpannerConnection(connectionString);
        await connection.OpenAsync();

        var staleness = TimestampBound.OfExactStaleness(TimeSpan.FromSeconds(15));
        using var transaction = await connection.BeginTransactionAsync(SpannerTransactionCreationOptions.ForTimestampBoundReadOnly(staleness), cancellationToken: default);
        using var cmd = connection.CreateSelectCommand("SELECT SingerId, AlbumId, MarketingBudget FROM Albums");
        cmd.Transaction = transaction;

        var albums = new List<Album>();
        using var reader = await cmd.ExecuteReaderAsync();
        while (await reader.ReadAsync())
        {
            albums.Add(new Album
            {
                SingerId = reader.GetFieldValue<int>("SingerId"),
                AlbumId = reader.GetFieldValue<int>("AlbumId"),
                MarketingBudget = reader.IsDBNull(reader.GetOrdinal("MarketingBudget")) ? 0 : reader.GetFieldValue<long>("MarketingBudget")
            });
        }
        return albums;
    }
}

Go

Client.ReadOnlyTransaction().WithTimestampBound()를 사용하고 ExactStaleness 값을 지정하여 완전 비활성 타임스탬프 경계를 통해 데이터베이스에서 행 읽기를 수행합니다.


import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"
)

func readStaleData(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	ro := client.ReadOnlyTransaction().WithTimestampBound(spanner.ExactStaleness(15 * time.Second))
	defer ro.Close()

	iter := ro.Read(ctx, "Albums", spanner.AllKeys(), []string{"SingerId", "AlbumId", "AlbumTitle"})
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return err
		}
		var singerID int64
		var albumID int64
		var albumTitle string
		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
	}
}

이 예시에서는 AllKeys를 사용하여 읽을 키 모음 또는 키 범위를 정의합니다.

Java

완전 비활성 타임스탬프 경계를 사용하여 데이터베이스에서 행 읽기를 수행하려면 지정된 TimestampBound.ofExactStaleness()를 포함하는 ReadContextread 메서드를 사용합니다.

static void readStaleData(DatabaseClient dbClient) {
  try (ResultSet resultSet =
      dbClient
          .singleUse(TimestampBound.ofExactStaleness(15, TimeUnit.SECONDS))
          .read(
              "Albums", KeySet.all(), Arrays.asList("SingerId", "AlbumId", "MarketingBudget"))) {
    while (resultSet.next()) {
      System.out.printf(
          "%d %d %s\n",
          resultSet.getLong(0),
          resultSet.getLong(1),
          resultSet.isNull(2) ? "NULL" : resultSet.getLong("MarketingBudget"));
    }
  }
}

이 예시에서는 KeySet를 사용하여 읽을 키 모음 또는 키 범위를 정의합니다.

Node.js

완전 비활성 타임스탬프 경계를 사용하여 데이터베이스에서 행 읽기를 수행하려면 exactStaleness 옵션을 포함하는 Table.read를 사용합니다.

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

// Reads rows from the Albums table
const albumsTable = database.table('Albums');

const query = {
  columns: ['SingerId', 'AlbumId', 'AlbumTitle', 'MarketingBudget'],
  keySet: {
    all: true,
  },
};

const options = {
  // Guarantees that all writes committed more than 15000 milliseconds ago are visible
  exactStaleness: 15000,
};

try {
  const [rows] = await albumsTable.read(query, options);

  rows.forEach(row => {
    const json = row.toJSON();
    const id = json.SingerId;
    const album = json.AlbumId;
    const title = json.AlbumTitle;
    const budget = json.MarketingBudget ? json.MarketingBudget : '';
    console.log(
      `SingerId: ${id}, AlbumId: ${album}, AlbumTitle: ${title}, MarketingBudget: ${budget}`
    );
  });
} catch (err) {
  console.error('ERROR:', err);
} finally {
  // Close the database when finished.
  await database.close();
}

이 예시에서는 keySet를 사용하여 읽을 키 모음 또는 키 범위를 정의합니다.

PHP

완전 비활성 타임스탬프 경계를 사용하여 데이터베이스에서 행 읽기를 수행하려면 지정된 exactStaleness 값이 있는 Database::read를 사용합니다.

use Google\Cloud\Spanner\Duration;
use Google\Cloud\Spanner\SpannerClient;

/**
 * Reads sample data from the database.  The data is exactly 15 seconds stale.
 * Guarantees that all writes committed more than 15 seconds ago are visible.
 * Example:
 * ```
 * read_stale_data
 *($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function read_stale_data(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);
    $keySet = $spanner->keySet(['all' => true]);
    $results = $database->read(
        'Albums',
        $keySet,
        ['SingerId', 'AlbumId', 'AlbumTitle'],
        ['exactStaleness' => new Duration(15)]
    );

    foreach ($results->rows() as $row) {
        printf('SingerId: %s, AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['SingerId'], $row['AlbumId'], $row['AlbumTitle']);
    }
}

이 예시에서는 keySet를 사용하여 읽을 키 모음 또는 키 범위를 정의합니다.

Python

완전 비활성 타임스탬프 경계를 사용하여 데이터베이스에서 행 읽기를 수행하려면 지정된 exact_staleness 값을 포함하는 Database snapshotread 메서드를 사용합니다.

def read_stale_data(instance_id, database_id):
    """Reads sample data from the database. The data is exactly 15 seconds
    stale."""
    import datetime

    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)
    staleness = datetime.timedelta(seconds=15)

    with database.snapshot(exact_staleness=staleness) as snapshot:
        keyset = spanner.KeySet(all_=True)
        results = snapshot.read(
            table="Albums",
            columns=("SingerId", "AlbumId", "MarketingBudget"),
            keyset=keyset,
        )

        for row in results:
            print("SingerId: {}, AlbumId: {}, MarketingBudget: {}".format(*row))

이 예시에서는 KeySet를 사용하여 읽을 키 모음 또는 키 범위를 정의합니다.

Ruby

완전 비활성 타임스탬프 경계를 사용하여 데이터베이스에서 행 읽기를 수행하려면 지정된 staleness 값(초 단위)이 있는 스냅샷 Clientread 메서드를 사용합니다.

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"
require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

# Perform a read with a data staleness of 15 seconds
client.snapshot staleness: 15 do |snapshot|
  snapshot.read("Albums", [:SingerId, :AlbumId, :AlbumTitle]).rows.each do |row|
    puts "#{row[:SingerId]} #{row[:AlbumId]} #{row[:AlbumTitle]}"
  end
end

색인을 사용하여 읽기 수행

다음은 색인을 사용하여 데이터베이스에서 0개 이상의 행을 읽는 방법을 보여줍니다.

GoogleSQL

C++

Read() 함수를 사용하여 색인을 사용한 읽기를 수행합니다.

void ReadDataWithIndex(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;

  auto rows =
      client.Read("Albums", google::cloud::spanner::KeySet::All(),
                  {"AlbumId", "AlbumTitle"},
                  google::cloud::Options{}.set<spanner::ReadIndexNameOption>(
                      "AlbumsByAlbumTitle"));
  using RowType = std::tuple<std::int64_t, std::string>;
  for (auto& row : spanner::StreamOf<RowType>(rows)) {
    if (!row) throw std::move(row).status();
    std::cout << "AlbumId: " << std::get<0>(*row) << "\t";
    std::cout << "AlbumTitle: " << std::get<1>(*row) << "\n";
  }
  std::cout << "Read completed for [spanner_read_data_with_index]\n";
}

C#

색인을 사용하여 데이터를 읽으려면 명시적으로 색인을 지정하는 쿼리를 실행합니다.


using Google.Cloud.Spanner.Data;
using System.Collections.Generic;
using System.Threading.Tasks;

public class QueryDataWithIndexAsyncSample
{
    public class Album
    {
        public int AlbumId { get; set; }
        public string AlbumTitle { get; set; }
        public long MarketingBudget { get; set; }
    }

    public async Task<List<Album>> QueryDataWithIndexAsync(string projectId, string instanceId, string databaseId,
        string startTitle, string endTitle)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";
        using var connection = new SpannerConnection(connectionString);
        using var cmd = connection.CreateSelectCommand(
            "SELECT AlbumId, AlbumTitle, MarketingBudget FROM Albums@ "
            + "{FORCE_INDEX=AlbumsByAlbumTitle} "
            + $"WHERE AlbumTitle >= @startTitle "
            + $"AND AlbumTitle < @endTitle",
            new SpannerParameterCollection
            {
                { "startTitle", SpannerDbType.String, startTitle },
                { "endTitle", SpannerDbType.String, endTitle }
            });

        var albums = new List<Album>();
        using var reader = await cmd.ExecuteReaderAsync();
        while (await reader.ReadAsync())
        {
            albums.Add(new Album
            {
                AlbumId = reader.GetFieldValue<int>("AlbumId"),
                AlbumTitle = reader.GetFieldValue<string>("AlbumTitle"),
                MarketingBudget = reader.IsDBNull(reader.GetOrdinal("MarketingBudget")) ? 0 : reader.GetFieldValue<long>("MarketingBudget")
            });
        }
        return albums;
    }
}

Go

색인을 사용하여 데이터베이스에서 행을 읽으려면 Client.Single().ReadUsingIndex를 사용합니다.


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"
)

func readUsingIndex(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	iter := client.Single().ReadUsingIndex(ctx, "Albums", "AlbumsByAlbumTitle", spanner.AllKeys(),
		[]string{"AlbumId", "AlbumTitle"})
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return err
		}
		var albumID int64
		var albumTitle string
		if err := row.Columns(&albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %s\n", albumID, albumTitle)
	}
}

Java

색인을 사용하여 데이터베이스에서 행을 읽으려면 ReadContext.readUsingIndex를 사용합니다.

static void readUsingIndex(DatabaseClient dbClient) {
  try (ResultSet resultSet =
      dbClient
          .singleUse()
          .readUsingIndex(
              "Albums",
              "AlbumsByAlbumTitle",
              KeySet.all(),
              Arrays.asList("AlbumId", "AlbumTitle"))) {
    while (resultSet.next()) {
      System.out.printf("%d %s\n", resultSet.getLong(0), resultSet.getString(1));
    }
  }
}

Node.js

색인을 사용하여 데이터베이스에서 행을 읽으려면 Table.read를 사용하여 쿼리에서 색인을 지정합니다.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const instanceId = 'my-instance';
// const databaseId = 'my-database';
// const projectId = 'my-project-id';

// Imports the Google Cloud Spanner client library
const {Spanner} = require('@google-cloud/spanner');

// Instantiates a client
const spanner = new Spanner({
  projectId: projectId,
});

async function readDataWithIndex() {
  // Gets a reference to a Cloud Spanner instance and database
  const instance = spanner.instance(instanceId);
  const database = instance.database(databaseId);

  const albumsTable = database.table('Albums');

  const query = {
    columns: ['AlbumId', 'AlbumTitle'],
    keySet: {
      all: true,
    },
    index: 'AlbumsByAlbumTitle',
  };

  // Reads the Albums table using an index
  try {
    const [rows] = await albumsTable.read(query);

    rows.forEach(row => {
      const json = row.toJSON();
      console.log(`AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`);
    });
  } catch (err) {
    console.error('ERROR:', err);
  } finally {
    // Close the database when finished.
    database.close();
  }
}
readDataWithIndex();

PHP

색인을 사용하여 데이터베이스에서 행을 읽으려면 Database::read를 사용하여 색인을 지정합니다.

use Google\Cloud\Spanner\SpannerClient;

/**
 * Reads sample data from the database using an index.
 *
 * The index must exist before running this sample. You can add the index
 * by running the `add_index` sample or by running this DDL statement against
 * your database:
 *
 *     CREATE INDEX AlbumsByAlbumTitle ON Albums(AlbumTitle)
 *
 * Example:
 * ```
 * read_data_with_index($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function read_data_with_index(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $keySet = $spanner->keySet(['all' => true]);
    $results = $database->read(
        'Albums',
        $keySet,
        ['AlbumId', 'AlbumTitle'],
        ['index' => 'AlbumsByAlbumTitle']
    );

    foreach ($results->rows() as $row) {
        printf('AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['AlbumId'], $row['AlbumTitle']);
    }
}

Python

색인을 사용하여 데이터베이스에서 행을 읽으려면 Database.read를 사용하여 색인을 지정합니다.

def read_data_with_index(instance_id, database_id):
    """Reads sample data from the database using an index.

    The index must exist before running this sample. You can add the index
    by running the `add_index` sample or by running this DDL statement against
    your database:

        CREATE INDEX AlbumsByAlbumTitle ON Albums(AlbumTitle)

    """
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.snapshot() as snapshot:
        keyset = spanner.KeySet(all_=True)
        results = snapshot.read(
            table="Albums",
            columns=("AlbumId", "AlbumTitle"),
            keyset=keyset,
            index="AlbumsByAlbumTitle",
        )

        for row in results:
            print("AlbumId: {}, AlbumTitle: {}".format(*row))

Ruby

색인을 사용하여 데이터베이스에서 행을 읽으려면 Client#read를 사용하여 색인을 지정합니다.

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

result = client.read "Albums", [:AlbumId, :AlbumTitle],
                     index: "AlbumsByAlbumTitle"

result.rows.each do |row|
  puts "#{row[:AlbumId]} #{row[:AlbumTitle]}"
end

동시에 데이터 읽기

Spanner에서 대규모 데이터가 포함된 대량 읽기 작업이나 쿼리 작업을 수행할 때 PartitionQuery API를 사용하면 결과를 빨리 얻을 수 있습니다. API는 파티션을 병렬로 가져오기 위해 머신 여러 개를 사용하여 쿼리를 파티션이라고 하는 배치로 분할합니다. PartitionQuery API는 전체 데이터베이스 내보내기 또는 스캔과 같은 대량 작업 전용으로 사용되기 때문에 이 API를 사용하면 지연 시간이 길어집니다.

Spanner 클라이언트 라이브러리를 사용하여 모든 읽기 API 작업을 동시에 수행할 수 있습니다. 하지만 쿼리에서 루트 분할이 가능한 경우에만 SQL 쿼리의 파티션을 나눌 수 있습니다. 쿼리를 루트로 분할할 수 있으려면 쿼리 계획이 다음 조건 중 하나를 충족해야 합니다.

  • 쿼리 실행 계획의 첫 번째 연산자는 분산 통합이며 쿼리 실행 계획에는 분산 통합(로컬 배포 통합 제외) 하나만 포함됩니다. 쿼리 계획에는 분산 교차 적용과 같은 다른 분산 연산자가 포함될 수 없습니다.

  • 쿼리 계획에는 분산 연산자가 없습니다.

PartitionQuery API는 쿼리를 일괄 모드에서 실행합니다. Spanner는 일괄 모드에서 실행할 때 쿼리를 루트로 분할할 수 있게 해주는 쿼리 실행 계획을 선택할 수 있습니다. 따라서 PartitionQuery API 및 Spanner 스튜디오는 같은 쿼리에 서로 다른 쿼리 실행 계획을 사용할 수 있습니다. Spanner 스튜디오의 PartitionQuery API에서 사용하는 쿼리 실행 계획을 가져오지 못할 수 있습니다.

이와 같이 파티션을 나눈 쿼리의 경우 Spanner Data Boost를 사용 설정할 수 있습니다. Data Boost를 사용하면 프로비저닝된 Spanner 인스턴스의 기존 워크로드에 거의 영향을 주지 않고 대규모 분석 쿼리를 실행할 수 있습니다. 이 페이지의 C++, Go, Java, Node.js, Python 코드 예시에서는 Data Boost를 사용 설정하는 방법을 보여줍니다.

Data Boost에 대한 자세한 내용은 Data Boost 개요를 참고하세요.

GoogleSQL

C++

이 예시에서는 Singers 테이블에서 SQL 쿼리의 파티션을 가져오고 다음 단계를 통해 각 파티션에서 쿼리를 실행합니다.

  • Spanner 일괄 트랜잭션 만들기
  • 파티션을 여러 작업자에게 분산할 수 있도록 쿼리의 파티션 생성
  • 각 파티션의 쿼리 결과 검색
void UsePartitionQuery(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;
  auto txn = spanner::MakeReadOnlyTransaction();

  spanner::SqlStatement select(
      "SELECT SingerId, FirstName, LastName FROM Singers");
  using RowType = std::tuple<std::int64_t, std::string, std::string>;

  auto partitions = client.PartitionQuery(
      std::move(txn), std::move(select),
      google::cloud::Options{}.set<spanner::PartitionDataBoostOption>(true));
  if (!partitions) throw std::move(partitions).status();

  // You would probably choose to execute these partitioned queries in
  // separate threads/processes, or on a different machine.
  int number_of_rows = 0;
  for (auto const& partition : *partitions) {
    auto rows = client.ExecuteQuery(partition);
    for (auto& row : spanner::StreamOf<RowType>(rows)) {
      if (!row) throw std::move(row).status();
      number_of_rows++;
    }
  }
  std::cout << "Number of partitions: " << partitions->size() << "\n"
            << "Number of rows: " << number_of_rows << "\n";
  std::cout << "Read completed for [spanner_batch_client]\n";
}

C#

이 예시에서는 Singers 테이블에서 SQL 쿼리의 파티션을 가져오고 다음 단계를 통해 각 파티션에서 쿼리를 실행합니다.

  • Spanner 일괄 트랜잭션 만들기
  • 파티션을 여러 작업자에게 분산할 수 있도록 쿼리의 파티션 생성
  • 각 파티션의 쿼리 결과 검색

using Google.Cloud.Spanner.Data;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class BatchReadRecordsAsyncSample
{
    private int _rowsRead;
    private int _partitionCount;
    public async Task<(int RowsRead, int Partitions)> BatchReadRecordsAsync(string projectId, string instanceId, string databaseId)
    {
        string connectionString = $"Data Source=projects/{projectId}/instances/{instanceId}/databases/{databaseId}";
        using var connection = new SpannerConnection(connectionString);
        await connection.OpenAsync();

        using var transaction = await connection.BeginTransactionAsync(SpannerTransactionCreationOptions.ReadOnly.WithIsDetached(true), cancellationToken: default);
        transaction.DisposeBehavior = DisposeBehavior.CloseResources;
        using var cmd = connection.CreateSelectCommand("SELECT SingerId, FirstName, LastName FROM Singers");
        cmd.Transaction = transaction;

        // A CommandPartition object is serializable and can be used from a different process.
        // If data boost is enabled, partitioned read and query requests will be executed
        // using Spanner independent compute resources.
        var partitions = await cmd.GetReaderPartitionsAsync(PartitionOptions.Default.WithDataBoostEnabled(true));

        var transactionId = transaction.TransactionId;
        await Task.WhenAll(partitions.Select(x => DistributedReadWorkerAsync(x, transactionId)));
        Console.WriteLine($"Done reading!  Total rows read: {_rowsRead:N0} with {_partitionCount} partition(s)");
        return (RowsRead: _rowsRead, Partitions: _partitionCount);
    }

    private async Task DistributedReadWorkerAsync(CommandPartition readPartition, TransactionId id)
    {
        var localId = Interlocked.Increment(ref _partitionCount);
        using var connection = new SpannerConnection(id.ConnectionString);
        using var transaction = await connection.BeginTransactionAsync(SpannerTransactionCreationOptions.FromReadOnlyTransactionId(id), cancellationToken: default);
        using var cmd = connection.CreateCommandWithPartition(readPartition, transaction);
        using var reader = await cmd.ExecuteReaderAsync();
        while (await reader.ReadAsync())
        {
            Interlocked.Increment(ref _rowsRead);
            Console.WriteLine($"Partition ({localId}) "
                + $"{reader.GetFieldValue<int>("SingerId")}"
                + $" {reader.GetFieldValue<string>("FirstName")}"
                + $" {reader.GetFieldValue<string>("LastName")}");
        }
        Console.WriteLine($"Done with single reader {localId}.");
    }
}

Go

이 예시에서는 Singers 테이블에서 SQL 쿼리의 파티션을 가져오고 다음 단계를 통해 각 파티션에서 쿼리를 실행합니다.

  • Spanner 클라이언트 및 트랜잭션 만들기
  • 파티션을 여러 작업자에게 분산할 수 있도록 쿼리의 파티션 생성
  • 각 파티션의 쿼리 결과 검색

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"
)

func readBatchData(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	txn, err := client.BatchReadOnlyTransaction(ctx, spanner.StrongRead())
	if err != nil {
		return err
	}
	defer txn.Close()

	// Singer represents a row in the Singers table.
	type Singer struct {
		SingerID   int64
		FirstName  string
		LastName   string
		SingerInfo []byte
	}
	stmt := spanner.Statement{SQL: "SELECT SingerId, FirstName, LastName FROM Singers;"}
	// A Partition object is serializable and can be used from a different process.
	// DataBoost option is an optional parameter which can also be used for partition read
	// and query to execute the request via spanner independent compute resources.
	partitions, err := txn.PartitionQueryWithOptions(ctx, stmt, spanner.PartitionOptions{}, spanner.QueryOptions{DataBoostEnabled: true})
	if err != nil {
		return err
	}
	recordCount := 0
	for i, p := range partitions {
		iter := txn.Execute(ctx, p)
		defer iter.Stop()
		for {
			row, err := iter.Next()
			if err == iterator.Done {
				break
			} else if err != nil {
				return err
			}
			var s Singer
			if err := row.ToStruct(&s); err != nil {
				return err
			}
			fmt.Fprintf(w, "Partition (%d) %v\n", i, s)
			recordCount++
		}
	}
	fmt.Fprintf(w, "Total partition count: %v\n", len(partitions))
	fmt.Fprintf(w, "Total record count: %v\n", recordCount)
	return nil
}

Java

이 예시에서는 Singers 테이블에서 SQL 쿼리의 파티션을 가져오고 다음 단계를 통해 각 파티션에서 쿼리를 실행합니다.

  • Spanner 일괄 클라이언트 및 트랜잭션 만들기
  • 파티션을 여러 작업자에게 분산할 수 있도록 쿼리의 파티션 생성
  • 각 파티션의 쿼리 결과 검색
int numThreads = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(numThreads);

// Statistics
int totalPartitions;
AtomicInteger totalRecords = new AtomicInteger(0);

try {
  BatchClient batchClient =
      spanner.getBatchClient(DatabaseId.of(options.getProjectId(), instanceId, databaseId));

  final BatchReadOnlyTransaction txn =
      batchClient.batchReadOnlyTransaction(TimestampBound.strong());

  // A Partition object is serializable and can be used from a different process.
  // DataBoost option is an optional parameter which can be used for partition read
  // and query to execute the request via spanner independent compute resources.

  List<Partition> partitions =
      txn.partitionQuery(
          PartitionOptions.getDefaultInstance(),
          Statement.of("SELECT SingerId, FirstName, LastName FROM Singers"),
          // Option to enable data boost for a given request
          Options.dataBoostEnabled(true));

  totalPartitions = partitions.size();

  for (final Partition p : partitions) {
    executor.execute(
        () -> {
          try (ResultSet results = txn.execute(p)) {
            while (results.next()) {
              long singerId = results.getLong(0);
              String firstName = results.getString(1);
              String lastName = results.getString(2);
              System.out.println("[" + singerId + "] " + firstName + " " + lastName);
              totalRecords.getAndIncrement();
            }
          }
        });
  }
} finally {
  executor.shutdown();
  executor.awaitTermination(1, TimeUnit.HOURS);
  spanner.close();
}

double avgRecordsPerPartition = 0.0;
if (totalPartitions != 0) {
  avgRecordsPerPartition = (double) totalRecords.get() / totalPartitions;
}
System.out.println("totalPartitions=" + totalPartitions);
System.out.println("totalRecords=" + totalRecords);
System.out.println("avgRecordsPerPartition=" + avgRecordsPerPartition);

Node.js

이 예시에서는 Singers 테이블에서 SQL 쿼리의 파티션을 가져오고 다음 단계를 통해 각 파티션에서 쿼리를 실행합니다.

  • Spanner 클라이언트 및 배치 만들기
  • 파티션을 여러 작업자에게 분산할 수 있도록 쿼리의 파티션 생성
  • 각 파티션의 쿼리 결과 검색
// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);
const [transaction] = await database.createBatchTransaction();

const query = {
  sql: 'SELECT * FROM Singers',
  // DataBoost option is an optional parameter which can also be used for partition read
  // and query to execute the request via spanner independent compute resources.
  dataBoostEnabled: true,
};

// A Partition object is serializable and can be used from a different process.
const [partitions] = await transaction.createQueryPartitions(query);
console.log(`Successfully created ${partitions.length} query partitions.`);

let row_count = 0;
const promises = [];
partitions.forEach(partition => {
  promises.push(
    transaction.execute(partition).then(results => {
      const rows = results[0].map(row => row.toJSON());
      row_count += rows.length;
    })
  );
});
Promise.all(promises)
  .then(() => {
    console.log(
      `Successfully received ${row_count} from executed partitions.`
    );
    transaction.close();
  })
  .then(() => {
    database.close();
  });

PHP

이 예시에서는 Singers 테이블에서 SQL 쿼리의 파티션을 가져오고 다음 단계를 통해 각 파티션에서 쿼리를 실행합니다.

  • Spanner 클라이언트 및 배치 만들기
  • 파티션을 여러 작업자에게 분산할 수 있도록 쿼리의 파티션 생성
  • 각 파티션의 쿼리 결과 검색
use Google\Cloud\Spanner\SpannerClient;

/**
 * Queries sample data from the database using SQL.
 * Example:
 * ```
 * batch_query_data($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function batch_query_data(string $instanceId, string $databaseId): void
{
    $spanner = new SpannerClient();
    $batch = $spanner->batch($instanceId, $databaseId);
    $snapshot = $batch->snapshot();
    $queryString = 'SELECT SingerId, FirstName, LastName FROM Singers';
    $partitions = $snapshot->partitionQuery($queryString, [
        // This is an optional parameter which can be used for partition
        // read and query to execute the request via spanner independent
        // compute resources.
        'dataBoostEnabled' => true
    ]);
    $totalPartitions = count($partitions);
    $totalRecords = 0;
    foreach ($partitions as $partition) {
        $result = $snapshot->executePartition($partition);
        $rows = $result->rows();
        foreach ($rows as $row) {
            $singerId = $row['SingerId'];
            $firstName = $row['FirstName'];
            $lastName = $row['LastName'];
            printf('SingerId: %s, FirstName: %s, LastName: %s' . PHP_EOL, $singerId, $firstName, $lastName);
            $totalRecords++;
        }
    }
    printf('Total Partitions: %d' . PHP_EOL, $totalPartitions);
    printf('Total Records: %d' . PHP_EOL, $totalRecords);
    $averageRecordsPerPartition = $totalRecords / $totalPartitions;
    printf('Average Records Per Partition: %f' . PHP_EOL, $averageRecordsPerPartition);
}

Python

이 예시에서는 Singers 테이블에서 SQL 쿼리의 파티션을 가져오고 다음 단계를 통해 각 파티션에서 쿼리를 실행합니다.

  • Spanner 클라이언트 및 일괄 트랜잭션 만들기
  • 파티션을 여러 작업자에게 분산할 수 있도록 쿼리의 파티션 생성
  • 각 파티션의 쿼리 결과 검색

def run_batch_query(instance_id, database_id):
    """Runs an example batch query."""

    # Expected Table Format:
    # CREATE TABLE Singers (
    #   SingerId   INT64 NOT NULL,
    #   FirstName  STRING(1024),
    #   LastName   STRING(1024),
    #   SingerInfo BYTES(MAX),
    # ) PRIMARY KEY (SingerId);

    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    # Create the batch transaction and generate partitions
    snapshot = database.batch_snapshot()
    partitions = snapshot.generate_read_batches(
        table="Singers",
        columns=("SingerId", "FirstName", "LastName"),
        keyset=spanner.KeySet(all_=True),
        # A Partition object is serializable and can be used from a different process.
        # DataBoost option is an optional parameter which can also be used for partition read
        # and query to execute the request via spanner independent compute resources.
        data_boost_enabled=True,
    )

    # Create a pool of workers for the tasks
    start = time.time()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(process, snapshot, p) for p in partitions]

        for future in concurrent.futures.as_completed(futures, timeout=3600):
            finish, row_ct = future.result()
            elapsed = finish - start
            print("Completed {} rows in {} seconds".format(row_ct, elapsed))

    # Clean up
    snapshot.close()


def process(snapshot, partition):
    """Processes the requests of a query in an separate process."""
    print("Started processing partition.")
    row_ct = 0
    for row in snapshot.process_read_batch(partition):
        print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))
        row_ct += 1
    return time.time(), row_ct

Ruby

이 예시에서는 Singers 테이블에서 SQL 쿼리의 파티션을 가져오고 다음 단계를 통해 각 파티션에서 쿼리를 실행합니다.

  • Spanner 일괄 클라이언트 만들기
  • 파티션을 여러 작업자에게 분산할 수 있도록 쿼리의 파티션 만들기
  • 각 파티션의 쿼리 결과 검색
# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

# Prepare a thread pool with number of processors
processor_count  = Concurrent.processor_count
thread_pool      = Concurrent::FixedThreadPool.new processor_count

# Prepare AtomicFixnum to count total records using multiple threads
total_records = Concurrent::AtomicFixnum.new

# Create a new Spanner batch client
spanner        = Google::Cloud::Spanner.new project: project_id
batch_client   = spanner.batch_client instance_id, database_id

# Get a strong timestamp bound batch_snapshot
batch_snapshot = batch_client.batch_snapshot strong: true

# Get partitions for specified query
# data_boost_enabled option is an optional parameter which can be used for partition read
# and query to execute the request via spanner independent compute resources.
partitions       = batch_snapshot.partition_query "SELECT SingerId, FirstName, LastName FROM Singers", data_boost_enabled: true
total_partitions = partitions.size

# Enqueue a new thread pool job
partitions.each_with_index do |partition, _partition_index|
  thread_pool.post do
    # Increment total_records per new row
    batch_snapshot.execute_partition(partition).rows.each do |_row|
      total_records.increment
    end
  end
end

# Wait for queued jobs to complete
thread_pool.shutdown
thread_pool.wait_for_termination

# Close the client connection and release resources.
batch_snapshot.close

# Collect statistics for batch query
average_records_per_partition = 0.0
if total_partitions != 0
  average_records_per_partition = total_records.value / total_partitions.to_f
end

puts "Total Partitions: #{total_partitions}"
puts "Total Records: #{total_records.value}"
puts "Average records per Partition: #{average_records_per_partition}"