読み取り

このページでは、読み取り専用トランザクションと読み書きトランザクションのコンテキスト外で Cloud Spanner データの読み取りを行う方法を説明します。次のいずれかに該当する場合には、トランザクション ページをご覧ください。

  • 1 つ以上の読み取り結果に応じて書き込みを行う必要がある場合、読み取り書き込みトランザクションの中で読み取りを実行する必要があります。詳細については、読み取り書き込みトランザクションをご覧ください。

  • データの一貫したビューを取得するため、複数の読み取り呼び出しを行う場合、読み取り専用トランザクションの中で読み取りを実行する必要があります。詳細については、読み取り専用トランザクションをご覧ください。

読み取り呼び出しが 1 回か、データを同時に読み取り、書き込みを行わない場合には、以下の説明をご覧ください。

読み取りのタイプ

Cloud Spanner には以下の 2 種類の読み取り方法が用意されており、これによってデータを読み取るときのデータの新しさを指定できます。

  • 「強力な読み取り」は、現在のタイムスタンプでの読み取りであり、この読み取りの開始時までに commit されたすべてのデータを確実に確認できます。Cloud Spanner のデフォルトでは、強力な読み取りを使用して読み取りリクエストが処理されます。
  • 「ステイル読み取り」は、過去のタイムスタンプによる読み取りです。レイテンシの影響を受けやすいものの古いデータは許容できるアプリケーションの場合、ステイル読み取りを使用するとパフォーマンスが向上することがあります。

タイムスタンプ バウンドの選択

使用する読み取りのタイプを選択するには、読み取りリクエストでタイムスタンプ バウンドを設定します。タイムスタンプ バウンドを選択する際は、次のおすすめの方法を使用してください。

  • 可能な限り、強力な読み取りを選択します。これは Cloud Spanner の読み取り(読み取り専用トランザクションを含む)の、デフォルトのタイムスタンプ バウンドです。強力な読み取りでは、どのレプリカが読み取りを受け取るかに関係なく、読み取りの開始前に commit されたすべてのトランザクションの結果を確実に確認できます。このため強力な読み取りでは、アプリケーション コードが簡略化され、アプリケーションの信頼性も向上します。Cloud Spanner の整合性プロパティについては、TrueTime と外部整合性をご覧ください。

  • レイテンシのために強力な読み取りが不可能な場合は、ステイル読み取りを使用します(bounded-staleness または exact-staleness)。読み取り結果をできるだけ新しいものにする必要がない場合は、これによってパフォーマンスを向上させることができます。Cloud Spanner でのレプリケーションでしているように、良好なパフォーマンスを実現するために、ステイルネスの値として 15 秒を使用することは妥当です。

単一読み取りメソッド

Cloud Spanner では、次の場合にデータベースに対する単一読み取りメソッド(トランザクションのコンテキスト外での読み取りなど)を使用できます。

  • SQL クエリ文として読み取りを実行する。あるいは、Cloud Spanner の読み取り API を使用する。
  • 強力な読み取りを実行してテーブルから単一行または複数行を読み取る。
  • ステイル読み取りを実行してテーブルから単一行または複数行を読み取る。
  • セカンダリ インデックスの単一行または複数行を読み取る。

以下では、Cloud Spanner API の Cloud クライアント ライブラリを使用して読み取りを行う方法について説明します。

クエリを実行する

以下では、データベースに SQL クエリを実行する方法を説明します。

C#

ExecuteReaderAsync() を使用して、データベースをクエリします。

string connectionString =
$"Data Source=projects/{projectId}/instances/"
+ $"{instanceId}/databases/{databaseId}";
// Create connection to Cloud Spanner.
using (var connection = new SpannerConnection(connectionString))
{
    var cmd = connection.CreateSelectCommand(
        "SELECT SingerId, AlbumId, AlbumTitle FROM Albums");
    using (var reader = await cmd.ExecuteReaderAsync())
    {
        while (await reader.ReadAsync())
        {
            Console.WriteLine("SingerId : "
                + reader.GetFieldValue<string>("SingerId")
                + " AlbumId : "
                + reader.GetFieldValue<string>("AlbumId")
                + " AlbumTitle : "
                + reader.GetFieldValue<string>("AlbumTitle"));
        }
    }
}

Go

Client.Single().Query を使用して、データベースをクエリします。

func query(ctx context.Context, w io.Writer, client *spanner.Client) error {
	stmt := spanner.Statement{SQL: `SELECT SingerId, AlbumId, AlbumTitle FROM Albums`}
	iter := client.Single().Query(ctx, stmt)
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return err
		}
		var singerID, albumID int64
		var albumTitle string
		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
	}
}

Java

ReadContext.executeQuery を使用して、データベースをクエリします。

static void query(DatabaseClient dbClient) {
  // We use a try-with-resource block to automatically release resources held by ResultSet.
  try (ResultSet resultSet = dbClient
          .singleUse() // Execute a single read or query against Cloud Spanner.
          .executeQuery(Statement.of("SELECT SingerId, AlbumId, AlbumTitle FROM Albums"))) {
    while (resultSet.next()) {
      System.out.printf(
          "%d %d %s\n", resultSet.getLong(0), resultSet.getLong(1), resultSet.getString(2));
    }
  }
}

Node.js

Database.run を使用して、データベースをクエリします。

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

const query = {
  sql: 'SELECT SingerId, AlbumId, AlbumTitle FROM Albums',
};

// Queries rows from the Albums table
try {
  const [rows] = await database.run(query);

  rows.forEach(row => {
    const json = row.toJSON();
    console.log(
      `SingerId: ${json.SingerId}, AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`
    );
  });
} catch (err) {
  console.error('ERROR:', err);
} finally {
  // Close the database when finished.
  await database.close();
}

PHP

Database::execute を使用して、データベースをクエリします。

use Google\Cloud\Spanner\SpannerClient;

/**
 * Queries sample data from the database using SQL.
 * Example:
 * ```
 * query_data($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function query_data($instanceId, $databaseId)
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $results = $database->execute(
        'SELECT SingerId, AlbumId, AlbumTitle FROM Albums'
    );

    foreach ($results as $row) {
        printf('SingerId: %s, AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['SingerId'], $row['AlbumId'], $row['AlbumTitle']);
    }
}

Python

Database.execute_sql を使用して、データベースをクエリします。

def query_data(instance_id, database_id):
    """Queries sample data from the database using SQL."""
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.snapshot() as snapshot:
        results = snapshot.execute_sql(
            'SELECT SingerId, AlbumId, AlbumTitle FROM Albums')

        for row in results:
            print(u'SingerId: {}, AlbumId: {}, AlbumTitle: {}'.format(*row))

Ruby

Client#execute を使用して、データベースをクエリします。

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

client.execute("SELECT SingerId, AlbumId, AlbumTitle FROM Albums").rows.each do |row|
  puts "#{row[:SingerId]} #{row[:AlbumId]} #{row[:AlbumTitle]}"
end

SQL ステートメントを作成するときは、SQL クエリ構文関数と演算子のリファレンスをご覧ください。

強力な読み取りの実行

以下では、強力な読み取りを実行してデータベースから 0 行以上を読み取る方法を説明します。

C#

データを読み取るためのコードは、SQL クエリを実行して Cloud Spanner をクエリする前述のサンプルと同じです。

string connectionString =
$"Data Source=projects/{projectId}/instances/"
+ $"{instanceId}/databases/{databaseId}";
// Create connection to Cloud Spanner.
using (var connection = new SpannerConnection(connectionString))
{
    var cmd = connection.CreateSelectCommand(
        "SELECT SingerId, AlbumId, AlbumTitle FROM Albums");
    using (var reader = await cmd.ExecuteReaderAsync())
    {
        while (await reader.ReadAsync())
        {
            Console.WriteLine("SingerId : "
                + reader.GetFieldValue<string>("SingerId")
                + " AlbumId : "
                + reader.GetFieldValue<string>("AlbumId")
                + " AlbumTitle : "
                + reader.GetFieldValue<string>("AlbumTitle"));
        }
    }
}

Go

Client.Single().Read を使用して、データベースから行を読み取ります。

func read(ctx context.Context, w io.Writer, client *spanner.Client) error {
	iter := client.Single().Read(ctx, "Albums", spanner.AllKeys(),
		[]string{"SingerId", "AlbumId", "AlbumTitle"})
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return err
		}
		var singerID, albumID int64
		var albumTitle string
		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
	}
}

この例では AllKeys を使用して、読み取る対象となるキーやキー範囲のコレクションを定義しています。

Java

ReadContext.read を使用して、データベースから行を読み込みます。

static void read(DatabaseClient dbClient) {
  // We use a try-with-resource block to automatically release resources held by ResultSet.
  try (ResultSet resultSet = dbClient
          .singleUse()
          .read(
              "Albums",
              KeySet.all(), // Read all rows in a table.
              Arrays.asList("SingerId", "AlbumId", "AlbumTitle"))) {
    while (resultSet.next()) {
      System.out.printf(
          "%d %d %s\n", resultSet.getLong(0), resultSet.getLong(1), resultSet.getString(2));
    }
  }
}

この例では KeySet を使用して、読み取る対象となるキーやキー範囲のコレクションを定義しています。

Node.js

Table.read を使用して、データベースから行を読み込みます。

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

// Reads rows from the Albums table
const albumsTable = database.table('Albums');

const query = {
  columns: ['SingerId', 'AlbumId', 'AlbumTitle'],
  keySet: {
    all: true,
  },
};

try {
  const [rows] = await albumsTable.read(query);

  rows.forEach(row => {
    const json = row.toJSON();
    console.log(
      `SingerId: ${json.SingerId}, AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`
    );
  });
} catch (err) {
  console.error('ERROR:', err);
} finally {
  // Close the database when finished.
  await database.close();
}

この例では keySet を使用して、読み取る対象となるキーやキー範囲のコレクションを定義しています。

PHP

Database::read を使用して、データベースから行を読み込みます。

use Google\Cloud\Spanner\SpannerClient;

/**
 * Reads sample data from the database.
 * Example:
 * ```
 * read_data($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function read_data($instanceId, $databaseId)
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $keySet = $spanner->keySet(['all' => true]);
    $results = $database->read(
        'Albums',
        $keySet,
        ['SingerId', 'AlbumId', 'AlbumTitle']
    );

    foreach ($results->rows() as $row) {
        printf('SingerId: %s, AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['SingerId'], $row['AlbumId'], $row['AlbumTitle']);
    }
}

この例では keySet を使用して、読み取る対象となるキーやキー範囲のコレクションを定義しています。

Python

Database.read を使用して、データベースから行を読み込みます。

def read_data(instance_id, database_id):
    """Reads sample data from the database."""
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.snapshot() as snapshot:
        keyset = spanner.KeySet(all_=True)
        results = snapshot.read(
            table='Albums',
            columns=('SingerId', 'AlbumId', 'AlbumTitle',),
            keyset=keyset,)

        for row in results:
            print(u'SingerId: {}, AlbumId: {}, AlbumTitle: {}'.format(*row))

この例では KeySet を使用して、読み取る対象となるキーやキー範囲のコレクションを定義しています。

Ruby

Client#read を使用して、データベースから行を読み取ります。

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

client.read("Albums", [:SingerId, :AlbumId, :AlbumTitle]).rows.each do |row|
  puts "#{row[:SingerId]} #{row[:AlbumId]} #{row[:AlbumTitle]}"
end

ステイル読み取りの実行

次のサンプルコードは、exact-staleness タイムスタンプ バウンドを使用してステイル読み取りを実行し、データベースから 0 行以上を読み取る方法を示しています。bounded-staleness タイムスタンプ バウンドを使用してステイル読み取りを実行する方法については、サンプルコードの後の注を参照してください。利用できるタイムスタンプ バウンドのタイプに関する詳細については、タイムスタンプ バウンドをご覧ください。

C#

BeginReadOnlyTransactionAsync メソッドを connection で(指定した TimestampBound.OfExactStaleness() の値とともに)使用して、データベースをクエリします。

string connectionString =
    $"Data Source=projects/{projectId}/instances/{instanceId}"
    + $"/databases/{databaseId}";

// Create connection to Cloud Spanner.
using (var connection = new SpannerConnection(connectionString))
{
    await connection.OpenAsync();

    // Open a new read only transaction.
    var staleness = TimestampBound.OfExactStaleness(
        TimeSpan.FromSeconds(15));
    using (var transaction =
        await connection.BeginReadOnlyTransactionAsync(staleness))
    {
        var cmd = connection.CreateSelectCommand(
            "SELECT SingerId, AlbumId, AlbumTitle FROM Albums");
        cmd.Transaction = transaction;

        using (var reader = await cmd.ExecuteReaderAsync())
        {
            while (await reader.ReadAsync())
            {
                Console.WriteLine("SingerId : "
                    + reader.GetFieldValue<string>("SingerId")
                    + " AlbumId : "
                    + reader.GetFieldValue<string>("AlbumId")
                    + " AlbumTitle : "
                    + reader.GetFieldValue<string>("AlbumTitle"));
            }
        }
    }
}

Go

Client.ReadOnlyTransaction().WithTimestampBound() を使用して ExactStaleness 値を指定し、exact-staleness タイムスタンプ バウンドを使用してデータベースから行を読み取ります。

func readStaleData(ctx context.Context, w io.Writer, client *spanner.Client) error {
	ro := client.ReadOnlyTransaction().WithTimestampBound(spanner.ExactStaleness(15 * time.Second))
	defer ro.Close()

	iter := ro.Read(ctx, "Albums", spanner.AllKeys(), []string{"SingerId", "AlbumId", "AlbumTitle"})
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return err
		}
		var singerID int64
		var albumID int64
		var albumTitle string
		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
	}
}

この例では AllKeys を使用して、読み取る対象となるキーやキー範囲のコレクションを定義しています。

Java

TimestampBound.ofExactStaleness() を指定した ReadContextread メソッドを使用し、exact-staleness タイムスタンプ バウンドを使用してデータベースから行の読み取りを実行します。

static void readStaleData(DatabaseClient dbClient) {
  // We use a try-with-resource block to automatically release resources held by ResultSet.
  try (ResultSet resultSet = dbClient
          .singleUse(TimestampBound.ofExactStaleness(15, TimeUnit.SECONDS))
          .read(
            "Albums", KeySet.all(), Arrays.asList("SingerId", "AlbumId", "MarketingBudget"))) {
    while (resultSet.next()) {
      System.out.printf(
          "%d %d %s\n",
          resultSet.getLong(0),
          resultSet.getLong(1),
          resultSet.isNull(2) ? "NULL" : resultSet.getLong("MarketingBudget"));
    }
  }
}

この例では KeySet を使用して、読み取る対象となるキーやキー範囲のコレクションを定義しています。

Node.js

exact-staleness タイムスタンプ バウンドを利用してデータベースから行の読み取りを実行するには、Table.readexactStaleness オプションとともに使用します。

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

// Reads rows from the Albums table
const albumsTable = database.table('Albums');

const query = {
  columns: ['SingerId', 'AlbumId', 'AlbumTitle', 'MarketingBudget'],
  keySet: {
    all: true,
  },
};

const options = {
  // Guarantees that all writes committed more than 15 seconds ago are visible
  exactStaleness: 15,
};

try {
  const [rows] = await albumsTable.read(query, options);

  rows.forEach(row => {
    const json = row.toJSON();
    const id = json.SingerId;
    const album = json.AlbumId;
    const title = json.AlbumTitle;
    const budget = json.MarketingBudget ? json.MarketingBudget : '';
    console.log(
      `SingerId: ${id}, AlbumId: ${album}, AlbumTitle: ${title}, MarketingBudget: ${budget}`
    );
  });
} catch (err) {
  console.error('ERROR:', err);
} finally {
  // Close the database when finished.
  await database.close();
}

この例では keySet を使用して、読み取る対象となるキーやキー範囲のコレクションを定義しています。

PHP

exact-staleness タイムスタンプ バウンドを利用してデータベースから行の読み取りを実行するには、Database::read を使用し、exactStaleness 値を指定します。

use Google\Cloud\Spanner\Duration;
use Google\Cloud\Spanner\SpannerClient;

/**
 * Reads sample data from the database.  The data is exactly 15 seconds stale.
 * Guarantees that all writes committed more than 15 seconds ago are visible.
 * Example:
 * ```
 * read_stale_data
 *($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function read_stale_data($instanceId, $databaseId)
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);
    $keySet = $spanner->keySet(['all' => true]);
    $results = $database->read(
        'Albums',
        $keySet,
        ['SingerId', 'AlbumId', 'AlbumTitle'],
        ['exactStaleness' => new Duration(15)]
    );

    foreach ($results->rows() as $row) {
        printf('SingerId: %s, AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['SingerId'], $row['AlbumId'], $row['AlbumTitle']);
    }
}

この例では keySet を使用して、読み取る対象となるキーやキー範囲のコレクションを定義しています。

Python

exact-staleness タイムスタンプ バウンドを利用してデータベースから行の読み取りを実行するには、exact_staleness 値を指定した Database snapshotread メソッドを使用します。

def read_stale_data(instance_id, database_id):
    """Reads sample data from the database. The data is exactly 15 seconds
    stale."""
    import datetime

    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)
    staleness = datetime.timedelta(seconds=15)

    with database.snapshot(exact_staleness=staleness) as snapshot:
        keyset = spanner.KeySet(all_=True)
        results = snapshot.read(
            table='Albums',
            columns=('SingerId', 'AlbumId', 'MarketingBudget',),
            keyset=keyset)

        for row in results:
            print(u'SingerId: {}, AlbumId: {}, MarketingBudget: {}'.format(
                *row))

この例では KeySet を使用して、読み取る対象となるキーやキー範囲のコレクションを定義しています。

Ruby

exact-staleness タイムスタンプ バウンドを利用してデータベースから行の読み取りを実行するには、staleness 値(秒単位)を指定したスナップショット Clientread メソッドを使用します。

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"
require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

# Perform a read with a data staleness of 15 seconds
client.snapshot staleness: 15 do |snapshot|
  snapshot.read("Albums", [:SingerId, :AlbumId, :AlbumTitle]).rows.each do |row|
    puts "#{row[:SingerId]} #{row[:AlbumId]} #{row[:AlbumTitle]}"
  end
end

インデックスを使用して読み取りを実行する

以下では、インデックスを使用してデータベースから 0 行以上を読み取ります。

C#

インデックスを使用してデータを読み取るには、インデックスを明示的に指定するクエリを実行します。

string connectionString =
$"Data Source=projects/{projectId}/instances/{instanceId}"
+ $"/databases/{databaseId}";
// Create connection to Cloud Spanner.
using (var connection = new SpannerConnection(connectionString))
{
    var cmd = connection.CreateSelectCommand(
        "SELECT AlbumId, AlbumTitle, MarketingBudget FROM Albums@ "
        + "{FORCE_INDEX=AlbumsByAlbumTitle} "
        + $"WHERE AlbumTitle >= @startTitle "
        + $"AND AlbumTitle < @endTitle",
        new SpannerParameterCollection {
            {"startTitle", SpannerDbType.String},
            {"endTitle", SpannerDbType.String} });
    cmd.Parameters["startTitle"].Value = startTitle;
    cmd.Parameters["endTitle"].Value = endTitle;
    using (var reader = await cmd.ExecuteReaderAsync())
    {
        while (await reader.ReadAsync())
        {
            Console.WriteLine("AlbumId : "
            + reader.GetFieldValue<string>("AlbumId")
            + " AlbumTitle : "
            + reader.GetFieldValue<string>("AlbumTitle")
            + " MarketingBudget : "
            + reader.GetFieldValue<string>("MarketingBudget"));
        }
    }
}

Go

Client.Single().ReadUsingIndex とインデックスを使用して、データベースから行を読み取ります。

func readUsingIndex(ctx context.Context, w io.Writer, client *spanner.Client) error {
	iter := client.Single().ReadUsingIndex(ctx, "Albums", "AlbumsByAlbumTitle", spanner.AllKeys(),
		[]string{"AlbumId", "AlbumTitle"})
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return err
		}
		var albumID int64
		var albumTitle string
		if err := row.Columns(&albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %s\n", albumID, albumTitle)
	}
}

Java

ReadContext.readUsingIndex とインデックスを使用して、データベースから行を読み取ります。

static void readUsingIndex(DatabaseClient dbClient) {
  // We use a try-with-resource block to automatically release resources held by ResultSet.
  try (ResultSet resultSet = dbClient
          .singleUse()
          .readUsingIndex(
              "Albums",
              "AlbumsByAlbumTitle",
              KeySet.all(),
              Arrays.asList("AlbumId", "AlbumTitle"))) {
    while (resultSet.next()) {
      System.out.printf("%d %s\n", resultSet.getLong(0), resultSet.getString(1));
    }
  }
}

Node.js

Table.read を使用してクエリでインデックスを指定し、データベースから行を読み取ります。

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

const albumsTable = database.table('Albums');

const query = {
  columns: ['AlbumId', 'AlbumTitle'],
  keySet: {
    all: true,
  },
  index: 'AlbumsByAlbumTitle',
};

// Reads the Albums table using an index
try {
  const [rows] = await albumsTable.read(query);

  rows.forEach(row => {
    const json = row.toJSON();
    console.log(`AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`);
  });
} catch (err) {
  console.error('ERROR:', err);
} finally {
  // Close the database when finished.
  database.close();
}

PHP

Database::read を使用してインデックスを指定し、データベースから行を読み取ります。

use Google\Cloud\Spanner\SpannerClient;

/**
 * Reads sample data from the database using an index.
 *
 * The index must exist before running this sample. You can add the index
 * by running the `add_index` sample or by running this DDL statement against
 * your database:
 *
 *     CREATE INDEX AlbumsByAlbumTitle ON Albums(AlbumTitle)
 *
 * Example:
 * ```
 * read_data_with_index($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function read_data_with_index($instanceId, $databaseId)
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $keySet = $spanner->keySet(['all' => true]);
    $results = $database->read(
        'Albums',
        $keySet,
        ['AlbumId', 'AlbumTitle'],
        ['index' => 'AlbumsByAlbumTitle']
    );

    foreach ($results->rows() as $row) {
        printf('AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['AlbumId'], $row['AlbumTitle']);
    }
}

Python

Database.read を使用してインデックスを指定し、データベースから行を読み取ります。

def read_data_with_index(instance_id, database_id):
    """Reads sample data from the database using an index.

    The index must exist before running this sample. You can add the index
    by running the `add_index` sample or by running this DDL statement against
    your database:

        CREATE INDEX AlbumsByAlbumTitle ON Albums(AlbumTitle)

    """
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.snapshot() as snapshot:
        keyset = spanner.KeySet(all_=True)
        results = snapshot.read(
            table='Albums',
            columns=('AlbumId', 'AlbumTitle'),
            keyset=keyset,
            index='AlbumsByAlbumTitle')

        for row in results:
            print('AlbumId: {}, AlbumTitle: {}'.format(*row))

Ruby

Client#read を使用してインデックスを指定し、データベースから行を読み取ります。

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

result = client.read "Albums", [:AlbumId, :AlbumTitle],
                     index: "AlbumsByAlbumTitle"

result.rows.each do |row|
  puts "#{row[:AlbumId]} #{row[:AlbumTitle]}"
end

データを同時に読み込む

Cloud Spanner から大量のデータを読み込んだり、クエリを実行したりする際は、クエリをより小さな部分またはパーティションに分割し、複数のマシンを使用してパーティションを同時にフェッチしたほうが良い場合があります。

読み取り API オペレーションを同時に行うには、Cloud Spanner クライアント ライブラリを使用します。ただし、SQL クエリを分割できるのは、クエリ実行プランの最初の演算子が分散ユニオンの場合のみです。特定の SQL クエリのクエリ実行プランを確認するには、SQL のおすすめの方法の手順に従います。

クエリ実行プランの作成後、プランの最初の演算子が分散ユニオンであることを確認します。

C#

この例では、Singers テーブルの SQL クエリのパーティションをフェッチし、次の手順で各パーティションにクエリを実行します。

  • Cloud Spanner バッチ トランザクションを作成する。
  • パーティションを複数のワーカーに分散できるように、クエリのパーティションを生成する。
  • 各パーティションのクエリ結果を取得する。
private static int s_partitionId;
private static int s_rowsRead;
public static object BatchReadRecords(string projectId,
     string instanceId, string databaseId)
{
    var responseTask =
        DistributedReadAsync(projectId, instanceId, databaseId);
    Console.WriteLine("Waiting for operation to complete...");
    responseTask.Wait();
    Console.WriteLine($"Operation status: {responseTask.Status}");
    return ExitCode.Success;
}

private static async Task DistributedReadAsync(string projectId,
    string instanceId, string databaseId)
{
    string connectionString =
        $"Data Source=projects/{projectId}/instances/{instanceId}"
        + $"/databases/{databaseId}";
    using (var connection = new SpannerConnection(connectionString))
    {
        await connection.OpenAsync();

        using (var transaction =
            await connection.BeginReadOnlyTransactionAsync())
        using (var cmd =
            connection.CreateSelectCommand(
                "SELECT SingerId, FirstName, LastName FROM Singers"))
        {
            transaction.DisposeBehavior =
                DisposeBehavior.CloseResources;
            cmd.Transaction = transaction;
            var partitions = await cmd.GetReaderPartitionsAsync();
            var transactionId = transaction.TransactionId;
            await Task.WhenAll(partitions.Select(
                    x => DistributedReadWorkerAsync(x, transactionId)))
                        .ConfigureAwait(false);
        }
        Console.WriteLine($"Done reading!  Total rows read: "
            + $"{s_rowsRead:N0} with {s_partitionId} partition(s)");
    }
}

private static async Task DistributedReadWorkerAsync(
    CommandPartition readPartition, TransactionId id)
{
    var localId = Interlocked.Increment(ref s_partitionId);
    using (var connection = new SpannerConnection(id.ConnectionString))
    using (var transaction = connection.BeginReadOnlyTransaction(id))
    {
        using (var cmd = connection.CreateCommandWithPartition(
            readPartition, transaction))
        {
            using (var reader =
                await cmd.ExecuteReaderAsync().ConfigureAwait(false))
            {
                while (await reader.ReadAsync())
                {
                    Interlocked.Increment(ref s_rowsRead);
                    Console.WriteLine($"Partition ({localId}) "
                        + $"{reader.GetFieldValue<string>("SingerId")}"
                        + $" {reader.GetFieldValue<string>("FirstName")}"
                        + $" {reader.GetFieldValue<string>("LastName")}");
                }
            }
        }
        Console.WriteLine($"Done with single reader {localId}.");
    }
}

Go

この例では、Singers テーブルの SQL クエリのパーティションをフェッチし、次の手順で各パーティションにクエリを実行します。

  • Cloud Spanner クライアントとトランザクションを作成する。
  • パーティションを複数のワーカーに分散できるように、クエリのパーティションを生成する。
  • 各パーティションのクエリ結果を取得する。
func readBatchData(ctx context.Context, w io.Writer, client *spanner.Client) error {
	txn, err := client.BatchReadOnlyTransaction(ctx, spanner.StrongRead())
	if err != nil {
		return err
	}
	defer txn.Close()

	// Singer represents a row in the Singers table.
	type Singer struct {
		SingerID   int64
		FirstName  string
		LastName   string
		SingerInfo []byte
	}
	stmt := spanner.Statement{SQL: "SELECT SingerId, FirstName, LastName FROM Singers;"}
	partitions, err := txn.PartitionQuery(ctx, stmt, spanner.PartitionOptions{})
	if err != nil {
		return err
	}
	recordCount := 0
	for i, p := range partitions {
		iter := txn.Execute(ctx, p)
		defer iter.Stop()
		for {
			row, err := iter.Next()
			if err == iterator.Done {
				break
			} else if err != nil {
				return err
			}
			var s Singer
			if err := row.ToStruct(&s); err != nil {
				return err
			}
			fmt.Fprintf(w, "Partition (%d) %v\n", i, s)
			recordCount++
		}
	}
	fmt.Fprintf(w, "Total partition count: %v\n", len(partitions))
	fmt.Fprintf(w, "Total record count: %v\n", recordCount)
	return nil
}

Java

この例では、Singers テーブルの SQL クエリのパーティションをフェッチし、次の手順で各パーティションにクエリを実行します。

  • Cloud Spanner バッチ クライアントとトランザクションを作成する。
  • パーティションを複数のワーカーに分散できるように、クエリのパーティションを生成する。
  • 各パーティションのクエリ結果を取得する。
int numThreads = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(numThreads);

// Statistics
int totalPartitions;
AtomicInteger totalRecords = new AtomicInteger(0);

try {
  BatchClient batchClient = spanner.getBatchClient(
      DatabaseId.of(options.getProjectId(), instanceId, databaseId));

  final BatchReadOnlyTransaction txn =
      batchClient.batchReadOnlyTransaction(TimestampBound.strong());

  // A Partition object is serializable and can be used from a different process.
  List<Partition> partitions = txn.partitionQuery(PartitionOptions.getDefaultInstance(),
      Statement.of("SELECT SingerId, FirstName, LastName FROM Singers"));

  totalPartitions = partitions.size();

  for (final Partition p : partitions) {
    executor.execute(() -> {
      try (ResultSet results = txn.execute(p)) {
        while (results.next()) {
          long singerId = results.getLong(0);
          String firstName = results.getString(1);
          String lastName = results.getString(2);
          System.out.println("[" + singerId + "] " + firstName + " " + lastName);
          totalRecords.getAndIncrement();
        }
      }
    });
  }
} finally {
  executor.shutdown();
  executor.awaitTermination(1, TimeUnit.HOURS);
  spanner.close();
}

double avgRecordsPerPartition = 0.0;
if (totalPartitions != 0) {
  avgRecordsPerPartition = (double) totalRecords.get() / totalPartitions;
}
System.out.println("totalPartitions=" + totalPartitions);
System.out.println("totalRecords=" + totalRecords);
System.out.println("avgRecordsPerPartition=" + avgRecordsPerPartition);

Node.js

この例では、Singers テーブルの SQL クエリのパーティションをフェッチし、各パーティションにクエリを実行します。

コードの次の部分でパーティションを生成します。

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';
// const identifier = {};

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);
const transaction = database.batchTransaction(identifier);

const query = 'SELECT * FROM Singers';

const [partitions] = await transaction.createQueryPartitions(query);
console.log(`Successfully created ${partitions.length} query partitions.`);

コードの次の部分で各パーティションにクエリを実行します。

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';
// const identifier = {};
// const partition = {};

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);
const transaction = database.batchTransaction(identifier);

const [rows] = await transaction.execute(partition);
console.log(`Successfully received ${rows.length} from executed partition.`);

PHP

この例では、Singers テーブルの SQL クエリのパーティションをフェッチし、次の手順で各パーティションにクエリを実行します。

  • Cloud Spanner クライアントとバッチを作成する。
  • パーティションを複数のワーカーに分散できるように、クエリのパーティションを生成する。
  • 各パーティションのクエリ結果を取得する。
use Google\Cloud\Spanner\SpannerClient;

/**
 * Queries sample data from the database using SQL.
 * Example:
 * ```
 * batch_query_data($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function batch_query_data($instanceId, $databaseId)
{
    $spanner = new SpannerClient();
    $batch = $spanner->batch($instanceId, $databaseId);
    $snapshot = $batch->snapshot();
    $queryString = 'SELECT SingerId, FirstName, LastName FROM Singers';
    $partitions = $snapshot->partitionQuery($queryString);
    $totalPartitions = count($partitions);
    $totalRecords = 0;
    foreach ($partitions as $partition) {
        $result = $snapshot->executePartition($partition);
        $rows = $result->rows();
        foreach ($rows as $row) {
            $singerId = $row['SingerId'];
            $firstName = $row['FirstName'];
            $lastName = $row['LastName'];
            printf('SingerId: %s, FirstName: %s, LastName: %s' . PHP_EOL, $singerId, $firstName, $lastName);
            $totalRecords++;
        }
    }
    printf('Total Partitions: %d' . PHP_EOL, $totalPartitions);
    printf('Total Records: %d' . PHP_EOL, $totalRecords);
    $averageRecordsPerPartition = $totalRecords / $totalPartitions;
    printf('Average Records Per Partition: %f' . PHP_EOL, $averageRecordsPerPartition);
}

Python

この例では、Singers テーブルの SQL クエリのパーティションをフェッチし、次の手順で各パーティションにクエリを実行します。

  • Cloud Spanner クライアントとバッチ トランザクションを作成する。
  • パーティションを複数のワーカーに分散できるように、クエリのパーティションを生成する。
  • 各パーティションのクエリ結果を取得する。

def run_batch_query(instance_id, database_id):
    """Runs an example batch query."""

    # Expected Table Format:
    # CREATE TABLE Singers (
    #   SingerId   INT64 NOT NULL,
    #   FirstName  STRING(1024),
    #   LastName   STRING(1024),
    #   SingerInfo BYTES(MAX),
    # ) PRIMARY KEY (SingerId);

    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    # Create the batch transaction and generate partitions
    snapshot = database.batch_snapshot()
    partitions = snapshot.generate_read_batches(
        table='Singers',
        columns=('SingerId', 'FirstName', 'LastName',),
        keyset=spanner.KeySet(all_=True)
    )

    # Create a pool of workers for the tasks
    start = time.time()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(process, snapshot, p) for p in partitions]

        for future in concurrent.futures.as_completed(futures, timeout=3600):
            finish, row_ct = future.result()
            elapsed = finish - start
            print(u'Completed {} rows in {} seconds'.format(row_ct, elapsed))

    # Clean up
    snapshot.close()

def process(snapshot, partition):
    """Processes the requests of a query in an separate process."""
    print('Started processing partition.')
    row_ct = 0
    for row in snapshot.process_read_batch(partition):
        print(u'SingerId: {}, AlbumId: {}, AlbumTitle: {}'.format(*row))
        row_ct += 1
    return time.time(), row_ct

Ruby

この例では、Singers テーブルの SQL クエリのパーティションをフェッチし、次の手順で各パーティションにクエリを実行します。

  • Cloud Spanner バッチ クライアントを作成する。
  • パーティションを複数のワーカーに分散できるように、クエリのパーティションを作成する。
  • 各パーティションのクエリ結果を取得する。
# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

# Prepare a thread pool with number of processors
processor_count  = Concurrent.processor_count
thread_pool      = Concurrent::FixedThreadPool.new processor_count

# Prepare AtomicFixnum to count total records using multiple threads
total_records = Concurrent::AtomicFixnum.new

# Create a new Spanner batch client
spanner        = Google::Cloud::Spanner.new project: project_id
batch_client   = spanner.batch_client instance_id, database_id

# Get a strong timestamp bound batch_snapshot
batch_snapshot = batch_client.batch_snapshot strong: true

# Get partitions for specified query
partitions       = batch_snapshot.partition_query "SELECT SingerId, FirstName, LastName FROM Singers"
total_partitions = partitions.size

# Enqueue a new thread pool job
partitions.each_with_index do |partition, _partition_index|
  thread_pool.post do
    # Increment total_records per new row
    batch_snapshot.execute_partition(partition).rows.each do |_row|
      total_records.increment
    end
  end
end

# Wait for queued jobs to complete
thread_pool.shutdown
thread_pool.wait_for_termination

# Close the client connection and release resources.
batch_snapshot.close

# Collect statistics for batch query
average_records_per_partition = 0.0
if total_partitions != 0
  average_records_per_partition = total_records.value.to_f / total_partitions.to_f
end

puts "Total Partitions: #{total_partitions}"
puts "Total Records: #{total_records.value}"
puts "Average records per Partition: #{average_records_per_partition}"
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

Cloud Spanner のドキュメント