Lesevorgänge

Auf dieser Seite wird beschrieben, wie Sie in Cloud Spanner Lesevorgänge außerhalb des Kontexts von schreibgeschützten Transaktionen und Lese-Schreib-Transaktionen ausführen. Wenn also eine der folgenden Situationen zutrifft, lesen Sie stattdessen die Seite "Transaktionen":

Wenn Sie einen einzigen Leseaufruf ausführen oder gleichzeitig Daten schreiben müssen, lesen Sie weiter.

Lesetypen

Cloud Spanner bietet Ihnen zwei Typen von Lesevorgängen, um festzulegen, wie aktuell die Daten beim Lesen sein sollen:

  • Ein starker Lesevorgang ist ein Lesevorgang zu einem aktuellen Zeitstempel, der garantiert alle Daten erfasst, die bis zum Beginn dieses Lesevorgangs mit einem Commit festgeschrieben wurden. Cloud Spanner verwendet standardmäßig starke Lesevorgänge, um Leseanfragen zu bedienen.
  • Ein veralteter Lesevorgang erfasst Daten an einem Zeitpunkt (Zeitstempel) in der Vergangenheit. Wenn Ihre Anwendung latenzabhängig ist, aber veraltete Daten toleriert, können veraltete Lesevorgänge Leistungsvorteile bieten.

Zeitstempelgrenzen auswählen

Legen Sie eine Zeitstempelgrenze für die Leseanforderung fest, um die gewünschte Art von Lesevorgang auszuwählen. Gehen Sie bei der Auswahl einer Zeitstempelgrenze gemäß den folgenden Best Practices vor:

  • Wählen Sie nach Möglichkeit starke Lesevorgänge. Dies ist die standardmäßige Zeitstempelgrenze für Cloud Spanner-Lesevorgänge, einschließlich schreibgeschützter Transaktionen. Starke Lesevorgänge berücksichtigen garantiert die Auswirkungen aller Transaktionen, die vor dem Start des Lesevorgangs durchgeführt wurden, unabhängig davon, bei welchem Replikat der Lesevorgang eingeht. Aus diesem Grund machen starke Lesevorgänge Anwendungscode einfacher und Anwendungen vertrauenswürdiger. Weitere Informationen zu den Konsistenzeigenschaften von Cloud Spanner finden Sie unter TrueTime und externe Konsistenz.

  • Wenn die Latenz in manchen Situationen starke Lesevorgänge verhindert, verwenden Sie veraltete Lesevorgänge (begrenzte Veralterung oder exakte Veralterung), um die Leistung zu verbessern, sofern die Lesevorgänge nicht ganz aktuell sein müssen. Wie unter Cloud Spanner-Replikation beschrieben, sind 15 Sekunden ein angemessener Veralterungswert für eine gute Leistung.

Einzelne Leseaufrufe

Cloud Spanner unterstützt Methoden für einzelne Leseaufrufe (d. h. einen Leseaufruf außerhalb des Kontexts einer Transaktion) für eine Datenbank so:

  • Lesevorgang als SQL-Abrufanweisung oder über die Lese-API von Cloud Spanner ausführen
  • Starken Lesevorgang aus einer einzelnen Zeile oder mehreren Zeilen in einer Tabelle durchführen
  • Veralteten Lesevorgang aus einer einzelnen Zeile oder mehreren Zeilen in einer Tabelle durchführen
  • Aus einer einzelnen Zeile oder mehreren Zeilen in einem sekundären Index lesen

In den folgenden Abschnitten wird erläutert, wie Lesemethoden der Cloud-Clientbibliotheken für die Cloud Spanner API verwendet werden.

Abfrage ausführen

Im Folgenden wird gezeigt, wie eine SQL-Abrufanweisung für eine Datenbank ausgeführt wird.

C++

Führen Sie mit ExecuteQuery() eine SQL-Abfrageanweisung für eine Datenbank aus.

void QueryData(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;

  spanner::SqlStatement select("SELECT SingerId, LastName FROM Singers");
  using RowType = std::tuple<std::int64_t, std::string>;
  auto rows = client.ExecuteQuery(std::move(select));
  for (auto const& row : spanner::StreamOf<RowType>(rows)) {
    if (!row) throw std::runtime_error(row.status().message());
    std::cout << "SingerId: " << std::get<0>(*row) << "\t";
    std::cout << "LastName: " << std::get<1>(*row) << "\n";
  }

  std::cout << "Query completed for [spanner_query_data]\n";
}

C#

Verwenden Sie ExecuteReaderAsync(), um die Datenbank abzufragen.

string connectionString =
$"Data Source=projects/{projectId}/instances/"
+ $"{instanceId}/databases/{databaseId}";
// Create connection to Cloud Spanner.
using (var connection = new SpannerConnection(connectionString))
{
    var cmd = connection.CreateSelectCommand(
        "SELECT SingerId, AlbumId, AlbumTitle FROM Albums");
    using (var reader = await cmd.ExecuteReaderAsync())
    {
        while (await reader.ReadAsync())
        {
            Console.WriteLine("SingerId : "
                + reader.GetFieldValue<string>("SingerId")
                + " AlbumId : "
                + reader.GetFieldValue<string>("AlbumId")
                + " AlbumTitle : "
                + reader.GetFieldValue<string>("AlbumTitle"));
        }
    }
}

Go

Verwenden Sie Client.Single().Query, um die Datenbank abzufragen.


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"
)

func query(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	stmt := spanner.Statement{SQL: `SELECT SingerId, AlbumId, AlbumTitle FROM Albums`}
	iter := client.Single().Query(ctx, stmt)
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return err
		}
		var singerID, albumID int64
		var albumTitle string
		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
	}
}

Java

Verwenden Sie ReadContext.executeQuery, um die Datenbank abzufragen.

static void query(DatabaseClient dbClient) {
  try (ResultSet resultSet =
      dbClient
          .singleUse() // Execute a single read or query against Cloud Spanner.
          .executeQuery(Statement.of("SELECT SingerId, AlbumId, AlbumTitle FROM Albums"))) {
    while (resultSet.next()) {
      System.out.printf(
          "%d %d %s\n", resultSet.getLong(0), resultSet.getLong(1), resultSet.getString(2));
    }
  }
}

Node.js

Verwenden Sie Database.run, um die Datenbank abzufragen.

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

const query = {
  sql: 'SELECT SingerId, AlbumId, AlbumTitle FROM Albums',
};

// Queries rows from the Albums table
try {
  const [rows] = await database.run(query);

  rows.forEach(row => {
    const json = row.toJSON();
    console.log(
      `SingerId: ${json.SingerId}, AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`
    );
  });
} catch (err) {
  console.error('ERROR:', err);
} finally {
  // Close the database when finished.
  await database.close();
}

PHP

Verwenden Sie Database::execute, um die Datenbank abzufragen.

use Google\Cloud\Spanner\SpannerClient;

/**
 * Queries sample data from the database using SQL.
 * Example:
 * ```
 * query_data($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function query_data($instanceId, $databaseId)
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $results = $database->execute(
        'SELECT SingerId, AlbumId, AlbumTitle FROM Albums'
    );

    foreach ($results as $row) {
        printf('SingerId: %s, AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['SingerId'], $row['AlbumId'], $row['AlbumTitle']);
    }
}

Python

Verwenden Sie Database.execute_sql, um die Datenbank abzufragen.

def query_data(instance_id, database_id):
    """Queries sample data from the database using SQL."""
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.snapshot() as snapshot:
        results = snapshot.execute_sql(
            "SELECT SingerId, AlbumId, AlbumTitle FROM Albums"
        )

        for row in results:
            print(u"SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))

Ruby

Verwenden Sie Client#execute, um die Datenbank abzufragen.

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

client.execute("SELECT SingerId, AlbumId, AlbumTitle FROM Albums").rows.each do |row|
  puts "#{row[:SingerId]} #{row[:AlbumId]} #{row[:AlbumTitle]}"
end

Achten Sie beim Erstellen einer SQL-Anweisung auf die SQL-Referenzinformationen zur SQL-Abfragesyntax sowie zu Funktionen und Operatoren.

Starken Lesevorgang durchführen

Im Folgenden wird gezeigt, wie ein starker Lesevorgang für null oder mehr Zeilen aus einer Datenbank durchgeführt wird.

C++

Der Code zum Lesen von Daten unterscheidet sich nicht von dem Code im vorherigen Beispiel zum Abfragen von Cloud Spanner durch die Ausführung einer SQL-Anweisung.

void QueryData(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;

  spanner::SqlStatement select("SELECT SingerId, LastName FROM Singers");
  using RowType = std::tuple<std::int64_t, std::string>;
  auto rows = client.ExecuteQuery(std::move(select));
  for (auto const& row : spanner::StreamOf<RowType>(rows)) {
    if (!row) throw std::runtime_error(row.status().message());
    std::cout << "SingerId: " << std::get<0>(*row) << "\t";
    std::cout << "LastName: " << std::get<1>(*row) << "\n";
  }

  std::cout << "Query completed for [spanner_query_data]\n";
}

C#

Der Code zum Lesen von Daten unterscheidet sich nicht von dem Code im vorherigen Beispiel zum Abfragen von Cloud Spanner durch die Ausführung einer SQL-Anweisung.

string connectionString =
$"Data Source=projects/{projectId}/instances/"
+ $"{instanceId}/databases/{databaseId}";
// Create connection to Cloud Spanner.
using (var connection = new SpannerConnection(connectionString))
{
    var cmd = connection.CreateSelectCommand(
        "SELECT SingerId, AlbumId, AlbumTitle FROM Albums");
    using (var reader = await cmd.ExecuteReaderAsync())
    {
        while (await reader.ReadAsync())
        {
            Console.WriteLine("SingerId : "
                + reader.GetFieldValue<string>("SingerId")
                + " AlbumId : "
                + reader.GetFieldValue<string>("AlbumId")
                + " AlbumTitle : "
                + reader.GetFieldValue<string>("AlbumTitle"));
        }
    }
}

Go

Verwenden Sie Client.Single().Read, um Zeilen aus der Datenbank zu lesen.


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"
)

func read(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	iter := client.Single().Read(ctx, "Albums", spanner.AllKeys(),
		[]string{"SingerId", "AlbumId", "AlbumTitle"})
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return err
		}
		var singerID, albumID int64
		var albumTitle string
		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
	}
}

In diesem Beispiel wird mit AllKeys eine Sammlung von Schlüsseln oder Schlüsselbereichen definiert, die gelesen werden sollen.

Java

Verwenden Sie ReadContext.read, um Zeilen aus der Datenbank zu lesen.

static void read(DatabaseClient dbClient) {
  try (ResultSet resultSet =
      dbClient
          .singleUse()
          .read(
              "Albums",
              KeySet.all(), // Read all rows in a table.
              Arrays.asList("SingerId", "AlbumId", "AlbumTitle"))) {
    while (resultSet.next()) {
      System.out.printf(
          "%d %d %s\n", resultSet.getLong(0), resultSet.getLong(1), resultSet.getString(2));
    }
  }
}

In diesem Beispiel wird mit KeySet eine Sammlung von Schlüsseln oder Schlüsselbereichen definiert, die gelesen werden sollen.

Node.js

Verwenden Sie Table.read, um Zeilen aus der Datenbank zu lesen.

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

// Reads rows from the Albums table
const albumsTable = database.table('Albums');

const query = {
  columns: ['SingerId', 'AlbumId', 'AlbumTitle'],
  keySet: {
    all: true,
  },
};

try {
  const [rows] = await albumsTable.read(query);

  rows.forEach(row => {
    const json = row.toJSON();
    console.log(
      `SingerId: ${json.SingerId}, AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`
    );
  });
} catch (err) {
  console.error('ERROR:', err);
} finally {
  // Close the database when finished.
  await database.close();
}

In diesem Beispiel wird mit keySet eine Sammlung von Schlüsseln oder Schlüsselbereichen definiert, die gelesen werden sollen.

PHP

Verwenden Sie Database::read, um Zeilen aus der Datenbank zu lesen.

use Google\Cloud\Spanner\SpannerClient;

/**
 * Reads sample data from the database.
 * Example:
 * ```
 * read_data($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function read_data($instanceId, $databaseId)
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $keySet = $spanner->keySet(['all' => true]);
    $results = $database->read(
        'Albums',
        $keySet,
        ['SingerId', 'AlbumId', 'AlbumTitle']
    );

    foreach ($results->rows() as $row) {
        printf('SingerId: %s, AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['SingerId'], $row['AlbumId'], $row['AlbumTitle']);
    }
}

In diesem Beispiel wird mit keySet eine Sammlung von Schlüsseln oder Schlüsselbereichen definiert, die gelesen werden sollen.

Python

Verwenden Sie Database.read, um Zeilen aus der Datenbank zu lesen.

def read_data(instance_id, database_id):
    """Reads sample data from the database."""
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.snapshot() as snapshot:
        keyset = spanner.KeySet(all_=True)
        results = snapshot.read(
            table="Albums", columns=("SingerId", "AlbumId", "AlbumTitle"), keyset=keyset
        )

        for row in results:
            print(u"SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))

In diesem Beispiel wird mit KeySet eine Sammlung von Schlüsseln oder Schlüsselbereichen definiert, die gelesen werden sollen.

Ruby

Verwenden Sie Client#read, um Zeilen aus der Datenbank zu lesen.

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

client.read("Albums", [:SingerId, :AlbumId, :AlbumTitle]).rows.each do |row|
  puts "#{row[:SingerId]} #{row[:AlbumId]} #{row[:AlbumTitle]}"
end

Veralteten Lesevorgang durchführen

Der folgende Beispielcode zeigt, wie ein veralteter Lesevorgang von null oder mehr Zeilen aus einer Datenbank unter Verwendung einer Zeitstempelgrenze des Typs exakte Veralterung durchgeführt wird. Anweisungen zur Durchführung eines veralteten Lesevorgangs mit einer Zeitstempelgrenze vom Typ begrenzte Veralterung finden Sie im Hinweis nach dem Beispielcode. Weitere Informationen über die verschiedenen verfügbaren Arten von Zeitstempelgrenzen finden Sie unter Zeitstempelgrenzen.

C++

Verwenden Sie ExecuteQuery() mit MakeReadOnlyTransaction() und Transaction::ReadOnlyOptions(), um einen veralteten Lesevorgang durchzuführen.

void ReadStaleData(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;
  auto opts = spanner::Transaction::ReadOnlyOptions(std::chrono::seconds(15));
  auto read_only = spanner::MakeReadOnlyTransaction(opts);

  spanner::SqlStatement select(
      "SELECT SingerId, AlbumId, AlbumTitle FROM Albums");
  using RowType = std::tuple<std::int64_t, std::int64_t, std::string>;

  auto rows = client.ExecuteQuery(read_only, select);
  for (auto const& row : spanner::StreamOf<RowType>(rows)) {
    if (!row) throw std::runtime_error(row.status().message());
    std::cout << "SingerId: " << std::get<0>(*row)
              << " AlbumId: " << std::get<1>(*row)
              << " AlbumTitle: " << std::get<2>(*row) << "\n";
  }
}

C#

Verwenden Sie die Methode BeginReadOnlyTransactionAsync für eine connection mit einem angegebenen Wert für TimestampBound.OfExactStaleness(), um die Datenbank abzufragen.

string connectionString =
    $"Data Source=projects/{projectId}/instances/{instanceId}"
    + $"/databases/{databaseId}";

// Create connection to Cloud Spanner.
using (var connection = new SpannerConnection(connectionString))
{
    await connection.OpenAsync();

    // Open a new read only transaction.
    var staleness = TimestampBound.OfExactStaleness(
        TimeSpan.FromSeconds(15));
    using (var transaction =
        await connection.BeginReadOnlyTransactionAsync(staleness))
    {
        var cmd = connection.CreateSelectCommand(
            "SELECT SingerId, AlbumId, AlbumTitle FROM Albums");
        cmd.Transaction = transaction;

        using (var reader = await cmd.ExecuteReaderAsync())
        {
            while (await reader.ReadAsync())
            {
                Console.WriteLine("SingerId : "
                    + reader.GetFieldValue<string>("SingerId")
                    + " AlbumId : "
                    + reader.GetFieldValue<string>("AlbumId")
                    + " AlbumTitle : "
                    + reader.GetFieldValue<string>("AlbumTitle"));
            }
        }
    }
}

Go

Verwenden Sie Client.ReadOnlyTransaction().WithTimestampBound() und geben Sie einen Wert für ExactStaleness an, um mithilfe einer Zeitstempelgrenze mit exakter Veralterung einen Lesevorgang für Zeilen aus der Datenbank durchzuführen.


import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"
)

func readStaleData(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	ro := client.ReadOnlyTransaction().WithTimestampBound(spanner.ExactStaleness(15 * time.Second))
	defer ro.Close()

	iter := ro.Read(ctx, "Albums", spanner.AllKeys(), []string{"SingerId", "AlbumId", "AlbumTitle"})
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return err
		}
		var singerID int64
		var albumID int64
		var albumTitle string
		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %d %s\n", singerID, albumID, albumTitle)
	}
}

In diesem Beispiel wird mit AllKeys eine Sammlung von Schlüsseln oder Schlüsselbereichen definiert, die gelesen werden sollen.

Java

Verwenden Sie die Methode read eines Elements ReadContext, das einen festgelegten Parameter TimestampBound.ofExactStaleness() enthält, um unter Verwendung einer Zeitstempelgrenze vom Typ "exakte Veralterung" einen Lesevorgang für Zeilen aus der Datenbank durchzuführen.

static void readStaleData(DatabaseClient dbClient) {
  try (ResultSet resultSet =
      dbClient
          .singleUse(TimestampBound.ofExactStaleness(15, TimeUnit.SECONDS))
          .read(
              "Albums", KeySet.all(), Arrays.asList("SingerId", "AlbumId", "MarketingBudget"))) {
    while (resultSet.next()) {
      System.out.printf(
          "%d %d %s\n",
          resultSet.getLong(0),
          resultSet.getLong(1),
          resultSet.isNull(2) ? "NULL" : resultSet.getLong("MarketingBudget"));
    }
  }
}

In diesem Beispiel wird mit KeySet eine Sammlung von Schlüsseln oder Schlüsselbereichen definiert, die gelesen werden sollen.

Node.js

Verwenden Sie Table.read mit der Option exactStaleness, um Zeilen mit einer Zeitstempelgrenze vom Typ "exakte Veralterung" aus der Datenbank zu lesen.

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

// Reads rows from the Albums table
const albumsTable = database.table('Albums');

const query = {
  columns: ['SingerId', 'AlbumId', 'AlbumTitle', 'MarketingBudget'],
  keySet: {
    all: true,
  },
};

const options = {
  // Guarantees that all writes committed more than 15 seconds ago are visible
  exactStaleness: 15,
};

try {
  const [rows] = await albumsTable.read(query, options);

  rows.forEach(row => {
    const json = row.toJSON();
    const id = json.SingerId;
    const album = json.AlbumId;
    const title = json.AlbumTitle;
    const budget = json.MarketingBudget ? json.MarketingBudget : '';
    console.log(
      `SingerId: ${id}, AlbumId: ${album}, AlbumTitle: ${title}, MarketingBudget: ${budget}`
    );
  });
} catch (err) {
  console.error('ERROR:', err);
} finally {
  // Close the database when finished.
  await database.close();
}

In diesem Beispiel wird mit keySet eine Sammlung von Schlüsseln oder Schlüsselbereichen definiert, die gelesen werden sollen.

PHP

Verwenden Sie Database::read mit einem angegebenen Wert für exactStaleness, um Zeilen mit einer Zeitstempelgrenze vom Typ "exakte Veralterung" aus der Datenbank zu lesen.

use Google\Cloud\Spanner\Duration;
use Google\Cloud\Spanner\SpannerClient;

/**
 * Reads sample data from the database.  The data is exactly 15 seconds stale.
 * Guarantees that all writes committed more than 15 seconds ago are visible.
 * Example:
 * ```
 * read_stale_data
 *($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function read_stale_data($instanceId, $databaseId)
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);
    $keySet = $spanner->keySet(['all' => true]);
    $results = $database->read(
        'Albums',
        $keySet,
        ['SingerId', 'AlbumId', 'AlbumTitle'],
        ['exactStaleness' => new Duration(15)]
    );

    foreach ($results->rows() as $row) {
        printf('SingerId: %s, AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['SingerId'], $row['AlbumId'], $row['AlbumTitle']);
    }
}

In diesem Beispiel wird mit keySet eine Sammlung von Schlüsseln oder Schlüsselbereichen definiert, die gelesen werden sollen.

Python

Verwenden Sie die Methode read eines Elements Database snapshot mit einem angegebenen Wert für exact_staleness, um Zeilen mit einer Zeitstempelgrenze vom Typ "exakte Veralterung" aus der Datenbank zu lesen.

def read_stale_data(instance_id, database_id):
    """Reads sample data from the database. The data is exactly 15 seconds
    stale."""
    import datetime

    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)
    staleness = datetime.timedelta(seconds=15)

    with database.snapshot(exact_staleness=staleness) as snapshot:
        keyset = spanner.KeySet(all_=True)
        results = snapshot.read(
            table="Albums",
            columns=("SingerId", "AlbumId", "MarketingBudget"),
            keyset=keyset,
        )

        for row in results:
            print(u"SingerId: {}, AlbumId: {}, MarketingBudget: {}".format(*row))

In diesem Beispiel wird mit KeySet eine Sammlung von Schlüsseln oder Schlüsselbereichen definiert, die gelesen werden sollen.

Ruby

Verwenden Sie die Methode read eines Snapshots Client mit einem in Sekunden angegebenen Wert für staleness, um Zeilen mit einer Zeitstempelgrenze vom Typ "exakte Veralterung" aus der Datenbank zu lesen.

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"
require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

# Perform a read with a data staleness of 15 seconds
client.snapshot staleness: 15 do |snapshot|
  snapshot.read("Albums", [:SingerId, :AlbumId, :AlbumTitle]).rows.each do |row|
    puts "#{row[:SingerId]} #{row[:AlbumId]} #{row[:AlbumTitle]}"
  end
end

Lesevorgang mit einem Index ausführen

Im Folgenden wird gezeigt, wie Sie mit einem Index null oder mehr Zeilen aus einer Datenbank auslesen.

C++

Verwenden Sie die Funktion Read(), um einen Lesevorgang mit einem Index durchzuführen.

void ReadDataWithIndex(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;

  spanner::ReadOptions read_options;
  read_options.index_name = "AlbumsByAlbumTitle";
  auto rows = client.Read("Albums", google::cloud::spanner::KeySet::All(),
                          {"AlbumId", "AlbumTitle"}, read_options);
  using RowType = std::tuple<std::int64_t, std::string>;
  for (auto const& row : spanner::StreamOf<RowType>(rows)) {
    if (!row) throw std::runtime_error(row.status().message());
    std::cout << "AlbumId: " << std::get<0>(*row) << "\t";
    std::cout << "AlbumTitle: " << std::get<1>(*row) << "\n";
  }
  std::cout << "Read completed for [spanner_read_data_with_index]\n";
}

C#

Lesen Sie Daten mit dem Index aus, indem Sie eine Abfrage ausführen, in der der Index explizit angegeben wird:

string connectionString =
$"Data Source=projects/{projectId}/instances/{instanceId}"
+ $"/databases/{databaseId}";
// Create connection to Cloud Spanner.
using (var connection = new SpannerConnection(connectionString))
{
    var cmd = connection.CreateSelectCommand(
        "SELECT AlbumId, AlbumTitle, MarketingBudget FROM Albums@ "
        + "{FORCE_INDEX=AlbumsByAlbumTitle} "
        + $"WHERE AlbumTitle >= @startTitle "
        + $"AND AlbumTitle < @endTitle",
        new SpannerParameterCollection {
            {"startTitle", SpannerDbType.String},
            {"endTitle", SpannerDbType.String} });
    cmd.Parameters["startTitle"].Value = startTitle;
    cmd.Parameters["endTitle"].Value = endTitle;
    using (var reader = await cmd.ExecuteReaderAsync())
    {
        while (await reader.ReadAsync())
        {
            var marketingBudget = reader.IsDBNull(
                reader.GetOrdinal("MarketingBudget")) ?
                "" :
                reader.GetFieldValue<string>("MarketingBudget");
            Console.WriteLine("AlbumId : "
            + reader.GetFieldValue<string>("AlbumId")
            + " AlbumTitle : "
            + reader.GetFieldValue<string>("AlbumTitle")
            + " MarketingBudget : "
            + marketingBudget);
        }
    }
}

Go

Verwenden Sie Client.Single().ReadUsingIndex, um Zeilen mit einem Index aus der Datenbank zu lesen.


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"
)

func readUsingIndex(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	iter := client.Single().ReadUsingIndex(ctx, "Albums", "AlbumsByAlbumTitle", spanner.AllKeys(),
		[]string{"AlbumId", "AlbumTitle"})
	defer iter.Stop()
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return err
		}
		var albumID int64
		var albumTitle string
		if err := row.Columns(&albumID, &albumTitle); err != nil {
			return err
		}
		fmt.Fprintf(w, "%d %s\n", albumID, albumTitle)
	}
}

Java

Verwenden Sie ReadContext.readUsingIndex, um Zeilen mit einem Index aus der Datenbank zu lesen.

static void readUsingIndex(DatabaseClient dbClient) {
  try (ResultSet resultSet =
      dbClient
          .singleUse()
          .readUsingIndex(
              "Albums",
              "AlbumsByAlbumTitle",
              KeySet.all(),
              Arrays.asList("AlbumId", "AlbumTitle"))) {
    while (resultSet.next()) {
      System.out.printf("%d %s\n", resultSet.getLong(0), resultSet.getString(1));
    }
  }
}

Node.js

Verwenden Sie Table.read und geben Sie den Index in der Abfrage an, um Zeilen mit einem Index aus der Datenbank zu lesen.

// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

const albumsTable = database.table('Albums');

const query = {
  columns: ['AlbumId', 'AlbumTitle'],
  keySet: {
    all: true,
  },
  index: 'AlbumsByAlbumTitle',
};

// Reads the Albums table using an index
try {
  const [rows] = await albumsTable.read(query);

  rows.forEach(row => {
    const json = row.toJSON();
    console.log(`AlbumId: ${json.AlbumId}, AlbumTitle: ${json.AlbumTitle}`);
  });
} catch (err) {
  console.error('ERROR:', err);
} finally {
  // Close the database when finished.
  database.close();
}

PHP

Verwenden Sie Database::read und geben Sie den Index an, um Zeilen mit einem Index aus der Datenbank zu lesen.

use Google\Cloud\Spanner\SpannerClient;

/**
 * Reads sample data from the database using an index.
 *
 * The index must exist before running this sample. You can add the index
 * by running the `add_index` sample or by running this DDL statement against
 * your database:
 *
 *     CREATE INDEX AlbumsByAlbumTitle ON Albums(AlbumTitle)
 *
 * Example:
 * ```
 * read_data_with_index($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function read_data_with_index($instanceId, $databaseId)
{
    $spanner = new SpannerClient();
    $instance = $spanner->instance($instanceId);
    $database = $instance->database($databaseId);

    $keySet = $spanner->keySet(['all' => true]);
    $results = $database->read(
        'Albums',
        $keySet,
        ['AlbumId', 'AlbumTitle'],
        ['index' => 'AlbumsByAlbumTitle']
    );

    foreach ($results->rows() as $row) {
        printf('AlbumId: %s, AlbumTitle: %s' . PHP_EOL,
            $row['AlbumId'], $row['AlbumTitle']);
    }
}

Python

Verwenden Sie Database.read und geben Sie den Index an, um Zeilen mit einem Index aus der Datenbank zu lesen.

def read_data_with_index(instance_id, database_id):
    """Reads sample data from the database using an index.

    The index must exist before running this sample. You can add the index
    by running the `add_index` sample or by running this DDL statement against
    your database:

        CREATE INDEX AlbumsByAlbumTitle ON Albums(AlbumTitle)

    """
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    with database.snapshot() as snapshot:
        keyset = spanner.KeySet(all_=True)
        results = snapshot.read(
            table="Albums",
            columns=("AlbumId", "AlbumTitle"),
            keyset=keyset,
            index="AlbumsByAlbumTitle",
        )

        for row in results:
            print("AlbumId: {}, AlbumTitle: {}".format(*row))

Ruby

Verwenden Sie Client#read und geben Sie den Index an, um Zeilen mit einem Index aus der Datenbank zu lesen.

# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

spanner = Google::Cloud::Spanner.new project: project_id
client  = spanner.client instance_id, database_id

result = client.read "Albums", [:AlbumId, :AlbumTitle],
                     index: "AlbumsByAlbumTitle"

result.rows.each do |row|
  puts "#{row[:AlbumId]} #{row[:AlbumTitle]}"
end

Daten gleichzeitig lesen

Wenn große Datenmengen aus Cloud Spanner gelesen oder abgefragt werden, kann es hilfreich sein, die Abfrage in kleinere Teile oder Partitionen aufzuteilen und mehrere Rechner zu verwenden, um die Partitionen abzurufen.

Mithilfe der Cloud Spanner-Clientbibliotheken können Sie API-Vorgänge gleichzeitig ausführen. SQL-Abfragen lassen sich jedoch nur partitionieren, wenn der erste Operator im Abfrageausführungsplan Distributed Union ist. Folgen Sie der Anleitung unter Best Practices für SQL, um sich den Abfrageausführungsplan für eine bestimmte SQL-Abfrage anzeigen zu lassen.

Sobald Sie den Abfrageausführungsplan haben, überprüfen Sie, ob der erste darin enthaltene Operator Distributed Union ist.

C++

In diesem Beispiel werden Partitionen einer SQL-Abfrage der Tabelle Singers abgerufen und die Abfrage wird anhand folgender Schritte über jede Partition ausgeführt:

  • Erstellen einer Cloud Spanner-Batchtransaktion
  • Erstellen von Partitionen für die Abfrage, damit die Partitionen auf mehrere Worker verteilt werden können.
  • Abrufen der Abfrageergebnisse für jede Partition.
void UsePartitionQuery(google::cloud::spanner::Client client) {
  namespace spanner = ::google::cloud::spanner;
  auto txn = spanner::MakeReadOnlyTransaction();

  spanner::SqlStatement select(
      "SELECT SingerId, FirstName, LastName FROM Singers");
  using RowType = std::tuple<std::int64_t, std::string, std::string>;

  auto partitions = client.PartitionQuery(std::move(txn), select, {});
  if (!partitions) throw std::runtime_error(partitions.status().message());
  int number_of_rows = 0;
  for (auto const& partition : *partitions) {
    auto rows = client.ExecuteQuery(partition);
    for (auto const& row : spanner::StreamOf<RowType>(rows)) {
      if (!row) throw std::runtime_error(row.status().message());
      number_of_rows++;
    }
  }
  std::cout << "Number of partitions: " << partitions->size() << "\n"
            << "Number of rows: " << number_of_rows << "\n";
  std::cout << "Read completed for [spanner_batch_client]\n";
}

C#

In diesem Beispiel werden Partitionen einer SQL-Abfrage der Tabelle Singers abgerufen und die Abfrage wird anhand folgender Schritte über jede Partition ausgeführt:

  • Erstellen einer Cloud Spanner-Batchtransaktion
  • Erstellen von Partitionen für die Abfrage, damit die Partitionen auf mehrere Worker verteilt werden können.
  • Abrufen der Abfrageergebnisse für jede Partition.
private static int s_partitionId;
private static int s_rowsRead;
public static object BatchReadRecords(string projectId,
     string instanceId, string databaseId)
{
    var responseTask =
        DistributedReadAsync(projectId, instanceId, databaseId);
    Console.WriteLine("Waiting for operation to complete...");
    responseTask.Wait();
    Console.WriteLine($"Operation status: {responseTask.Status}");
    return ExitCode.Success;
}

private static async Task DistributedReadAsync(string projectId,
    string instanceId, string databaseId)
{
    string connectionString =
        $"Data Source=projects/{projectId}/instances/{instanceId}"
        + $"/databases/{databaseId}";
    using (var connection = new SpannerConnection(connectionString))
    {
        await connection.OpenAsync();

        using (var transaction =
            await connection.BeginReadOnlyTransactionAsync())
        using (var cmd =
            connection.CreateSelectCommand(
                "SELECT SingerId, FirstName, LastName FROM Singers"))
        {
            transaction.DisposeBehavior =
                DisposeBehavior.CloseResources;
            cmd.Transaction = transaction;
            var partitions = await cmd.GetReaderPartitionsAsync();
            var transactionId = transaction.TransactionId;
            await Task.WhenAll(partitions.Select(
                    x => DistributedReadWorkerAsync(x, transactionId)))
                        .ConfigureAwait(false);
        }
        Console.WriteLine($"Done reading!  Total rows read: "
            + $"{s_rowsRead:N0} with {s_partitionId} partition(s)");
    }
}

private static async Task DistributedReadWorkerAsync(
    CommandPartition readPartition, TransactionId id)
{
    var localId = Interlocked.Increment(ref s_partitionId);
    using (var connection = new SpannerConnection(id.ConnectionString))
    using (var transaction = connection.BeginReadOnlyTransaction(id))
    {
        using (var cmd = connection.CreateCommandWithPartition(
            readPartition, transaction))
        {
            using (var reader =
                await cmd.ExecuteReaderAsync().ConfigureAwait(false))
            {
                while (await reader.ReadAsync())
                {
                    Interlocked.Increment(ref s_rowsRead);
                    Console.WriteLine($"Partition ({localId}) "
                        + $"{reader.GetFieldValue<string>("SingerId")}"
                        + $" {reader.GetFieldValue<string>("FirstName")}"
                        + $" {reader.GetFieldValue<string>("LastName")}");
                }
            }
        }
        Console.WriteLine($"Done with single reader {localId}.");
    }
}

Go

In diesem Beispiel werden Partitionen einer SQL-Abfrage der Tabelle Singers abgerufen und die Abfrage wird anhand folgender Schritte über jede Partition ausgeführt:

  • Erstellen eines Cloud Spanner-Clients und einer Transaktion.
  • Erstellen von Partitionen für die Abfrage, damit die Partitionen auf mehrere Worker verteilt werden können.
  • Abrufen der Abfrageergebnisse für jede Partition.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
	"google.golang.org/api/iterator"
)

func readBatchData(w io.Writer, db string) error {
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return err
	}
	defer client.Close()

	txn, err := client.BatchReadOnlyTransaction(ctx, spanner.StrongRead())
	if err != nil {
		return err
	}
	defer txn.Close()

	// Singer represents a row in the Singers table.
	type Singer struct {
		SingerID   int64
		FirstName  string
		LastName   string
		SingerInfo []byte
	}
	stmt := spanner.Statement{SQL: "SELECT SingerId, FirstName, LastName FROM Singers;"}
	partitions, err := txn.PartitionQuery(ctx, stmt, spanner.PartitionOptions{})
	if err != nil {
		return err
	}
	recordCount := 0
	for i, p := range partitions {
		iter := txn.Execute(ctx, p)
		defer iter.Stop()
		for {
			row, err := iter.Next()
			if err == iterator.Done {
				break
			} else if err != nil {
				return err
			}
			var s Singer
			if err := row.ToStruct(&s); err != nil {
				return err
			}
			fmt.Fprintf(w, "Partition (%d) %v\n", i, s)
			recordCount++
		}
	}
	fmt.Fprintf(w, "Total partition count: %v\n", len(partitions))
	fmt.Fprintf(w, "Total record count: %v\n", recordCount)
	return nil
}

Java

In diesem Beispiel werden Partitionen einer SQL-Abfrage der Tabelle Singers abgerufen und die Abfrage wird anhand folgender Schritte über jede Partition ausgeführt:

  • Erstellen eines Cloud Spanner-Batch-Clients und einer Transaktion.
  • Erstellen von Partitionen für die Abfrage, damit die Partitionen auf mehrere Worker verteilt werden können.
  • Abrufen der Abfrageergebnisse für jede Partition.
int numThreads = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(numThreads);

// Statistics
int totalPartitions;
AtomicInteger totalRecords = new AtomicInteger(0);

try {
  BatchClient batchClient =
      spanner.getBatchClient(DatabaseId.of(options.getProjectId(), instanceId, databaseId));

  final BatchReadOnlyTransaction txn =
      batchClient.batchReadOnlyTransaction(TimestampBound.strong());

  // A Partition object is serializable and can be used from a different process.
  List<Partition> partitions =
      txn.partitionQuery(
          PartitionOptions.getDefaultInstance(),
          Statement.of("SELECT SingerId, FirstName, LastName FROM Singers"));

  totalPartitions = partitions.size();

  for (final Partition p : partitions) {
    executor.execute(
        () -> {
          try (ResultSet results = txn.execute(p)) {
            while (results.next()) {
              long singerId = results.getLong(0);
              String firstName = results.getString(1);
              String lastName = results.getString(2);
              System.out.println("[" + singerId + "] " + firstName + " " + lastName);
              totalRecords.getAndIncrement();
            }
          }
        });
  }
} finally {
  executor.shutdown();
  executor.awaitTermination(1, TimeUnit.HOURS);
  spanner.close();
}

double avgRecordsPerPartition = 0.0;
if (totalPartitions != 0) {
  avgRecordsPerPartition = (double) totalRecords.get() / totalPartitions;
}
System.out.println("totalPartitions=" + totalPartitions);
System.out.println("totalRecords=" + totalRecords);
System.out.println("avgRecordsPerPartition=" + avgRecordsPerPartition);

Node.js

In diesem Beispiel werden Partitionen einer SQL-Abfrage der Tabelle Singers abgerufen und die Abfrage wird anhand folgender Schritte über jede Partition ausgeführt:

  • Erstellen eines Cloud Spanner-Clients und eines Batches.
  • Erstellen von Partitionen für die Abfrage, damit die Partitionen auf mehrere Worker verteilt werden können.
  • Abrufen der Abfrageergebnisse für jede Partition.
// Imports the Google Cloud client library
const {Spanner} = require('@google-cloud/spanner');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// const projectId = 'my-project-id';
// const instanceId = 'my-instance';
// const databaseId = 'my-database';

// Creates a client
const spanner = new Spanner({
  projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);
const [transaction] = await database.createBatchTransaction();

const query = 'SELECT * FROM Singers';

const [partitions] = await transaction.createQueryPartitions(query);
console.log(`Successfully created ${partitions.length} query partitions.`);

let row_count = 0;
const promises = [];
partitions.forEach(partition => {
  promises.push(
    transaction.execute(partition).then(results => {
      const rows = results[0].map(row => row.toJSON());
      row_count += rows.length;
    })
  );
});
Promise.all(promises)
  .then(() => {
    console.log(
      `Successfully received ${row_count} from executed partitions.`
    );
    transaction.close();
  })
  .then(() => {
    database.close();
  });

PHP

In diesem Beispiel werden Partitionen einer SQL-Abfrage der Tabelle Singers abgerufen und die Abfrage wird anhand folgender Schritte über jede Partition ausgeführt:

  • Erstellen eines Cloud Spanner-Clients und eines Batches.
  • Erstellen von Partitionen für die Abfrage, damit die Partitionen auf mehrere Worker verteilt werden können.
  • Abrufen der Abfrageergebnisse für jede Partition.
use Google\Cloud\Spanner\SpannerClient;

/**
 * Queries sample data from the database using SQL.
 * Example:
 * ```
 * batch_query_data($instanceId, $databaseId);
 * ```
 *
 * @param string $instanceId The Spanner instance ID.
 * @param string $databaseId The Spanner database ID.
 */
function batch_query_data($instanceId, $databaseId)
{
    $spanner = new SpannerClient();
    $batch = $spanner->batch($instanceId, $databaseId);
    $snapshot = $batch->snapshot();
    $queryString = 'SELECT SingerId, FirstName, LastName FROM Singers';
    $partitions = $snapshot->partitionQuery($queryString);
    $totalPartitions = count($partitions);
    $totalRecords = 0;
    foreach ($partitions as $partition) {
        $result = $snapshot->executePartition($partition);
        $rows = $result->rows();
        foreach ($rows as $row) {
            $singerId = $row['SingerId'];
            $firstName = $row['FirstName'];
            $lastName = $row['LastName'];
            printf('SingerId: %s, FirstName: %s, LastName: %s' . PHP_EOL, $singerId, $firstName, $lastName);
            $totalRecords++;
        }
    }
    printf('Total Partitions: %d' . PHP_EOL, $totalPartitions);
    printf('Total Records: %d' . PHP_EOL, $totalRecords);
    $averageRecordsPerPartition = $totalRecords / $totalPartitions;
    printf('Average Records Per Partition: %f' . PHP_EOL, $averageRecordsPerPartition);
}

Python

In diesem Beispiel werden Partitionen einer SQL-Abfrage der Tabelle Singers abgerufen und die Abfrage wird anhand folgender Schritte über jede Partition ausgeführt:

  • Erstellen eines Cloud Spanner-Clients und einer Batch-Transaktion.
  • Erstellen von Partitionen für die Abfrage, damit die Partitionen auf mehrere Worker verteilt werden können.
  • Abrufen der Abfrageergebnisse für jede Partition.

def run_batch_query(instance_id, database_id):
    """Runs an example batch query."""

    # Expected Table Format:
    # CREATE TABLE Singers (
    #   SingerId   INT64 NOT NULL,
    #   FirstName  STRING(1024),
    #   LastName   STRING(1024),
    #   SingerInfo BYTES(MAX),
    # ) PRIMARY KEY (SingerId);

    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    # Create the batch transaction and generate partitions
    snapshot = database.batch_snapshot()
    partitions = snapshot.generate_read_batches(
        table="Singers",
        columns=("SingerId", "FirstName", "LastName"),
        keyset=spanner.KeySet(all_=True),
    )

    # Create a pool of workers for the tasks
    start = time.time()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(process, snapshot, p) for p in partitions]

        for future in concurrent.futures.as_completed(futures, timeout=3600):
            finish, row_ct = future.result()
            elapsed = finish - start
            print(u"Completed {} rows in {} seconds".format(row_ct, elapsed))

    # Clean up
    snapshot.close()

def process(snapshot, partition):
    """Processes the requests of a query in an separate process."""
    print("Started processing partition.")
    row_ct = 0
    for row in snapshot.process_read_batch(partition):
        print(u"SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))
        row_ct += 1
    return time.time(), row_ct

Ruby

In diesem Beispiel werden Partitionen einer SQL-Abfrage der Tabelle Singers abgerufen und die Abfrage wird anhand folgender Schritte über jede Partition ausgeführt:

  • Erstellen eines Cloud Spanner-Batch-Clients.
  • Erstellen von Partitionen für die Abfrage, damit die Partitionen auf mehrere Worker verteilt werden können.
  • Abrufen der Abfrageergebnisse für jede Partition.
# project_id  = "Your Google Cloud project ID"
# instance_id = "Your Spanner instance ID"
# database_id = "Your Spanner database ID"

require "google/cloud/spanner"

# Prepare a thread pool with number of processors
processor_count  = Concurrent.processor_count
thread_pool      = Concurrent::FixedThreadPool.new processor_count

# Prepare AtomicFixnum to count total records using multiple threads
total_records = Concurrent::AtomicFixnum.new

# Create a new Spanner batch client
spanner        = Google::Cloud::Spanner.new project: project_id
batch_client   = spanner.batch_client instance_id, database_id

# Get a strong timestamp bound batch_snapshot
batch_snapshot = batch_client.batch_snapshot strong: true

# Get partitions for specified query
partitions       = batch_snapshot.partition_query "SELECT SingerId, FirstName, LastName FROM Singers"
total_partitions = partitions.size

# Enqueue a new thread pool job
partitions.each_with_index do |partition, _partition_index|
  thread_pool.post do
    # Increment total_records per new row
    batch_snapshot.execute_partition(partition).rows.each do |_row|
      total_records.increment
    end
  end
end

# Wait for queued jobs to complete
thread_pool.shutdown
thread_pool.wait_for_termination

# Close the client connection and release resources.
batch_snapshot.close

# Collect statistics for batch query
average_records_per_partition = 0.0
if total_partitions != 0
  average_records_per_partition = total_records.value / total_partitions.to_f
end

puts "Total Partitions: #{total_partitions}"
puts "Total Records: #{total_records.value}"
puts "Average records per Partition: #{average_records_per_partition}"