Get started with Spanner and PGAdapter


This tutorial walks you through the following steps using the Spanner PGAdapter local proxy for PostgreSQL drivers:

  • Create a Spanner instance and database.
  • Write, read, and execute SQL queries on data in the database.
  • Update the database schema.
  • Update data using a read-write transaction.
  • Add a secondary index to the database.
  • Use the index to read and execute SQL queries on data.
  • Retrieve data using a read-only transaction.


This tutorial uses Spanner, a billable component of Google Cloud. For information on the cost of using Spanner, see Pricing.

Before you begin

Complete the steps described in Set up, which cover creating and setting a default Google Cloud project, enabling billing, enabling the Cloud Spanner API, and setting up OAuth 2.0 to get authentication credentials to use the Cloud Spanner API.

In particular, make sure that you run gcloud auth application-default login to set up your local development environment with authentication credentials.
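As a quick sanity check after logging in, the following sketch computes where gcloud typically stores Application Default Credentials and reports whether the file exists. It assumes a Linux or macOS layout (the default ~/.config/gcloud directory, or the directory named by CLOUDSDK_CONFIG when that variable is set). The helper name adc_file_path is hypothetical and not part of any Google library.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def adc_file_path(env: Optional[Mapping[str, str]] = None) -> Path:
    """Return the expected location of the Application Default Credentials file.

    Assumption: Linux/macOS layout. CLOUDSDK_CONFIG, when set, replaces the
    default gcloud configuration directory (~/.config/gcloud).
    """
    env = os.environ if env is None else env
    config_dir = env.get("CLOUDSDK_CONFIG")
    if config_dir:
        base = Path(config_dir)
    else:
        base = Path(env.get("HOME", str(Path.home()))) / ".config" / "gcloud"
    return base / "application_default_credentials.json"


if __name__ == "__main__":
    path = adc_file_path()
    status = "found" if path.is_file() else "missing"
    print(f"{path}: {status}")
```

If the file is reported as missing, running gcloud auth application-default login again is the usual remedy.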

Prepare your local PGAdapter environment

You can use PostgreSQL drivers in combination with PGAdapter to connect to Spanner. PGAdapter is a local proxy that translates the PostgreSQL network protocol to the Spanner gRPC protocol.

PGAdapter requires either Java or Docker to run.

  1. If neither Java nor Docker is already installed, install one of them on your development machine.

  2. Clone the sample app repository to your local machine:

    git clone
  3. Change to the directory that contains the Spanner sample code:


    cd pgadapter/samples/snippets/psql-snippets


    cd pgadapter/samples/snippets/java-snippets
    mvn package -DskipTests


    cd pgadapter/samples/snippets/golang-snippets


    cd pgadapter/samples/snippets/nodejs-snippets
    npm install


    cd pgadapter/samples/snippets/python-snippets
    python -m venv ./venv
    pip install -r requirements.txt
    cd samples


    cd pgadapter/samples/snippets/dotnet-snippets


    cd pgadapter/samples/snippets/php-snippets
    composer install
    cd samples

Create an instance

When you first use Spanner, you must create an instance, which is an allocation of resources that are used by Spanner databases. When you create an instance, you choose an instance configuration, which determines where your data is stored, and the number of nodes to use, which determines the amount of serving and storage resources in your instance.

Run the following command to create a Spanner instance in the region us-central1 with 1 node:

gcloud spanner instances create test-instance --config=regional-us-central1 \
    --description="Test Instance" --nodes=1

Note that this creates an instance with the following characteristics:

  • Instance ID test-instance
  • Display name Test Instance
  • Instance configuration regional-us-central1. (Regional configurations store data in one region, while multi-region configurations distribute data across multiple regions. For more information, see About instances.)
  • Node count of 1. (node_count corresponds to the amount of serving and storage resources available to databases in the instance. Learn more in Nodes and processing units.)

You should see:

Creating instance...done.

Explore the sample files

The samples repository contains a sample that shows how to use Spanner with PGAdapter.

Take a look through the samples/snippets folder, which shows how to use Spanner. The code shows how to create and use a new database. The data uses the example schema shown in the Schema and data model page.

Start PGAdapter

Start PGAdapter on your local development machine and point it to the instance that you created.

The following commands assume that you have executed gcloud auth application-default login.

Java application

wget \
    && tar -xzvf pgadapter.tar.gz
java -jar pgadapter.jar -i test-instance


docker pull
docker run \
    --name pgadapter \
    --rm -d -p 5432:5432 \
    -v "$HOME/.config/gcloud":/gcloud:ro \
    --env CLOUDSDK_CONFIG=/gcloud \ \
    -i test-instance -x


docker pull
docker run \
    --name pgadapter-emulator \
    --rm -d \
    -p 5432:5432 \
    -p 9010:9010 \
    -p 9020:9020 \

This starts PGAdapter with an embedded Spanner emulator. This embedded emulator automatically creates any Spanner instance or database that you connect to, so you don't need to create them manually beforehand.

We recommend that you run PGAdapter in production as a sidecar container or as an in-process dependency. For more information on deploying PGAdapter in production, see Choose a method for running PGAdapter.
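Before running the samples, it can help to confirm that the local PGAdapter process is accepting TCP connections. The sketch below assumes PGAdapter listens on localhost:5432, the port mapped in the Docker commands above. The helper name is_listening is hypothetical, and the check only verifies that something accepts connections on the port, not that it is PGAdapter.

```python
import socket


def is_listening(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    print("PGAdapter reachable:", is_listening("localhost", 5432))
```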

Create a database

gcloud spanner databases create example-db --instance=test-instance \
    --database-dialect=POSTGRESQL

You should see:

Creating database...done.

Create tables

The following code creates two tables in the database.



# Set the connection variables for psql.
# The following statements use the existing value of the variable if it has
# already been set, and otherwise assigns a default value.
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# Create two tables in one batch.
psql << SQL
-- Create the singers table
CREATE TABLE singers (
  singer_id   bigint not null primary key,
  first_name  character varying(1024),
  last_name   character varying(1024),
  singer_info bytea,
  full_name   character varying(2048) GENERATED ALWAYS
          AS (first_name || ' ' || last_name) STORED
);

-- Create the albums table. This table is interleaved in the parent table
-- "singers".
CREATE TABLE albums (
  singer_id     bigint not null,
  album_id      bigint not null,
  album_title   character varying(1024),
  primary key (singer_id, album_id)
)
-- The 'interleave in parent' clause is a Spanner-specific extension to
-- open-source PostgreSQL.
INTERLEAVE IN PARENT singers ON DELETE CASCADE;
SQL

echo "Created Singers & Albums tables in database: [${PGDATABASE}]"


import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

class CreateTables {
  static void createTables(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      try (Statement statement = connection.createStatement()) {
        // Create two tables in one batch.
        statement.addBatch(
            "create table singers ("
                + "  singer_id   bigint primary key not null,"
                + "  first_name  varchar(1024),"
                + "  last_name   varchar(1024),"
                + "  singer_info bytea,"
                + "  full_name   varchar(2048) generated always as (\n"
                + "      case when first_name is null then last_name\n"
                + "          when last_name  is null then first_name\n"
                + "          else first_name || ' ' || last_name\n"
                + "      end) stored"
                + ")");
        statement.addBatch(
            "create table albums ("
                + "  singer_id     bigint not null,"
                + "  album_id      bigint not null,"
                + "  album_title   varchar,"
                + "  primary key (singer_id, album_id)"
                + ") interleave in parent singers on delete cascade");
        // Send both DDL statements to PGAdapter as a single batch.
        statement.executeBatch();
        System.out.println("Created Singers & Albums tables in database: [" + database + "]");
      }
    }
  }
}


import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func CreateTables(host string, port int, database string) error {
	ctx := context.Background()
	// uid:pwd are placeholder credentials.
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	// Create two tables in one batch on Spanner.
	br := conn.SendBatch(ctx, &pgx.Batch{QueuedQueries: []*pgx.QueuedQuery{
		{SQL: "create table singers (" +
			"  singer_id   bigint primary key not null," +
			"  first_name  character varying(1024)," +
			"  last_name   character varying(1024)," +
			"  singer_info bytea," +
			"  full_name   character varying(2048) generated " +
			"  always as (first_name || ' ' || last_name) stored" +
			")"},
		{SQL: "create table albums (" +
			"  singer_id     bigint not null," +
			"  album_id      bigint not null," +
			"  album_title   character varying(1024)," +
			"  primary key (singer_id, album_id)" +
			") interleave in parent singers on delete cascade"},
	}})
	cmd, err := br.Exec()
	if err != nil {
		return err
	}
	if cmd.String() != "CREATE" {
		return fmt.Errorf("unexpected command tag: %v", cmd.String())
	}
	if err := br.Close(); err != nil {
		return err
	}
	fmt.Printf("Created Singers & Albums tables in database: [%s]\n", database)

	return nil
}


import { Client } from 'pg';

async function createTables(host: string, port: number, database: string): Promise<void> {
  // Connect to Spanner through PGAdapter.
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  // Create two tables in one batch.
  await connection.query("start batch ddl");
  await connection.query("create table singers (" +
      "  singer_id   bigint primary key not null," +
      "  first_name  character varying(1024)," +
      "  last_name   character varying(1024)," +
      "  singer_info bytea," +
      "  full_name   character varying(2048) generated " +
      "  always as (first_name || ' ' || last_name) stored" +
      ")");
  await connection.query("create table albums (" +
      "  singer_id     bigint not null," +
      "  album_id      bigint not null," +
      "  album_title   character varying(1024)," +
      "  primary key (singer_id, album_id)" +
      ") interleave in parent singers on delete cascade");
  await connection.query("run batch");
  console.log(`Created Singers & Albums tables in database: [${database}]`);

  // Close the connection.
  await connection.end();
}


import psycopg


def create_tables(host: str, port: int, database: str):
    # Connect to Cloud Spanner using psycopg3 through PGAdapter.
    with psycopg.connect("host={host} port={port} "
                         "dbname={database} "
                         "sslmode=disable".format(host=host, port=port,
                                                  database=database)) as conn:
        # Enable autocommit to execute DDL statements, as psycopg otherwise
        # tries to use a read/write transaction.
        conn.autocommit = True

        # Use a pipeline to execute multiple DDL statements in one batch.
        with conn.pipeline():
            conn.execute("create table singers ("
                         + "  singer_id   bigint primary key not null,"
                         + "  first_name  character varying(1024),"
                         + "  last_name   character varying(1024),"
                         + "  singer_info bytea,"
                         + "  full_name   character varying(2048) generated "
                         + "  always as (first_name || ' ' || last_name) stored"
                         + ")")
            conn.execute("create table albums ("
                         + "  singer_id     bigint not null,"
                         + "  album_id      bigint not null,"
                         + "  album_title   character varying(1024),"
                         + "  primary key (singer_id, album_id)"
                         + ") interleave in parent singers on delete cascade")
        print("Created Singers & Albums tables in database: [{database}]"
              .format(database=database))


using Npgsql;

namespace dotnet_snippets;

public static class CreateTablesSample
{
    public static void CreateTables(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        // Create two tables in one batch.
        var batch = connection.CreateBatch();
        batch.BatchCommands.Add(new NpgsqlBatchCommand(
            "create table singers ("
            + "  singer_id   bigint primary key not null,"
            + "  first_name  varchar(1024),"
            + "  last_name   varchar(1024),"
            + "  singer_info bytea,"
            + "  full_name   varchar(2048) generated always as (\n"
            + "      case when first_name is null then last_name\n"
            + "          when last_name  is null then first_name\n"
            + "          else first_name || ' ' || last_name\n"
            + "      end) stored"
            + ")"));
        batch.BatchCommands.Add(new NpgsqlBatchCommand(
            "create table albums ("
            + "  singer_id     bigint not null,"
            + "  album_id      bigint not null,"
            + "  album_title   varchar,"
            + "  primary key (singer_id, album_id)"
            + ") interleave in parent singers on delete cascade"));
        // Send both DDL statements to PGAdapter as a single batch.
        batch.ExecuteNonQuery();
        Console.WriteLine($"Created Singers & Albums tables in database: [{database}]");
    }
}


function create_tables(string $host, string $port, string $database): void
{
    // Connect to Spanner through PGAdapter using the PostgreSQL PDO driver.
    $dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
    $connection = new PDO($dsn);

    // Create two tables in one batch.
    $connection->exec("start batch ddl");
    $connection->exec("create table singers ("
        ."  singer_id   bigint primary key not null,"
        ."  first_name  character varying(1024),"
        ."  last_name   character varying(1024),"
        ."  singer_info bytea,"
        ."  full_name   character varying(2048) generated "
        ."  always as (first_name || ' ' || last_name) stored"
        .")");
    $connection->exec("create table albums ("
        ."  singer_id     bigint not null,"
        ."  album_id      bigint not null,"
        ."  album_title   character varying(1024),"
        ."  primary key (singer_id, album_id)"
        .") interleave in parent singers on delete cascade");
    $connection->exec("run batch");
    print("Created Singers & Albums tables in database: [{$database}]\n");

    $connection = null;
}

Run the sample with the following command:


PGDATABASE=example-db ./ example-db


java -jar target/pgadapter-snippets/pgadapter-samples.jar createtables example-db


go run sample_runner.go createtables example-db


npm start createtables example-db


python example-db


dotnet run createtables example-db


php create_tables.php example-db
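The samples above define the generated full_name column in two ways: some concatenate first_name and last_name directly, while the Java and C# variants wrap the concatenation in a CASE expression. In PostgreSQL, the || operator returns NULL when either operand is NULL, so the two definitions differ for singers with a missing name part. The following sketch mimics both expressions in plain Python, using None for NULL. The function names are hypothetical and exist only for illustration.

```python
from typing import Optional


def concat_full_name(first: Optional[str], last: Optional[str]) -> Optional[str]:
    """Mimic first_name || ' ' || last_name: NULL if either operand is NULL."""
    if first is None or last is None:
        return None
    return f"{first} {last}"


def case_full_name(first: Optional[str], last: Optional[str]) -> Optional[str]:
    """Mimic the CASE expression: fall back to whichever name part is present."""
    if first is None:
        return last
    if last is None:
        return first
    return f"{first} {last}"


if __name__ == "__main__":
    for first, last in [("Melissa", "Garcia"), ("Melissa", None), (None, "Garcia")]:
        print(first, last, concat_full_name(first, last), case_full_name(first, last))
```

With the plain concatenation, a singer who has only a first name gets a NULL full_name. The CASE variant keeps the available name part instead.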

The next step is to write data to your database.

Create a connection

Before you can do reads or writes, you must create a connection to PGAdapter. All of your interactions with Spanner must go through a Connection. The database name is specified in the connection string.
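The psql samples in this tutorial read PGHOST, PGPORT, and PGDATABASE and fall back to localhost, 5432, and example-db. The following sketch applies the same defaults when building a keyword/value connection string of the kind the Python samples pass to psycopg. The helper name conninfo_from_env is hypothetical.

```python
import os
from typing import Mapping, Optional

# Same defaults as the psql samples in this tutorial.
DEFAULTS = {"PGHOST": "localhost", "PGPORT": "5432", "PGDATABASE": "example-db"}


def conninfo_from_env(env: Optional[Mapping[str, str]] = None) -> str:
    """Build a keyword/value connection string from PG* environment variables."""
    env = os.environ if env is None else env
    # Like ${VAR:-default} in the shell: an unset or empty value uses the default.
    values = {key: env.get(key) or default for key, default in DEFAULTS.items()}
    return "host={PGHOST} port={PGPORT} dbname={PGDATABASE} sslmode=disable".format(**values)


if __name__ == "__main__":
    print(conninfo_from_env())
```

The resulting string names the database, so switching databases only requires changing PGDATABASE.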



# Set the connection variables for psql.
# The following statements use the existing value of the variable if it has
# already been set, and otherwise assigns a default value.
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# Connect to Cloud Spanner using psql through PGAdapter
# and execute a simple query.
psql -c "select 'Hello world!' as hello"


import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;

class CreateConnection {
  static void createConnection(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      try (ResultSet resultSet =
          connection.createStatement().executeQuery("select 'Hello world!' as hello")) {
        while (resultSet.next()) {
          System.out.printf("Greeting from Cloud Spanner PostgreSQL: %s\n", resultSet.getString(1));
        }
      }
    }
  }
}


import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func CreateConnection(host string, port int, database string) error {
	ctx := context.Background()
	// Connect to Cloud Spanner using pgx through PGAdapter.
	// 'sslmode=disable' is optional, but adding it reduces the connection time,
	// as pgx will then skip first trying to create an SSL connection.
	// uid:pwd are placeholder credentials.
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	row := conn.QueryRow(ctx, "select 'Hello world!' as hello")
	var msg string
	if err := row.Scan(&msg); err != nil {
		return err
	}
	fmt.Printf("Greeting from Cloud Spanner PostgreSQL: %s\n", msg)

	return nil
}


import { Client } from 'pg';

async function createConnection(host: string, port: number, database: string): Promise<void> {
  // Connect to Spanner through PGAdapter.
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  const result = await connection.query("select 'Hello world!' as hello");
  console.log(`Greeting from Cloud Spanner PostgreSQL: ${result.rows[0]['hello']}`);

  // Close the connection.
  await connection.end();
}


import psycopg


def create_connection(host: str, port: int, database: str):
    # Connect to Cloud Spanner using psycopg3 through PGAdapter.
    # 'sslmode=disable' is optional, but adding it reduces the connection time,
    # as psycopg3 will then skip first trying to create an SSL connection.
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host, port=port,
                                                  database=database)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute("select 'Hello world!' as hello")
            print("Greeting from Cloud Spanner PostgreSQL:", cur.fetchone()[0])


using Npgsql;

namespace dotnet_snippets;

public static class CreateConnectionSample
{
    public static void CreateConnection(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        using var cmd = new NpgsqlCommand("select 'Hello World!' as hello", connection);
        using var reader = cmd.ExecuteReader();
        while (reader.Read())
        {
            var greeting = reader.GetString(0);
            Console.WriteLine($"Greeting from Cloud Spanner PostgreSQL: {greeting}");
        }
    }
}


function create_connection(string $host, string $port, string $database): void
{
    // Connect to Spanner through PGAdapter using the PostgreSQL PDO driver.
    $dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
    $connection = new PDO($dsn);

    // Execute a query on Spanner through PGAdapter.
    $statement = $connection->query("select 'Hello world!' as hello");
    $rows = $statement->fetchAll();

    printf("Greeting from Cloud Spanner PostgreSQL: %s\n", $rows[0][0]);

    // Cleanup resources.
    $rows = null;
    $statement = null;
    $connection = null;
}

Run the sample with the following command:


PGDATABASE=example-db ./


java -jar target/pgadapter-snippets/pgadapter-samples.jar createconnection example-db


go run sample_runner.go createconnection example-db


npm start createconnection example-db


python example-db


dotnet run createconnection example-db


php create_connection.php example-db

Write data with DML

You can insert data using Data Manipulation Language (DML) in a read-write transaction.

These samples show how to execute a DML statement on Spanner using a PostgreSQL driver.



export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

psql -c "INSERT INTO singers (singer_id, first_name, last_name) VALUES
                             (12, 'Melissa', 'Garcia'),
                             (13, 'Russel', 'Morales'),
                             (14, 'Jacqueline', 'Long'),
                             (15, 'Dylan', 'Shaw')"

echo "4 records inserted"


import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;

class WriteDataWithDml {
  static class Singer {
    private final long singerId;
    private final String firstName;
    private final String lastName;

    Singer(final long id, final String first, final String last) {
      this.singerId = id;
      this.firstName = first;
      this.lastName = last;
    }
  }

  static void writeDataWithDml(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      // Add 4 rows in one statement.
      // JDBC always uses '?' as a parameter placeholder.
      try (PreparedStatement preparedStatement =
          connection.prepareStatement(
              "INSERT INTO singers (singer_id, first_name, last_name) VALUES "
                  + "(?, ?, ?), "
                  + "(?, ?, ?), "
                  + "(?, ?, ?), "
                  + "(?, ?, ?)")) {

        final List<Singer> singers =
            Arrays.asList(
                new Singer(/* SingerId = */ 12L, "Melissa", "Garcia"),
                new Singer(/* SingerId = */ 13L, "Russel", "Morales"),
                new Singer(/* SingerId = */ 14L, "Jacqueline", "Long"),
                new Singer(/* SingerId = */ 15L, "Dylan", "Shaw"));

        // Note that JDBC parameters start at index 1.
        int paramIndex = 0;
        for (Singer singer : singers) {
          preparedStatement.setLong(++paramIndex, singer.singerId);
          preparedStatement.setString(++paramIndex, singer.firstName);
          preparedStatement.setString(++paramIndex, singer.lastName);
        }

        int updateCount = preparedStatement.executeUpdate();
        System.out.printf("%d records inserted.\n", updateCount);
      }
    }
  }
}


import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func WriteDataWithDml(host string, port int, database string) error {
	ctx := context.Background()
	// uid:pwd are placeholder credentials.
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	// Add 4 rows in one statement, using numbered parameters.
	tag, err := conn.Exec(ctx,
		"INSERT INTO singers (singer_id, first_name, last_name) "+
			"VALUES ($1, $2, $3), ($4, $5, $6), "+
			"       ($7, $8, $9), ($10, $11, $12)",
		12, "Melissa", "Garcia",
		13, "Russel", "Morales",
		14, "Jacqueline", "Long",
		15, "Dylan", "Shaw")
	if err != nil {
		return err
	}
	fmt.Printf("%v records inserted\n", tag.RowsAffected())

	return nil
}


import { Client } from 'pg';

async function writeDataWithDml(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  const result = await connection.query("INSERT INTO singers (singer_id, first_name, last_name) " +
      "VALUES ($1, $2, $3), ($4, $5, $6), " +
      "       ($7, $8, $9), ($10, $11, $12)",
       [12, "Melissa", "Garcia",
        13, "Russel", "Morales",
        14, "Jacqueline", "Long",
        15, "Dylan", "Shaw"]);
  console.log(`${result.rowCount} records inserted`);

  // Close the connection.
  await connection.end();
}


import psycopg


def write_data_with_dml(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host, port=port,
                                                  database=database)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            # Add 4 rows in one statement. psycopg uses '%s' as the
            # parameter placeholder.
            cur.execute("INSERT INTO singers (singer_id, first_name, last_name)"
                        " VALUES (%s, %s, %s), (%s, %s, %s), "
                        "        (%s, %s, %s), (%s, %s, %s)",
                        (12, "Melissa", "Garcia",
                         13, "Russel", "Morales",
                         14, "Jacqueline", "Long",
                         15, "Dylan", "Shaw",))
            print("%d records inserted" % cur.rowcount)


using Npgsql;

namespace dotnet_snippets;

public static class WriteDataWithDmlSample
{
    readonly struct Singer
    {
        public Singer(long singerId, string firstName, string lastName)
        {
            SingerId = singerId;
            FirstName = firstName;
            LastName = lastName;
        }

        public long SingerId { get; }
        public string FirstName { get; }
        public string LastName { get; }
    }

    public static void WriteDataWithDml(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        // Add 4 rows in one statement.
        using var cmd = new NpgsqlCommand("INSERT INTO singers (singer_id, first_name, last_name) VALUES "
                                          + "($1, $2, $3), "
                                          + "($4, $5, $6), "
                                          + "($7, $8, $9), "
                                          + "($10, $11, $12)", connection);
        var singers = new List<Singer>
        {
            new Singer(/* SingerId = */ 12L, "Melissa", "Garcia"),
            new Singer(/* SingerId = */ 13L, "Russel", "Morales"),
            new Singer(/* SingerId = */ 14L, "Jacqueline", "Long"),
            new Singer(/* SingerId = */ 15L, "Dylan", "Shaw")
        };
        // Positional parameters are bound in the order in which they are added.
        foreach (var singer in singers)
        {
            cmd.Parameters.Add(new NpgsqlParameter { Value = singer.SingerId });
            cmd.Parameters.Add(new NpgsqlParameter { Value = singer.FirstName });
            cmd.Parameters.Add(new NpgsqlParameter { Value = singer.LastName });
        }
        var updateCount = cmd.ExecuteNonQuery();
        Console.WriteLine($"{updateCount} records inserted.");
    }
}


function write_data_with_dml(string $host, string $port, string $database): void
{
    $dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
    $connection = new PDO($dsn);

    $sql = "INSERT INTO singers (singer_id, first_name, last_name)"
                        ." VALUES (?, ?, ?), (?, ?, ?), "
                        ."        (?, ?, ?), (?, ?, ?)";
    $statement = $connection->prepare($sql);
    // Bind all 12 parameter values in order and execute the statement.
    $statement->execute([
        12, "Melissa", "Garcia",
        13, "Russel", "Morales",
        14, "Jacqueline", "Long",
        15, "Dylan", "Shaw"
    ]);
    printf("%d records inserted\n", $statement->rowCount());

    $statement = null;
    $connection = null;
}

Run the sample with the following command:


PGDATABASE=example-db ./


java -jar target/pgadapter-snippets/pgadapter-samples.jar writeusingdml example-db


go run sample_runner.go writeusingdml example-db


npm start writeusingdml example-db


python example-db


dotnet run writeusingdml example-db


php write_data_with_dml.php example-db

You should see the following response:

 4 records inserted.
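The multi-row INSERT statements above pair each row with a group of numbered placeholders and pass all values as one flat parameter list. As a minimal sketch, the same statement and parameter list can be generated from a list of rows. The helper name build_multi_row_insert is hypothetical, and the $n placeholder style matches the Go, Node.js, and C# samples; other drivers use ? or %s instead.

```python
from typing import List, Sequence, Tuple


def build_multi_row_insert(
    table: str, columns: Sequence[str], rows: Sequence[Sequence[object]]
) -> Tuple[str, List[object]]:
    """Return (sql, params) for one INSERT that adds all rows.

    The table and column names are inserted into the SQL text as-is, so they
    must come from trusted code. Only the row values are sent as parameters.
    """
    if not rows:
        raise ValueError("at least one row is required")
    groups: List[str] = []
    params: List[object] = []
    for row in rows:
        if len(row) != len(columns):
            raise ValueError("each row needs one value per column")
        start = len(params) + 1  # Placeholders are numbered from $1.
        group = ", ".join(f"${i}" for i in range(start, start + len(row)))
        groups.append(f"({group})")
        params.extend(row)
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES {', '.join(groups)}"
    return sql, params


if __name__ == "__main__":
    sql, params = build_multi_row_insert(
        "singers",
        ["singer_id", "first_name", "last_name"],
        [(12, "Melissa", "Garcia"), (13, "Russel", "Morales")],
    )
    print(sql)
    print(params)
```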

Write data with a DML batch

PGAdapter supports executing DML batches. Sending multiple DML statements in one batch reduces the number of round trips to Spanner and improves the performance of your application.
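A batch reports one update count per statement rather than a single total. In this section, JDBC's executeBatch returns an int array, and the Node.js sample notes that RUN BATCH returns the counts as an array of strings. The following sketch shows one way to turn either form into a total. The helper name total_update_count is hypothetical.

```python
from typing import Iterable, Union


def total_update_count(update_counts: Iterable[Union[int, str]]) -> int:
    """Sum per-statement update counts given as integers or numeric strings."""
    return sum(int(count) for count in update_counts)


if __name__ == "__main__":
    # Three single-row inserts, with counts reported as strings.
    print(total_update_count(["1", "1", "1"]))
```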



export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# Create a prepared insert statement and execute this prepared
# insert statement three times in one SQL string. The single
# SQL string with three insert statements will be executed as
# a single DML batch on Spanner.
psql -c "PREPARE insert_singer AS
           INSERT INTO singers (singer_id, first_name, last_name)
           VALUES (\$1, \$2, \$3)" \
     -c "EXECUTE insert_singer (16, 'Sarah', 'Wilson');
         EXECUTE insert_singer (17, 'Ethan', 'Miller');
         EXECUTE insert_singer (18, 'Maya', 'Patel');"

echo "3 records inserted"


import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;

class WriteDataWithDmlBatch {
  static class Singer {
    private final long singerId;
    private final String firstName;
    private final String lastName;

    Singer(final long id, final String first, final String last) {
      this.singerId = id;
      this.firstName = first;
      this.lastName = last;
    }
  }

  static void writeDataWithDmlBatch(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      // Add multiple rows in one DML batch.
      // JDBC always uses '?' as a parameter placeholder.
      try (PreparedStatement preparedStatement =
          connection.prepareStatement(
              "INSERT INTO singers (singer_id, first_name, last_name) VALUES (?, ?, ?)")) {
        final List<Singer> singers =
            Arrays.asList(
                new Singer(/* SingerId = */ 16L, "Sarah", "Wilson"),
                new Singer(/* SingerId = */ 17L, "Ethan", "Miller"),
                new Singer(/* SingerId = */ 18L, "Maya", "Patel"));

        for (Singer singer : singers) {
          // Note that JDBC parameters start at index 1.
          int paramIndex = 0;
          preparedStatement.setLong(++paramIndex, singer.singerId);
          preparedStatement.setString(++paramIndex, singer.firstName);
          preparedStatement.setString(++paramIndex, singer.lastName);
          // Queue this set of parameter values as one statement in the batch.
          preparedStatement.addBatch();
        }

        // executeBatch returns one update count per statement in the batch.
        int[] updateCounts = preparedStatement.executeBatch();
        System.out.printf("%d records inserted.\n", Arrays.stream(updateCounts).sum());
      }
    }
  }
}


import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func WriteDataWithDmlBatch(host string, port int, database string) error {
	ctx := context.Background()
	// uid:pwd are placeholder credentials.
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	sql := "INSERT INTO singers (singer_id, first_name, last_name) " +
		"VALUES ($1, $2, $3)"
	batch := &pgx.Batch{}
	batch.Queue(sql, 16, "Sarah", "Wilson")
	batch.Queue(sql, 17, "Ethan", "Miller")
	batch.Queue(sql, 18, "Maya", "Patel")
	br := conn.SendBatch(ctx, batch)
	_, err = br.Exec()
	// Always close the batch results, even if Exec returned an error.
	if err := br.Close(); err != nil {
		return err
	}

	if err != nil {
		return err
	}
	fmt.Printf("%v records inserted\n", batch.Len())

	return nil
}


import { Client } from 'pg';

async function writeDataWithDmlBatch(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  // node-postgres does not support PostgreSQL pipeline mode, so we must use the
  // `start batch dml` / `run batch` statements to execute a DML batch.
  const sql = "INSERT INTO singers (singer_id, first_name, last_name) VALUES ($1, $2, $3)";
  await connection.query("start batch dml");
  await connection.query(sql, [16, "Sarah", "Wilson"]);
  await connection.query(sql, [17, "Ethan", "Miller"]);
  await connection.query(sql, [18, "Maya", "Patel"]);
  const result = await connection.query("run batch");
  // RUN BATCH returns the update counts as an array of strings, with one element for each
  // DML statement in the batch. This calculates the total number of affected rows from that array.
  const updateCount = result.rows[0]["UPDATE_COUNTS"]
      .map((s: string) => parseInt(s))
      .reduce((c: number, current: number) => c + current, 0);
  console.log(`${updateCount} records inserted`);

  // Close the connection.
  await connection.end();
}


import psycopg


def write_data_with_dml_batch(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host, port=port,
                                                  database=database)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            # Execute the same INSERT statement once per parameter tuple.
            cur.executemany("INSERT INTO singers "
                            "(singer_id, first_name, last_name) "
                            "VALUES (%s, %s, %s)",
                            [(16, "Sarah", "Wilson",),
                             (17, "Ethan", "Miller",),
                             (18, "Maya", "Patel",), ])
            print("%d records inserted" % cur.rowcount)


using Npgsql;

namespace dotnet_snippets;

public static class WriteDataWithDmlBatchSample
{
    readonly struct Singer
    {
        public Singer(long singerId, string firstName, string lastName)
        {
            SingerId = singerId;
            FirstName = firstName;
            LastName = lastName;
        }

        public long SingerId { get; }
        public string FirstName { get; }
        public string LastName { get; }
    }

    public static void WriteDataWithDmlBatch(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        // Add multiple rows in one DML batch.
        const string sql = "INSERT INTO singers (singer_id, first_name, last_name) VALUES ($1, $2, $3)";
        List<Singer> singers =
        [
            new Singer(/* SingerId = */ 16L, "Sarah", "Wilson"),
            new Singer(/* SingerId = */ 17L, "Ethan", "Miller"),
            new Singer(/* SingerId = */ 18L, "Maya", "Patel")
        ];
        using var batch = new NpgsqlBatch(connection);
        foreach (var singer in singers)
        {
            batch.BatchCommands.Add(new NpgsqlBatchCommand
            {
                CommandText = sql,
                Parameters =
                {
                    new NpgsqlParameter {Value = singer.SingerId},
                    new NpgsqlParameter {Value = singer.FirstName},
                    new NpgsqlParameter {Value = singer.LastName}
                }
            });
        }
        var updateCount = batch.ExecuteNonQuery();
        Console.WriteLine($"{updateCount} records inserted.");
    }
}


function write_data_with_dml_batch(string $host, string $port, string $database): void
{
    $dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
    $connection = new PDO($dsn);

    // Use START BATCH DML / RUN BATCH to run a batch of DML statements.
    // Create a prepared statement for the DML that should be executed.
    $sql = "INSERT INTO singers (singer_id, first_name, last_name) VALUES (?, ?, ?)";
    $statement = $connection->prepare($sql);
    // Start a DML batch.
    $connection->exec("START BATCH DML");

    $statement->execute([16, "Sarah", "Wilson"]);
    $statement->execute([17, "Ethan", "Miller"]);
    $statement->execute([18, "Maya", "Patel"]);

    // Run the DML batch. Use the 'query(..)' method, as the update counts are returned as a row
    // containing an array with the update count of each statement in the batch.
    $statement = $connection->query("RUN BATCH");
    $result = $statement->fetchAll();
    $update_count = $result[0][0];

    printf("%s records inserted\n", $update_count);

    $statement = null;
    $connection = null;
}

Run the sample with the following command:


PGDATABASE=example-db ./


java -jar target/pgadapter-snippets/pgadapter-samples.jar writeusingdmlbatch example-db


go run sample_runner.go writeusingdmlbatch example-db


npm start writeusingdmlbatch example-db


python example-db


dotnet run writeusingdmlbatch example-db


php write_data_with_dml_batch.php example-db

You should see:

3 records inserted.
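
The Node.js sample above notes that RUN BATCH returns one update count per statement as an array of strings. As a minimal sketch of that summing step in Python (the helper name `total_update_count` is hypothetical, and the input is assumed to be that array, already fetched from the result row):

```python
from typing import Iterable, Union


def total_update_count(update_counts: Iterable[Union[str, int]]) -> int:
    """Sum the per-statement update counts of a DML batch.

    Assumption: `update_counts` is the array from the single row that
    RUN BATCH returns, with one element per DML statement in the batch.
    """
    return sum(int(count) for count in update_counts)


if __name__ == "__main__":
    # Three single-row INSERT statements, as in the samples above.
    print(f"{total_update_count(['1', '1', '1'])} records inserted")
```

Converting each element with `int` keeps the helper usable whether the driver returns the counts as strings or as numbers.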

Write data with mutations

You can also insert data using mutations.

PGAdapter translates the PostgreSQL COPY command into mutations. Using COPY is the most efficient way to quickly insert data into your Spanner database.

By default, COPY operations are atomic. Atomic operations on Spanner are bound by the commit size limit. For more information, see CRUD limit.

These examples show how to run a non-atomic COPY operation. This allows the COPY operation to exceed the commit size limit.



# Get the source directory of this script.
directory="${BASH_SOURCE%/*}/"

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# Copy data to Spanner from a tab-separated text file using the COPY command.
psql -c "COPY singers (singer_id, first_name, last_name) FROM STDIN" \
  < "${directory}singers_data.txt"
psql -c "COPY albums FROM STDIN" \
  < "${directory}albums_data.txt"

echo "Copied singers and albums"


import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;

class WriteDataWithCopy {

  static void writeDataWithCopy(String host, int port, String database)
      throws SQLException, IOException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      // Unwrap the PostgreSQL JDBC connection interface to get access to
      // a CopyManager.
      PGConnection pgConnection = connection.unwrap(PGConnection.class);
      CopyManager copyManager = pgConnection.getCopyAPI();

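      // Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
      // will succeed even if it exceeds Spanner's mutation limit per transaction.
      connection
          .createStatement()
          .execute("set spanner.autocommit_dml_mode='partitioned_non_atomic'");
      // Assumption: the data files are on the classpath next to this class.
      long numSingers =
          copyManager.copyIn(
              "COPY singers (singer_id, first_name, last_name) FROM STDIN",
              WriteDataWithCopy.class.getResourceAsStream("singers_data.txt"));
      System.out.printf("Copied %d singers\n", numSingers);

      long numAlbums =
          copyManager.copyIn(
              "COPY albums FROM STDIN",
              WriteDataWithCopy.class.getResourceAsStream("albums_data.txt"));
      System.out.printf("Copied %d albums\n", numAlbums);
    }
  }
}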


import (
	"context"
	"fmt"
	"os"

	"github.com/jackc/pgx/v5"
)

func WriteDataWithCopy(host string, port int, database string) error {
	ctx := context.Background()
	// Connect to PGAdapter without SSL.
	connString := fmt.Sprintf(
		"postgres://%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	// Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
	// will succeed even if it exceeds Spanner's mutation limit per transaction.
	if _, err := conn.Exec(ctx, "set spanner.autocommit_dml_mode='partitioned_non_atomic'"); err != nil {
		return err
	}

	file, err := os.Open("samples/singers_data.txt")
	if err != nil {
		return err
	}
	tag, err := conn.PgConn().CopyFrom(ctx, file,
		"copy singers (singer_id, first_name, last_name) from stdin")
	if err != nil {
		return err
	}
	fmt.Printf("Copied %v singers\n", tag.RowsAffected())

	file, err = os.Open("samples/albums_data.txt")
	if err != nil {
		return err
	}
	tag, err = conn.PgConn().CopyFrom(ctx, file,
		"copy albums from stdin")
	if err != nil {
		return err
	}
	fmt.Printf("Copied %v albums\n", tag.RowsAffected())

	return nil
}


import { Client } from 'pg';
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
import { from as copyFrom } from 'pg-copy-streams'
import path from "path";

async function writeDataWithCopy(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  // Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
  // will succeed even if it exceeds Spanner's mutation limit per transaction.
  await connection.query("set spanner.autocommit_dml_mode='partitioned_non_atomic'");
  // Copy data from a csv file to Spanner using the COPY command.
  // Note that even though the command says 'from stdin', the actual input comes from a file.
  const copySingersStream = copyFrom('copy singers (singer_id, first_name, last_name) from stdin');
  const ingestSingersStream = connection.query(copySingersStream);
  const sourceSingersStream = fs.createReadStream(path.join(__dirname, 'singers_data.txt'));
  await pipeline(sourceSingersStream, ingestSingersStream);
  console.log(`Copied ${copySingersStream.rowCount} singers`);

  const copyAlbumsStream = copyFrom('copy albums from stdin');
  const ingestAlbumsStream = connection.query(copyAlbumsStream);
  const sourceAlbumsStream = fs.createReadStream(path.join(__dirname, 'albums_data.txt'));
  await pipeline(sourceAlbumsStream, ingestAlbumsStream);
  console.log(`Copied ${copyAlbumsStream.rowCount} albums`);

  // Close the connection.
  await connection.end();
}


import os
import psycopg


def write_data_with_copy(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host, port=port,
                                                  database=database)) as conn:

        script_dir = os.path.dirname(os.path.abspath(__file__))
        singers_file_path = os.path.join(script_dir, "singers_data.txt")
        albums_file_path = os.path.join(script_dir, "albums_data.txt")

        conn.autocommit = True
        block_size = 1024
        with conn.cursor() as cur:
            with open(singers_file_path, "r") as f:
                with cur.copy("COPY singers (singer_id, first_name, last_name) "
                              "FROM STDIN") as copy:
                    # Stream the file to the server in fixed-size blocks.
                    while data := f.read(block_size):
                        copy.write(data)
            print("Copied %d singers" % cur.rowcount)

            with open(albums_file_path, "r") as f:
                with cur.copy("COPY albums "
                              "FROM STDIN") as copy:
                    while data := f.read(block_size):
                        copy.write(data)
            print("Copied %d albums" % cur.rowcount)


using Npgsql;

namespace dotnet_snippets;

public static class WriteDataWithCopySample
{
    public static void WriteDataWithCopy(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        // Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
        // will succeed even if it exceeds Spanner's mutation limit per transaction.
        using var cmd = new NpgsqlCommand("set spanner.autocommit_dml_mode='partitioned_non_atomic'", connection);
        cmd.ExecuteNonQuery();

        var singerCount = 0;
        using var singerReader = new StreamReader("singers_data.txt");
        using (var singerWriter = connection.BeginTextImport("COPY singers (singer_id, first_name, last_name) FROM STDIN"))
        {
            while (singerReader.ReadLine() is { } line)
            {
                singerWriter.WriteLine(line);
                singerCount++;
            }
        }
        Console.WriteLine($"Copied {singerCount} singers");

        var albumCount = 0;
        using var albumReader = new StreamReader("albums_data.txt");
        using (var albumWriter = connection.BeginTextImport("COPY albums FROM STDIN"))
        {
            while (albumReader.ReadLine() is { } line)
            {
                albumWriter.WriteLine(line);
                albumCount++;
            }
        }
        Console.WriteLine($"Copied {albumCount} albums");
    }
}


function write_data_with_copy(string $host, string $port, string $database): void
{
    $dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
    $connection = new PDO($dsn);

    $dir = dirname(__FILE__);

    // Copy the tab-separated singers file into the listed columns.
    $connection->pgsqlCopyFromFile(
        "singers",
        sprintf("%s/singers_data.txt", $dir),
        "\t",
        "\\\\N",
        "singer_id, first_name, last_name");
    print("Copied 5 singers\n");

    $connection->pgsqlCopyFromFile(
        "albums",
        sprintf("%s/albums_data.txt", $dir));
    print("Copied 5 albums\n");

    $connection = null;
}

Run the sample with the following command:


PGDATABASE=example-db ./


java -jar target/pgadapter-snippets/pgadapter-samples.jar write example-db


go run sample_runner.go write example-db


npm start write example-db


python example-db


dotnet run write example-db


php write_data_with_copy.php example-db

You should see:

Copied 5 singers
Copied 5 albums
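
The COPY samples above read tab-separated text files. The following is a minimal sketch of how rows could be serialized into the PostgreSQL COPY text format (tab-separated fields, one row per line, `\N` for NULL, backslash escapes for special characters). The helper names `to_copy_field` and `to_copy_text` and the sample rows are hypothetical and are not part of the tutorial's sample code.

```python
from typing import Iterable, Optional, Sequence

# Backslash escapes that the COPY text format uses for special characters.
_ESCAPES = {"\\": "\\\\", "\t": "\\t", "\n": "\\n", "\r": "\\r"}


def to_copy_field(value: Optional[object]) -> str:
    """Render one value as a COPY text field; None becomes the NULL marker."""
    if value is None:
        return "\\N"
    return "".join(_ESCAPES.get(ch, ch) for ch in str(value))


def to_copy_text(rows: Iterable[Sequence[Optional[object]]]) -> str:
    """Render rows as tab-separated lines, each terminated by a newline."""
    return "".join(
        "\t".join(to_copy_field(value) for value in row) + "\n" for row in rows
    )


if __name__ == "__main__":
    # Made-up rows for illustration: (singer_id, first_name, last_name).
    rows = [(101, "Ada", "Example"), (102, "Ben", None)]
    print(to_copy_text(rows), end="")
```

Text produced this way could be written to a file and passed to any of the COPY ... FROM STDIN samples above.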

Query data using SQL

Spanner supports a SQL interface for reading data, which you can access on the command line using the Google Cloud CLI or programmatically using a PostgreSQL driver.

On the command line

Execute the following SQL statement to read the values of all columns from the albums table:

gcloud spanner databases execute-sql example-db --instance=test-instance \
    --sql='SELECT singer_id, album_id, album_title FROM albums'

The result should be:

singer_id  album_id  album_title
1          1         Total Junk
1          2         Go, Go, Go
2          1         Green
2          2         Forever Hold Your Peace
2          3         Terrified

Use a PostgreSQL driver

In addition to executing a SQL statement on the command line, you can issue the same SQL statement programmatically using a PostgreSQL driver.



export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

psql -c "SELECT singer_id, album_id, album_title
         FROM albums"


import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;

class QueryData {
  static void queryData(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
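      try (ResultSet resultSet =
          connection
              .createStatement()
              .executeQuery("SELECT singer_id, album_id, album_title FROM albums")) {
        while (resultSet.next()) {
          System.out.printf(
              "%d %d %s\n",
              resultSet.getLong("singer_id"),
              resultSet.getLong("album_id"),
              resultSet.getString("album_title"));
        }
      }
    }
  }
}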


import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func QueryData(host string, port int, database string) error {
	ctx := context.Background()
	// Connect to PGAdapter without SSL.
	connString := fmt.Sprintf(
		"postgres://%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	rows, err := conn.Query(ctx, "SELECT singer_id, album_id, album_title "+
		"FROM albums")
	if err != nil {
		return err
	}
	defer rows.Close()
	for rows.Next() {
		var singerId, albumId int64
		var title string
		err = rows.Scan(&singerId, &albumId, &title)
		if err != nil {
			return err
		}
		fmt.Printf("%v %v %v\n", singerId, albumId, title)
	}

	return rows.Err()
}


import { Client } from 'pg';

async function queryData(host: string, port: number, database: string): Promise<void> {
  // Connect to Spanner through PGAdapter.
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  const result = await connection.query("SELECT singer_id, album_id, album_title " +
      "FROM albums");
  for (const row of result.rows) {
    console.log(`${row["singer_id"]} ${row["album_id"]} ${row["album_title"]}`);
  }

  // Close the connection.
  await connection.end();
}


import psycopg


def query_data(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host, port=port,
                                                  database=database)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute("SELECT singer_id, album_id, album_title "
                        "FROM albums")
            for album in cur:
                print(album)


using Npgsql;

namespace dotnet_snippets;

public static class QueryDataSample
{
    public static void QueryData(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        using var cmd = new NpgsqlCommand("SELECT singer_id, album_id, album_title FROM albums", connection);
        using var reader = cmd.ExecuteReader();
        while (reader.Read())
        {
            Console.WriteLine($"{reader.GetInt64(0)} {reader.GetInt64(1)} {reader.GetString(2)}");
        }
    }
}


function query_data(string $host, string $port, string $database): void
{
    $dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
    $connection = new PDO($dsn);

    $statement = $connection->query("SELECT singer_id, album_id, album_title "
        ."FROM albums "
        ."ORDER BY singer_id, album_id"
    );
    $rows = $statement->fetchAll();
    foreach ($rows as $album)
    {
        printf("%s\t%s\t%s\n", $album["singer_id"], $album["album_id"], $album["album_title"]);
    }

    $rows = null;
    $statement = null;
    $connection = null;
}

Run the sample with the following command:


PGDATABASE=example-db ./


java -jar target/pgadapter-snippets/pgadapter-samples.jar query example-db


go run sample_runner.go query example-db


npm start query example-db


python example-db


dotnet run query example-db


php query_data.php example-db

You should see the following result:

1 1 Total Junk
1 2 Go, Go, Go
2 1 Green
2 2 Forever Hold Your Peace
2 3 Terrified

Query using a SQL parameter

If your application has a frequently executed query, you can improve its performance by parameterizing it. The resulting parametric query can be cached and reused, which reduces compilation costs. For more information, see Use query parameters to speed up frequently executed queries.

Here is an example of using a parameter in the WHERE clause to query records containing a specific value for last_name.



export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# Create a prepared statement to use a query parameter.
# Using a prepared statement for executing the same SQL string multiple
# times increases the execution speed of the statement.
psql -c "PREPARE select_singer AS
         SELECT singer_id, first_name, last_name
         FROM singers
         WHERE last_name = \$1" \
     -c "EXECUTE select_singer ('Garcia')"


import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

class QueryDataWithParameter {
  static void queryDataWithParameter(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
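      try (PreparedStatement statement =
          connection.prepareStatement(
              "SELECT singer_id, first_name, last_name "
                  + "FROM singers "
                  + "WHERE last_name = ?")) {
        statement.setString(1, "Garcia");
        try (ResultSet resultSet = statement.executeQuery()) {
          while (resultSet.next()) {
            System.out.printf(
                "%d %s %s\n",
                resultSet.getLong("singer_id"),
                resultSet.getString("first_name"),
                resultSet.getString("last_name"));
          }
        }
      }
    }
  }
}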


import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func QueryDataWithParameter(host string, port int, database string) error {
	ctx := context.Background()
	// Connect to PGAdapter without SSL.
	connString := fmt.Sprintf(
		"postgres://%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	rows, err := conn.Query(ctx,
		"SELECT singer_id, first_name, last_name "+
			"FROM singers "+
			"WHERE last_name = $1", "Garcia")
	if err != nil {
		return err
	}
	defer rows.Close()
	for rows.Next() {
		var singerId int64
		var firstName, lastName string
		err = rows.Scan(&singerId, &firstName, &lastName)
		if err != nil {
			return err
		}
		fmt.Printf("%v %v %v\n", singerId, firstName, lastName)
	}

	return rows.Err()
}


import { Client } from 'pg';

async function queryWithParameter(host: string, port: number, database: string): Promise<void> {
  // Connect to Spanner through PGAdapter.
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  const result = await connection.query(
      "SELECT singer_id, first_name, last_name " +
      "FROM singers " +
      "WHERE last_name = $1", ["Garcia"]);
  for (const row of result.rows) {
    console.log(`${row["singer_id"]} ${row["first_name"]} ${row["last_name"]}`);
  }

  // Close the connection.
  await connection.end();
}


import psycopg


def query_data_with_parameter(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host, port=port,
                                                  database=database)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute("SELECT singer_id, first_name, last_name "
                        "FROM singers "
                        "WHERE last_name = %s", ("Garcia",))
            for singer in cur:
                print(singer)


using Npgsql;

namespace dotnet_snippets;

public static class QueryDataWithParameterSample
{
    public static void QueryDataWithParameter(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        using var cmd = new NpgsqlCommand("SELECT singer_id, first_name, last_name "
                                          + "FROM singers "
                                          + "WHERE last_name = $1", connection);
        cmd.Parameters.Add(new NpgsqlParameter { Value = "Garcia" });
        using var reader = cmd.ExecuteReader();
        while (reader.Read())
        {
            Console.WriteLine($"{reader["singer_id"]} {reader["first_name"]} {reader["last_name"]}");
        }
    }
}


function query_data_with_parameter(string $host, string $port, string $database): void
{
    $dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
    $connection = new PDO($dsn);

    $statement = $connection->prepare("SELECT singer_id, first_name, last_name "
                        ."FROM singers "
                        ."WHERE last_name = ?"
    );
    // Bind the parameter value and run the prepared statement.
    $statement->execute(["Garcia"]);
    $rows = $statement->fetchAll();
    foreach ($rows as $singer)
    {
        printf("%s\t%s\t%s\n", $singer["singer_id"], $singer["first_name"], $singer["last_name"]);
    }

    $rows = null;
    $statement = null;
    $connection = null;
}

Run the sample with the following command:


PGDATABASE=example-db ./


java -jar target/pgadapter-snippets/pgadapter-samples.jar querywithparameter example-db


go run sample_runner.go querywithparameter example-db


npm start querywithparameter example-db


python example-db


dotnet run querywithparameter example-db


php query_data_with_parameter.php example-db

You should see the following result:

12 Melissa Garcia
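
To try the parameter-binding pattern without a running PGAdapter, the following sketch uses Python's built-in sqlite3 module as a stand-in for a PostgreSQL driver. This is an assumption made purely so that the example is self-contained: SQLite uses `?` placeholders where psycopg uses `%s`, and the helper names and the in-memory demo table are hypothetical.

```python
import sqlite3


def make_demo_db() -> sqlite3.Connection:
    """Create an in-memory stand-in for the singers table (demo data only)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE singers ("
        "singer_id INTEGER PRIMARY KEY, first_name TEXT, last_name TEXT)"
    )
    conn.executemany(
        "INSERT INTO singers VALUES (?, ?, ?)",
        [(12, "Melissa", "Garcia"), (16, "Sarah", "Wilson")],
    )
    return conn


def find_singers_by_last_name(conn: sqlite3.Connection, last_name: str) -> list:
    """Run the same SQL text for every call; only the bound value changes."""
    cur = conn.execute(
        "SELECT singer_id, first_name, last_name "
        "FROM singers "
        "WHERE last_name = ?",
        (last_name,),
    )
    return cur.fetchall()


if __name__ == "__main__":
    for singer in find_singers_by_last_name(make_demo_db(), "Garcia"):
        print(*singer)
```

Because the value is bound rather than concatenated into the SQL string, the statement text stays constant across calls, which is what makes caching of the parameterized query possible.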

Update the database schema

Assume you need to add a new column named marketing_budget to the albums table. Adding a new column to an existing table requires an update to your database schema. Spanner supports schema updates to a database while the database continues to serve traffic. Schema updates don't require taking the database offline, and they don't lock entire tables or columns. You can continue writing data to the database during the schema update. Read more about supported schema updates and schema change performance in Make schema updates.

Add a column

You can add a column on the command line using the Google Cloud CLI or programmatically using a PostgreSQL driver.

On the command line

Use the following ALTER TABLE command to add the new column to the table:

gcloud spanner databases ddl update example-db --instance=test-instance \
    --ddl='ALTER TABLE albums ADD COLUMN marketing_budget BIGINT'

You should see:

Schema updating...done.

Use a PostgreSQL driver

Execute the DDL statement using a PostgreSQL driver to modify the schema:



export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

psql -c "ALTER TABLE albums ADD COLUMN marketing_budget bigint"
echo "Added marketing_budget column"


import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

class AddColumn {
  static void addColumn(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      connection.createStatement().execute("alter table albums add column marketing_budget bigint");
      System.out.println("Added marketing_budget column");
    }
  }
}


import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func AddColumn(host string, port int, database string) error {
	ctx := context.Background()
	// Connect to PGAdapter without SSL.
	connString := fmt.Sprintf(
		"postgres://%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	_, err = conn.Exec(ctx,
		"ALTER TABLE albums "+
			"ADD COLUMN marketing_budget bigint")
	if err != nil {
		return err
	}
	fmt.Println("Added marketing_budget column")

	return nil
}


import { Client } from 'pg';

async function addColumn(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  await connection.query(
      "ALTER TABLE albums " +
      "ADD COLUMN marketing_budget bigint");
  console.log("Added marketing_budget column");

  // Close the connection.
  await connection.end();
}


import psycopg


def add_column(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host, port=port,
                                                  database=database)) as conn:
        # DDL can only be executed when autocommit=True.
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute("ALTER TABLE albums "
                        "ADD COLUMN marketing_budget bigint")
            print("Added marketing_budget column")


using Npgsql;

namespace dotnet_snippets;

public static class AddColumnSample
{
    public static void AddColumn(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        using var cmd = connection.CreateCommand();
        cmd.CommandText = "alter table albums add column marketing_budget bigint";
        cmd.ExecuteNonQuery();
        Console.WriteLine("Added marketing_budget column");
    }
}


function add_column(string $host, string $port, string $database): void
{
    $dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
    $connection = new PDO($dsn);

    $connection->exec("ALTER TABLE albums ADD COLUMN marketing_budget bigint");
    print("Added marketing_budget column\n");

    $connection = null;
}

Run the sample with the following command:


PGDATABASE=example-db ./


java -jar target/pgadapter-snippets/pgadapter-samples.jar addmarketingbudget example-db


go run sample_runner.go addmarketingbudget example-db


npm start addmarketingbudget example-db


python example-db


dotnet run addmarketingbudget example-db


php add_column.php example-db

You should see:

Added marketing_budget column
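
After a nullable column is added, existing rows read NULL for it until a value is written. The sketch below illustrates this with Python's built-in sqlite3 module as a local stand-in for Spanner. That substitution is an assumption made only so the example runs without PGAdapter, and the helper names and demo row are hypothetical.

```python
import sqlite3


def make_demo_db() -> sqlite3.Connection:
    """Create an in-memory stand-in for the albums table with one row."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE albums ("
        "singer_id INTEGER, album_id INTEGER, album_title TEXT, "
        "PRIMARY KEY (singer_id, album_id))"
    )
    conn.execute("INSERT INTO albums VALUES (1, 1, 'Total Junk')")
    return conn


def add_marketing_budget(conn: sqlite3.Connection) -> None:
    """Apply the same ALTER TABLE statement that the samples above execute."""
    conn.execute("ALTER TABLE albums ADD COLUMN marketing_budget BIGINT")


if __name__ == "__main__":
    conn = make_demo_db()
    add_marketing_budget(conn)
    # The pre-existing row has no value for the new column yet.
    print(conn.execute("SELECT * FROM albums").fetchall())
```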

Run a DDL batch

We recommend that you execute multiple schema modifications in one batch. You can execute multiple DDL statements in one batch by using the built-in batching feature of your PostgreSQL driver, by submitting all the DDL statements as one SQL string separated by semicolons, or by using the START BATCH DDL and RUN BATCH statements.
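
The second and third options amount to simple transformations of a list of DDL statements. A minimal sketch, assuming the statements are held as plain strings (the helper names `as_single_sql_string` and `as_explicit_batch` are hypothetical and not part of any driver):

```python
from typing import List, Sequence


def as_single_sql_string(ddl_statements: Sequence[str]) -> str:
    """Join DDL statements into one semicolon-separated SQL string."""
    cleaned = [stmt.strip().rstrip(";") for stmt in ddl_statements]
    return ";\n".join(cleaned) + ";"


def as_explicit_batch(ddl_statements: Sequence[str]) -> List[str]:
    """Wrap DDL statements in START BATCH DDL / RUN BATCH.

    Each returned string is meant to be executed as its own statement,
    in order, on the same connection.
    """
    return ["START BATCH DDL", *ddl_statements, "RUN BATCH"]


if __name__ == "__main__":
    ddl = [
        "CREATE TABLE venues (venue_id bigint not null primary key)",
        "CREATE TABLE concerts (concert_id bigint not null primary key)",
    ]
    print(as_single_sql_string(ddl))
    for statement in as_explicit_batch(ddl):
        print(statement)
```

The samples that follow show the same ideas with real drivers.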



export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# Use a single SQL command to batch multiple statements together.
# Executing multiple DDL statements as one batch is more efficient
# than executing each statement individually.
# Separate the statements with semicolons.
psql << SQL

CREATE TABLE venues (
  venue_id    bigint not null primary key,
  name        varchar(1024),
  description jsonb
);

CREATE TABLE concerts (
  concert_id bigint not null primary key ,
  venue_id   bigint not null,
  singer_id  bigint not null,
  start_time timestamptz,
  end_time   timestamptz,
  constraint fk_concerts_venues foreign key
    (venue_id) references venues (venue_id),
  constraint fk_concerts_singers foreign key
    (singer_id) references singers (singer_id)
);

SQL


echo "Added venues and concerts tables"


import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

class DdlBatch {
  static void ddlBatch(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      try (Statement statement = connection.createStatement()) {
        // Create two new tables in one batch.
        statement.addBatch(
            "CREATE TABLE venues ("
                + "  venue_id    bigint not null primary key,"
                + "  name        varchar(1024),"
                + "  description jsonb"
                + ")");
        statement.addBatch(
            "CREATE TABLE concerts ("
                + "  concert_id bigint not null primary key ,"
                + "  venue_id   bigint not null,"
                + "  singer_id  bigint not null,"
                + "  start_time timestamptz,"
                + "  end_time   timestamptz,"
                + "  constraint fk_concerts_venues foreign key"
                + "    (venue_id) references venues (venue_id),"
                + "  constraint fk_concerts_singers foreign key"
                + "    (singer_id) references singers (singer_id)"
                + ")");
        statement.executeBatch();
      }
      System.out.println("Added venues and concerts tables");
    }
  }
}


import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func DdlBatch(host string, port int, database string) error {
	ctx := context.Background()
	// Connect to PGAdapter without SSL.
	connString := fmt.Sprintf(
		"postgres://%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	// Executing multiple DDL statements as one batch is
	// more efficient than executing each statement
	// individually.
	br := conn.SendBatch(ctx, &pgx.Batch{QueuedQueries: []*pgx.QueuedQuery{
		{SQL: "CREATE TABLE venues (" +
			"  venue_id    bigint not null primary key," +
			"  name        varchar(1024)," +
			"  description jsonb" +
			")",
		},
		{SQL: "CREATE TABLE concerts (" +
			"  concert_id bigint not null primary key ," +
			"  venue_id   bigint not null," +
			"  singer_id  bigint not null," +
			"  start_time timestamptz," +
			"  end_time   timestamptz," +
			"  constraint fk_concerts_venues foreign key" +
			"    (venue_id) references venues (venue_id)," +
			"  constraint fk_concerts_singers foreign key" +
			"    (singer_id) references singers (singer_id)" +
			")",
		},
	}})
	if _, err := br.Exec(); err != nil {
		return err
	}
	if err := br.Close(); err != nil {
		return err
	}
	fmt.Println("Added venues and concerts tables")

	return nil
}


import { Client } from 'pg';

async function ddlBatch(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  // Executing multiple DDL statements as one batch is
  // more efficient than executing each statement
  // individually.
  await connection.query("start batch ddl");
  await connection.query("CREATE TABLE venues (" +
      "  venue_id    bigint not null primary key," +
      "  name        varchar(1024)," +
      "  description jsonb" +
      ")");
  await connection.query("CREATE TABLE concerts (" +
      "  concert_id bigint not null primary key ," +
      "  venue_id   bigint not null," +
      "  singer_id  bigint not null," +
      "  start_time timestamptz," +
      "  end_time   timestamptz," +
      "  constraint fk_concerts_venues foreign key" +
      "    (venue_id) references venues (venue_id)," +
      "  constraint fk_concerts_singers foreign key" +
      "    (singer_id) references singers (singer_id)" +
      ")");
  await connection.query("run batch");
  console.log("Added venues and concerts tables");

  // Close the connection.
  await connection.end();
}


import string
import psycopg

def ddl_batch(host: string, port: int, database: string):
    with psycopg.connect("host={host} port={port} dbname={database} "
                                                  database=database)) as conn:
        # DDL can only be executed when autocommit=True.
        conn.autocommit = True
        # Use a pipeline to batch multiple statements together.
        # Executing multiple DDL statements as one batch is
        # more efficient than executing each statement
        # individually.
        with conn.pipeline():
            # The following statements are buffered on PGAdapter
            # until the pipeline ends.
            conn.execute("CREATE TABLE venues ("
                         "  venue_id    bigint not null primary key,"
                         "  name        varchar(1024),"
                         "  description jsonb"
                         ")")
            conn.execute("CREATE TABLE concerts ("
                         "  concert_id bigint not null primary key ,"
                         "  venue_id   bigint not null,"
                         "  singer_id  bigint not null,"
                         "  start_time timestamptz,"
                         "  end_time   timestamptz,"
                         "  constraint fk_concerts_venues foreign key"
                         "    (venue_id) references venues (venue_id),"
                         "  constraint fk_concerts_singers foreign key"
                         "    (singer_id) references singers (singer_id)"
                         ")")
        print("Added venues and concerts tables")


using Npgsql;

namespace dotnet_snippets;

public static class DdlBatchSample
{
    public static void DdlBatch(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        // Create two new tables in one batch.
        var batch = connection.CreateBatch();
        batch.BatchCommands.Add(new NpgsqlBatchCommand(
            "CREATE TABLE venues ("
            + "  venue_id    bigint not null primary key,"
            + "  name        varchar(1024),"
            + "  description jsonb"
            + ")"));
        batch.BatchCommands.Add(new NpgsqlBatchCommand(
            "CREATE TABLE concerts ("
            + "  concert_id bigint not null primary key ,"
            + "  venue_id   bigint not null,"
            + "  singer_id  bigint not null,"
            + "  start_time timestamptz,"
            + "  end_time   timestamptz,"
            + "  constraint fk_concerts_venues foreign key"
            + "    (venue_id) references venues (venue_id),"
            + "  constraint fk_concerts_singers foreign key"
            + "    (singer_id) references singers (singer_id)"
            + ")"));
        // Send both statements to PGAdapter as one batch.
        batch.ExecuteNonQuery();
        Console.WriteLine("Added venues and concerts tables");
    }
}


function ddl_batch(string $host, string $port, string $database): void
{
    $dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
    $connection = new PDO($dsn);

    // Executing multiple DDL statements as one batch is
    // more efficient than executing each statement
    // individually.
    $connection->exec("start batch ddl");
    $connection->exec("CREATE TABLE venues ("
        ."  venue_id    bigint not null primary key,"
        ."  name        varchar(1024),"
        ."  description jsonb"
        .")");
    $connection->exec("CREATE TABLE concerts ("
        ."  concert_id bigint not null primary key ,"
        ."  venue_id   bigint not null,"
        ."  singer_id  bigint not null,"
        ."  start_time timestamptz,"
        ."  end_time   timestamptz,"
        ."  constraint fk_concerts_venues foreign key"
        ."    (venue_id) references venues (venue_id),"
        ."  constraint fk_concerts_singers foreign key"
        ."    (singer_id) references singers (singer_id)"
        .")");
    $connection->exec("run batch");
    print("Added venues and concerts tables\n");

    $connection = null;
}

Run the sample with the following command:


PGDATABASE=example-db ./


java -jar target/pgadapter-snippets/pgadapter-samples.jar ddlbatch example-db


go run sample_runner.go ddlbatch example-db


npm start ddlbatch example-db


python example-db


dotnet run ddlbatch example-db


php ddl_batch.php example-db

You should see:

Added venues and concerts tables

Write data to the new column

The following code writes data to the new column. It sets MarketingBudget to 100000 for the row keyed by Albums(1, 1) and to 500000 for the row keyed by Albums(2, 2).

PGAdapter translates the PostgreSQL COPY command into mutations. By default, COPY commands are translated into Insert mutations. Execute set spanner.copy_upsert=true to translate COPY commands into InsertOrUpdate mutations instead. You can use this to update existing data in Spanner.
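To make the difference between the two mutation types concrete, here is a toy in-memory model in Python. It is only a sketch: `apply_copy_rows` and `DuplicateKeyError` are hypothetical names invented for this illustration, not part of PGAdapter or Spanner, and a dictionary stands in for the albums table. It assumes that an Insert is rejected when the row already exists, while an InsertOrUpdate overwrites the written column.

```python
class DuplicateKeyError(Exception):
    """Raised when a plain insert hits an existing primary key."""


def apply_copy_rows(table, rows, upsert=False):
    """Apply (key, value) rows to an in-memory table and return the row count.

    upsert=False models Insert semantics; upsert=True models InsertOrUpdate.
    The batch is validated first, so a rejected insert changes nothing.
    """
    if not upsert:
        for key, _ in rows:
            if key in table:
                raise DuplicateKeyError(f"row {key} already exists")
    for key, value in rows:
        table[key] = value
    return len(rows)


# Albums keyed by (singer_id, album_id); the value is marketing_budget.
albums = {(1, 1): None, (2, 2): None}
rows = [((1, 1), 100000), ((2, 2), 500000)]

try:
    # Default COPY behavior: plain inserts fail because both rows exist.
    apply_copy_rows(albums, rows)
except DuplicateKeyError as error:
    print(f"insert failed: {error}")

# With copy_upsert enabled, the same rows update the existing albums.
print(apply_copy_rows(albums, rows, upsert=True))
print(albums)
```

In this model the first call is rejected because both albums already exist, which is why the samples in this section enable spanner.copy_upsert before running COPY.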



export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# Instruct PGAdapter to use insert-or-update for COPY statements.
# This enables us to use COPY to update data.
psql -c "set spanner.copy_upsert=true" \
     -c "COPY albums (singer_id, album_id, marketing_budget) FROM STDIN
         WITH (DELIMITER ';')" \
<< DATA
1;1;100000
2;2;500000
DATA

echo "Copied albums using upsert"


import java.io.IOException;
import java.io.StringReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;

class UpdateDataWithCopy {

  static void updateDataWithCopy(String host, int port, String database)
      throws SQLException, IOException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      // Unwrap the PostgreSQL JDBC connection interface to get access to
      // a CopyManager.
      PGConnection pgConnection = connection.unwrap(PGConnection.class);
      CopyManager copyManager = pgConnection.getCopyAPI();

      // Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
      // will succeed even if it exceeds Spanner's mutation limit per transaction.
      connection
          .createStatement()
          .execute("set spanner.autocommit_dml_mode='partitioned_non_atomic'");

      // Instruct PGAdapter to use insert-or-update for COPY statements.
      // This enables us to use COPY to update existing data.
      connection.createStatement().execute("set spanner.copy_upsert=true");

      // COPY uses mutations to insert or update existing data in Spanner.
      long numAlbums =
          copyManager.copyIn(
              "COPY albums (singer_id, album_id, marketing_budget) FROM STDIN",
              new StringReader("1\t1\t100000\n" + "2\t2\t500000\n"));
      System.out.printf("Updated %d albums\n", numAlbums);
    }
  }
}


import (
	"context"
	"fmt"
	"io"

	"github.com/jackc/pgx/v5"
)

func UpdateDataWithCopy(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	// Enable non-atomic mode. This makes the COPY operation non-atomic,
	// and allows it to exceed the Spanner mutation limit.
	if _, err := conn.Exec(ctx,
		"set spanner.autocommit_dml_mode='partitioned_non_atomic'"); err != nil {
		return err
	}
	// Instruct PGAdapter to use insert-or-update for COPY statements.
	// This enables us to use COPY to update data.
	if _, err := conn.Exec(ctx, "set spanner.copy_upsert=true"); err != nil {
		return err
	}

	// Create a pipe that can be used to write the data manually that we want to copy.
	reader, writer := io.Pipe()
	// Write the data to the pipe using a separate goroutine. This allows us to stream the data
	// to the COPY operation row-by-row.
	go func() error {
		for _, record := range []string{"1\t1\t100000\n", "2\t2\t500000\n"} {
			if _, err := writer.Write([]byte(record)); err != nil {
				return err
			}
		}
		if err := writer.Close(); err != nil {
			return err
		}
		return nil
	}()
	tag, err := conn.PgConn().CopyFrom(ctx, reader, "COPY albums (singer_id, album_id, marketing_budget) FROM STDIN")
	if err != nil {
		return err
	}
	fmt.Printf("Updated %v albums\n", tag.RowsAffected())

	return nil
}


import { Client } from 'pg';
import { pipeline } from 'node:stream/promises'
import { from as copyFrom } from 'pg-copy-streams'
import {Readable} from "stream";

async function updateDataWithCopy(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  // Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
  // will succeed even if it exceeds Spanner's mutation limit per transaction.
  await connection.query("set spanner.autocommit_dml_mode='partitioned_non_atomic'");

  // Instruct PGAdapter to use insert-or-update for COPY statements.
  // This enables us to use COPY to update existing data.
  await connection.query("set spanner.copy_upsert=true");

  // Copy data to Spanner using the COPY command.
  const copyStream = copyFrom('COPY albums (singer_id, album_id, marketing_budget) FROM STDIN');
  const ingestStream = connection.query(copyStream);

  // Create a source stream and attach the source to the destination.
  const sourceStream = new Readable();
  const operation = pipeline(sourceStream, ingestStream);
  // Manually push data to the source stream to write data to Spanner.
  sourceStream.push("1\t1\t100000\n");
  sourceStream.push("2\t2\t500000\n");
  // Push a 'null' to indicate the end of the stream.
  sourceStream.push(null);
  // Wait for the copy operation to finish.
  await operation;
  console.log(`Updated ${copyStream.rowCount} albums`);

  // Close the connection.
  await connection.end();
}


import psycopg

def update_data_with_copy(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host, port=port,
                                                  database=database)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            # Instruct PGAdapter to use insert-or-update for COPY statements.
            # This enables us to use COPY to update data.
            cur.execute("set spanner.copy_upsert=true")

            # COPY uses mutations to insert or update existing data in Spanner.
            with cur.copy("COPY albums (singer_id, album_id, marketing_budget) "
                          "FROM STDIN") as copy:
                copy.write_row((1, 1, 100000))
                copy.write_row((2, 2, 500000))
            print("Updated %d albums" % cur.rowcount)


using Npgsql;

namespace dotnet_snippets;

public static class UpdateDataWithCopySample
{
    public static void UpdateDataWithCopy(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        // Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
        // will succeed even if it exceeds Spanner's mutation limit per transaction.
        using var cmd = connection.CreateCommand();
        cmd.CommandText = "set spanner.autocommit_dml_mode='partitioned_non_atomic'";
        cmd.ExecuteNonQuery();

        // Instruct PGAdapter to use insert-or-update for COPY statements.
        // This enables us to use COPY to update existing data.
        cmd.CommandText = "set spanner.copy_upsert=true";
        cmd.ExecuteNonQuery();

        // COPY uses mutations to insert or update existing data in Spanner.
        using (var albumWriter = connection.BeginTextImport(
                   "COPY albums (singer_id, album_id, marketing_budget) FROM STDIN"))
        {
            albumWriter.WriteLine("1\t1\t100000");
            albumWriter.WriteLine("2\t2\t500000");
        }
        Console.WriteLine($"Updated 2 albums");
    }
}


function update_data_with_copy(string $host, string $port, string $database): void
{
    $dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
    $connection = new PDO($dsn);

    // Instruct PGAdapter to use insert-or-update for COPY statements.
    // This enables us to use COPY to update data.
    $connection->exec("set spanner.copy_upsert=true");

    // COPY uses mutations to insert or update existing data in Spanner.
    // The separator and null marker below are the PDO defaults, spelled out
    // so that the column list can be passed as the fifth argument.
    $connection->pgsqlCopyFromArray(
        "albums",
        ["1\t1\t100000", "2\t2\t500000"],
        "\t",
        "\\\\N",
        "singer_id, album_id, marketing_budget",
    );
    print("Updated 2 albums\n");

    $connection = null;
}

Run the sample with the following command:


PGDATABASE=example-db ./


java -jar target/pgadapter-snippets/pgadapter-samples.jar update example-db


go run sample_runner.go update example-db


npm start update example-db


python example-db


dotnet run update example-db


php update_data_with_copy.php example-db

You should see:

Updated 2 albums
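
The Java, Go, and Node.js samples above pass each row as a tab-separated line that ends in a newline, which is PostgreSQL's default COPY text format. As a rough sketch of how such lines could be produced from Python values, the hypothetical helper `format_copy_row` below (an illustration only, not part of psycopg or PGAdapter) joins fields with tabs, writes NULL as \N, and escapes backslashes, tabs, and line breaks.

```python
def format_copy_row(values):
    """Encode one row in PostgreSQL's default COPY text format."""
    fields = []
    for value in values:
        if value is None:
            # COPY text format spells NULL as backslash-N.
            fields.append("\\N")
            continue
        text = str(value)
        # Escape the backslash first, then the characters COPY treats specially.
        text = text.replace("\\", "\\\\")
        text = text.replace("\t", "\\t").replace("\n", "\\n").replace("\r", "\\r")
        fields.append(text)
    # Fields are separated by tabs and each row ends with a newline.
    return "\t".join(fields) + "\n"


# The two rows written by the samples in this section.
payload = "".join(format_copy_row(row) for row in [(1, 1, 100000), (2, 2, 500000)])
print(repr(payload))
```

Drivers with a row-level COPY API, such as the `copy.write_row` call in the psycopg sample, perform this encoding for you.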

You can also execute a SQL query to fetch the values that you just wrote.

Here's the code to execute the query:



export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

psql -c "SELECT singer_id, album_id, marketing_budget
         FROM albums
         ORDER BY singer_id, album_id"


import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;

class QueryDataWithNewColumn {
  static void queryDataWithNewColumn(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
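      try (ResultSet resultSet =
          connection
              .createStatement()
              .executeQuery(
                  "SELECT singer_id, album_id, marketing_budget "
                      + "FROM albums "
                      + "ORDER BY singer_id, album_id")) {
        while (resultSet.next()) {
          System.out.printf(
              "%d %d %s\n",
              resultSet.getLong(1), resultSet.getLong(2), resultSet.getString(3));
        }
      }
    }
  }
}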


import (
	"context"
	"database/sql"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func QueryDataWithNewColumn(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	rows, err := conn.Query(ctx, "SELECT singer_id, album_id, marketing_budget "+
		"FROM albums "+
		"ORDER BY singer_id, album_id")
	if err != nil {
		return err
	}
	defer rows.Close()
	for rows.Next() {
		var singerId, albumId int64
		var marketingBudget sql.NullString
		err = rows.Scan(&singerId, &albumId, &marketingBudget)
		if err != nil {
			return err
		}
		var budget string
		if marketingBudget.Valid {
			budget = marketingBudget.String
		} else {
			budget = "NULL"
		}
		fmt.Printf("%v %v %v\n", singerId, albumId, budget)
	}

	return rows.Err()
}


import { Client } from 'pg';

async function queryDataWithNewColumn(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  const result = await connection.query(
      "SELECT singer_id, album_id, marketing_budget "
      + "FROM albums "
      + "ORDER BY singer_id, album_id"
  );
  for (const row of result.rows) {
    console.log(`${row["singer_id"]} ${row["album_id"]} ${row["marketing_budget"]}`);
  }

  // Close the connection.
  await connection.end();
}


import psycopg

def query_data_with_new_column(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host, port=port,
                                                  database=database)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute("SELECT singer_id, album_id, marketing_budget "
                        "FROM albums "
                        "ORDER BY singer_id, album_id")
            for album in cur:
                print(album)


using Npgsql;

namespace dotnet_snippets;

public static class QueryDataWithNewColumnSample
{
    public static void QueryWithNewColumnData(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        using var cmd = new NpgsqlCommand("SELECT singer_id, album_id, marketing_budget "
                                          + "FROM albums "
                                          + "ORDER BY singer_id, album_id", connection);
        using var reader = cmd.ExecuteReader();
        while (reader.Read())
        {
            Console.WriteLine($"{reader["singer_id"]} {reader["album_id"]} {reader["marketing_budget"]}");
        }
    }
}


function query_data_with_new_column(string $host, string $port, string $database): void
{
    $dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
    $connection = new PDO($dsn);

    $statement = $connection->query(
        "SELECT singer_id, album_id, marketing_budget "
        ."FROM albums "
        ."ORDER BY singer_id, album_id"
    );
    $rows = $statement->fetchAll();
    foreach ($rows as $album)
    {
        printf("%s\t%s\t%s\n", $album["singer_id"], $album["album_id"], $album["marketing_budget"]);
    }

    $rows = null;
    $statement = null;
    $connection = null;
}

Run the query with this command:


PGDATABASE=example-db ./


java -jar target/pgadapter-snippets/pgadapter-samples.jar querymarketingbudget example-db


go run sample_runner.go querymarketingbudget example-db


npm start querymarketingbudget example-db


python example-db


dotnet run querymarketingbudget example-db


php query_data_with_new_column.php example-db

You should see:

1 1 100000
1 2 null
2 1 null
2 2 500000
2 3 null

Update data

You can update data using DML in a read/write transaction.
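
The samples in this section all follow the same read, check, and write pattern: read the budget of Album 2, verify that it covers the transfer, then update both albums together. As a minimal sketch of that logic without a database, the hypothetical function `transfer_budget` below works on a plain dictionary. It is an illustration only and says nothing about how Spanner implements transactions; the copy-then-return approach merely mimics the all-or-nothing outcome.

```python
def transfer_budget(budgets, source, target, amount):
    """Return a new dict with `amount` moved from source to target.

    The input dict is never modified, so a rejected transfer leaves the
    caller's data untouched, similar to a rolled-back transaction.
    """
    available = budgets[source]
    if available is None or available < amount:
        raise ValueError("Insufficient budget to transfer")
    updated = dict(budgets)
    updated[source] = available - amount
    # Treat a missing (NULL) target budget as zero before adding to it.
    updated[target] = (updated[target] or 0) + amount
    return updated


# Budgets keyed by (singer_id, album_id), as written earlier in this tutorial.
budgets = {(1, 1): 100000, (2, 2): 500000}
budgets = transfer_budget(budgets, source=(2, 2), target=(1, 1), amount=200000)
print(budgets)
```

Either both budgets change or neither does, which is the property the read/write transaction provides in the real samples.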



export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

psql << SQL
  -- Transfer marketing budget from one album to another.
  -- We do it in a transaction to ensure that the transfer is atomic.
  -- Begin a read/write transaction.
  begin;

  -- Increase the marketing budget of album 1 if album 2 has enough budget.
  -- The condition that album 2 has enough budget is guaranteed for the
  -- duration of the transaction, as read/write transactions in Spanner use
  -- external consistency as the default isolation level.
  update albums set
    marketing_budget = marketing_budget + 200000
  where singer_id = 1
    and  album_id = 1
    and exists (
      select album_id
      from albums
      where singer_id = 2
        and  album_id = 2
        and marketing_budget > 200000
    );

  -- Decrease the marketing budget of album 2.
  update albums set
    marketing_budget = marketing_budget - 200000
  where singer_id = 2
    and  album_id = 2
    and marketing_budget > 200000;

  -- Commit the transaction to make the changes to both marketing budgets
  -- durably stored in the database and visible to other transactions.
  commit;
SQL

echo "Transferred marketing budget from Album 2 to Album 1"


import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

class UpdateDataWithTransaction {

  static void writeWithTransactionUsingDml(String host, int port, String database)
      throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      // Set AutoCommit=false to enable transactions.
      connection.setAutoCommit(false);

      // Transfer marketing budget from one album to another. We do it in a
      // transaction to ensure that the transfer is atomic. There is no need
      // to explicitly start the transaction. The first statement on the
      // connection will start a transaction when AutoCommit=false.
      String selectMarketingBudgetSql =
          "SELECT marketing_budget from albums WHERE singer_id = ? and album_id = ?";
      long album2Budget = 0;
      try (PreparedStatement selectMarketingBudgetStatement =
          connection.prepareStatement(selectMarketingBudgetSql)) {
        // Bind the query parameters to SingerId=2 and AlbumId=2.
        selectMarketingBudgetStatement.setLong(1, 2);
        selectMarketingBudgetStatement.setLong(2, 2);
        try (ResultSet resultSet = selectMarketingBudgetStatement.executeQuery()) {
          while (resultSet.next()) {
            album2Budget = resultSet.getLong("marketing_budget");
          }
        }
        // The transaction will only be committed if this condition still holds
        // at the time of commit. Otherwise, the transaction will be aborted.
        final long transfer = 200000;
        if (album2Budget >= transfer) {
          long album1Budget = 0;
          // Re-use the existing PreparedStatement for selecting the
          // marketing_budget to get the budget for Album 1.
          // Bind the query parameters to SingerId=1 and AlbumId=1.
          selectMarketingBudgetStatement.setLong(1, 1);
          selectMarketingBudgetStatement.setLong(2, 1);
          try (ResultSet resultSet = selectMarketingBudgetStatement.executeQuery()) {
            while (resultSet.next()) {
              album1Budget = resultSet.getLong("marketing_budget");
            }
          }

          // Transfer part of the marketing budget of Album 2 to Album 1.
          album1Budget += transfer;
          album2Budget -= transfer;
          String updateSql =
              "UPDATE albums "
                  + "SET marketing_budget = ? "
                  + "WHERE singer_id = ? and album_id = ?";
          try (PreparedStatement updateStatement = connection.prepareStatement(updateSql)) {
            // Update Album 1.
            int paramIndex = 0;
            updateStatement.setLong(++paramIndex, album1Budget);
            updateStatement.setLong(++paramIndex, 1);
            updateStatement.setLong(++paramIndex, 1);
            // Create a DML batch by calling addBatch
            // on the current PreparedStatement.
            updateStatement.addBatch();

            // Update Album 2 in the same DML batch.
            paramIndex = 0;
            updateStatement.setLong(++paramIndex, album2Budget);
            updateStatement.setLong(++paramIndex, 2);
            updateStatement.setLong(++paramIndex, 2);
            updateStatement.addBatch();

            // Execute both DML statements in one batch.
            updateStatement.executeBatch();
          }
        }
      }
      // Commit the current transaction.
      connection.commit();
      System.out.println("Transferred marketing budget from Album 2 to Album 1");
    }
  }
}


import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func WriteWithTransactionUsingDml(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	// Transfer marketing budget from one album to another. We do it in a
	// transaction to ensure that the transfer is atomic.
	tx, err := conn.Begin(ctx)
	if err != nil {
		return err
	}
	const selectSql = "SELECT marketing_budget " +
		"from albums " +
		"WHERE singer_id = $1 and album_id = $2"
	// Get the marketing_budget of singer 2 / album 2.
	row := tx.QueryRow(ctx, selectSql, 2, 2)
	var budget2 int64
	if err := row.Scan(&budget2); err != nil {
		return err
	}
	const transfer = 200000
	// The transaction will only be committed if this condition still holds
	// at the time of commit. Otherwise, the transaction will be aborted.
	if budget2 >= transfer {
		// Get the marketing_budget of singer 1 / album 1.
		row := tx.QueryRow(ctx, selectSql, 1, 1)
		var budget1 int64
		if err := row.Scan(&budget1); err != nil {
			return err
		}
		// Transfer part of the marketing budget of Album 2 to Album 1.
		budget1 += transfer
		budget2 -= transfer
		const updateSql = "UPDATE albums " +
			"SET marketing_budget = $1 " +
			"WHERE singer_id = $2 and album_id = $3"
		// Start a DML batch and execute it as part of the current transaction.
		batch := &pgx.Batch{}
		batch.Queue(updateSql, budget1, 1, 1)
		batch.Queue(updateSql, budget2, 2, 2)
		br := tx.SendBatch(ctx, batch)
		if _, err := br.Exec(); err != nil {
			br.Close()
			return err
		}
		if err := br.Close(); err != nil {
			return err
		}
	}
	// Commit the current transaction.
	if err := tx.Commit(ctx); err != nil {
		return err
	}
	fmt.Println("Transferred marketing budget from Album 2 to Album 1")

	return nil
}


import { Client } from 'pg';

async function writeWithTransactionUsingDml(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  // Transfer marketing budget from one album to another. We do it in a
  // transaction to ensure that the transfer is atomic. node-postgres
  // requires you to explicitly start the transaction by executing 'begin'.
  await connection.query("begin");
  const selectMarketingBudgetSql = "SELECT marketing_budget " +
      "from albums " +
      "WHERE singer_id = $1 and album_id = $2";
  // Get the marketing_budget of singer 2 / album 2.
  const album2BudgetResult = await connection.query(selectMarketingBudgetSql, [2, 2]);
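  // node-postgres returns bigint columns as strings by default, so convert
  // the value to a number before doing arithmetic with it.
  let album2Budget = Number(album2BudgetResult.rows[0]["marketing_budget"]);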
  const transfer = 200000;
  // The transaction will only be committed if this condition still holds
  // at the time of commit. Otherwise, the transaction will be aborted.
  if (album2Budget >= transfer) {
    // Get the marketing budget of singer 1 / album 1.
    const album1BudgetResult = await connection.query(selectMarketingBudgetSql, [1, 1]);
    // Convert to a number so that '+=' adds instead of concatenating strings.
    let album1Budget = Number(album1BudgetResult.rows[0]["marketing_budget"]);
    // Transfer part of the marketing budget of Album 2 to Album 1.
    album1Budget += transfer;
    album2Budget -= transfer;
    const updateSql = "UPDATE albums " +
        "SET marketing_budget = $1 " +
        "WHERE singer_id = $2 and album_id = $3";
    // Start a DML batch. This batch will become part of the current transaction.
    // TODO: Enable when has been merged
    // await connection.query("start batch dml");
    // Update the marketing budget of both albums.
    await connection.query(updateSql, [album1Budget, 1, 1]);
    await connection.query(updateSql, [album2Budget, 2, 2]);
    // TODO: Enable when has been merged
    // await connection.query("run batch");
  }
  // Commit the current transaction.
  await connection.query("commit");
  console.log("Transferred marketing budget from Album 2 to Album 1");

  // Close the connection.
  await connection.end();
}


import psycopg

def update_data_with_transaction(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host, port=port,
                                                  database=database)) as conn:
        # Set autocommit=False to use transactions.
        # The first statement that is executed starts the transaction.
        conn.autocommit = False
        with conn.cursor() as cur:
            # Transfer marketing budget from one album to another.
            # We do it in a transaction to ensure that the transfer is atomic.
            # There is no need to explicitly start the transaction. The first
            # statement on the connection will start a transaction when
            # AutoCommit=false.
            select_marketing_budget_sql = ("SELECT marketing_budget "
                                           "from albums "
                                           "WHERE singer_id = %s "
                                           "and album_id = %s")
            # Get the marketing budget of Album #2.
            cur.execute(select_marketing_budget_sql, (2, 2))
            album2_budget = cur.fetchone()[0]
            transfer = 200000
            if album2_budget > transfer:
                # Get the marketing budget of Album #1.
                cur.execute(select_marketing_budget_sql, (1, 1))
                album1_budget = cur.fetchone()[0]
                # Transfer the marketing budgets and write the update back
                # to the database.
                album1_budget += transfer
                album2_budget -= transfer
                update_sql = ("update albums "
                              "set marketing_budget = %s "
                              "where singer_id = %s "
                              "and   album_id = %s")
                # Use a pipeline to execute two DML statements in one batch.
                with conn.pipeline():
                    cur.execute(update_sql, (album1_budget, 1, 1,))
                    cur.execute(update_sql, (album2_budget, 2, 2,))
            else:
                print("Insufficient budget to transfer")
        # Commit the transaction.
        conn.commit()
        print("Transferred marketing budget from Album 2 to Album 1")


using Npgsql;
using System.Data;

namespace dotnet_snippets;

public static class TagsSample
{
    public static void Tags(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        // Start a transaction with isolation level Serializable.
        // Spanner only supports this isolation level. Trying to use a lower
        // isolation level (including the default isolation level READ COMMITTED),
        // will result in an error.
        var transaction = connection.BeginTransaction(IsolationLevel.Serializable);

        // Create a command that uses the current transaction.
        using var cmd = connection.CreateCommand();
        cmd.Transaction = transaction;

        // Set the TRANSACTION_TAG session variable to set a transaction tag
        // for the current transaction.
        cmd.CommandText = "set spanner.transaction_tag='example-tx-tag'";
        cmd.ExecuteNonQuery();

        // Set the STATEMENT_TAG session variable to set the request tag
        // that should be included with the next SQL statement.
        cmd.CommandText = "set spanner.statement_tag='query-marketing-budget'";
        cmd.ExecuteNonQuery();

        // Get the marketing_budget of Album (1,1).
        cmd.CommandText = "select marketing_budget from albums where singer_id=$1 and album_id=$2";
        cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
        cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
        var marketingBudget = (long?)cmd.ExecuteScalar();

        // Reduce the marketing budget by 10% if it is more than 1,000.
        if (marketingBudget > 1000L)
        {
            marketingBudget -= (long) (marketingBudget * 0.1);

            // Remove the query parameters before reusing the command.
            cmd.Parameters.Clear();

            // Set the statement tag to use for the update statement.
            cmd.CommandText = "set spanner.statement_tag='reduce-marketing-budget'";
            cmd.ExecuteNonQuery();

            cmd.CommandText = "update albums set marketing_budget=$1 where singer_id=$2 AND album_id=$3";
            cmd.Parameters.Add(new NpgsqlParameter { Value = marketingBudget });
            cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
            cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
            cmd.ExecuteNonQuery();
        }

        // Commit the current transaction.
        transaction.Commit();
        Console.WriteLine("Reduced marketing budget");
    }
}


function update_data_with_transaction(string $host, string $port, string $database): void
{
    $dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
    $connection = new PDO($dsn);

    // Start a read/write transaction.
    $connection->beginTransaction();
    // Transfer marketing budget from one album to another.
    // We do it in a transaction to ensure that the transfer is atomic.

    // Create a prepared statement that we can use to execute the same
    // SQL string multiple times with different parameter values.
    $select_marketing_budget_statement = $connection->prepare(
        "SELECT marketing_budget "
        ."from albums "
        ."WHERE singer_id = ? "
        ."and album_id = ?"
    );
    // Get the marketing budget of Album #2.
    $select_marketing_budget_statement->execute([2, 2]);
    $album2_budget = $select_marketing_budget_statement->fetchAll()[0][0];

    $transfer = 200000;
    if ($album2_budget > $transfer) {
        // Get the marketing budget of Album #1.
        $select_marketing_budget_statement->execute([1, 1]);
        $album1_budget = $select_marketing_budget_statement->fetchAll()[0][0];
        // Transfer the marketing budgets and write the update back
        // to the database.
        $album1_budget += $transfer;
        $album2_budget -= $transfer;
        // PHP PDO also supports named query parameters.
        $update_statement = $connection->prepare(
            "update albums "
                ."set marketing_budget = :budget "
                ."where singer_id = :singer_id "
                ."and   album_id = :album_id"
        );
        // Start a DML batch. This batch will become part of the current transaction.
        // $connection->exec("start batch dml");
        // Update the marketing budget of both albums.
        $update_statement->execute(["budget" => $album1_budget, "singer_id" => 1, "album_id" => 1]);
        $update_statement->execute(["budget" => $album2_budget, "singer_id" => 2, "album_id" => 2]);
        // $connection->exec("run batch");
    } else {
        print("Insufficient budget to transfer\n");
    }
    // Commit the transaction.
    $connection->commit();
    print("Transferred marketing budget from Album 2 to Album 1\n");

    $connection = null;
}

Run the sample with the following command:


PGDATABASE=example-db ./


java -jar target/pgadapter-snippets/pgadapter-samples.jar writewithtransactionusingdml example-db


go run sample_runner.go writewithtransactionusingdml example-db


npm start writewithtransactionusingdml example-db


python example-db


dotnet run writewithtransactionusingdml example-db


php update_data_with_transaction.php example-db

You should see:

Transferred marketing budget from Album 2 to Album 1

Transaction tags and request tags

Use transaction tags and request tags to troubleshoot transactions and queries in Spanner. You can set transaction tags and request tags with the SPANNER.TRANSACTION_TAG and SPANNER.STATEMENT_TAG session variables.
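
Because both tags are set through ordinary SET statements, an application can build those statements in one place. The following is a minimal sketch in Python under that assumption; the helper name tag_statement is hypothetical and is not part of PGAdapter or any driver. It only produces the SQL text, which you would then send over your PostgreSQL connection.

```python
def tag_statement(variable: str, tag: str) -> str:
    """Builds a SET statement for one of the Spanner tag session variables.

    Hypothetical helper for illustration only.
    """
    allowed = {"spanner.transaction_tag", "spanner.statement_tag"}
    name = variable.lower()
    if name not in allowed:
        raise ValueError(f"unsupported session variable: {variable}")
    # Double any single quote so the tag stays a single SQL string literal.
    escaped = tag.replace("'", "''")
    return f"set {name}='{escaped}'"


print(tag_statement("SPANNER.TRANSACTION_TAG", "example-tx-tag"))
print(tag_statement("SPANNER.STATEMENT_TAG", "query-marketing-budget"))
```

The transaction tag statement is sent once at the start of the transaction, while a statement tag statement is sent before each SQL statement that should carry a request tag.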



export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

psql << SQL
  -- Start a transaction.
  begin;
  -- Set the TRANSACTION_TAG session variable to set a transaction tag
  -- for the current transaction. This can only be executed at the start
  -- of the transaction.
  set spanner.transaction_tag='example-tx-tag';

  -- Set the STATEMENT_TAG session variable to set the request tag
  -- that should be included with the next SQL statement.
  set spanner.statement_tag='query-marketing-budget';

  select marketing_budget
  from albums
  where singer_id = 1
    and album_id  = 1;

  -- Reduce the marketing budget by 10% if it is more than 1,000.
  -- Set a statement tag for the update statement.
  set spanner.statement_tag='reduce-marketing-budget';

  update albums
    set marketing_budget = marketing_budget - (marketing_budget * 0.1)::bigint
  where singer_id = 1
    and album_id  = 1
    and marketing_budget > 1000;

  commit;
SQL

echo "Reduced marketing budget"


import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

class Tags {

  static void tags(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      // Set AutoCommit=false to enable transactions.
      connection.setAutoCommit(false);
      // Set the TRANSACTION_TAG session variable to set a transaction tag
      // for the current transaction.
      connection.createStatement().execute("set spanner.transaction_tag='example-tx-tag'");

      // Set the STATEMENT_TAG session variable to set the request tag
      // that should be included with the next SQL statement.
      connection.createStatement().execute("set spanner.statement_tag='query-marketing-budget'");
      long marketingBudget = 0L;
      long singerId = 1L;
      long albumId = 1L;
      try (PreparedStatement statement =
          connection.prepareStatement(
              "select marketing_budget from albums where singer_id=? and album_id=?")) {
        statement.setLong(1, singerId);
        statement.setLong(2, albumId);
        try (ResultSet albumResultSet = statement.executeQuery()) {
          while (albumResultSet.next()) {
            marketingBudget = albumResultSet.getLong(1);
          }
        }
      }
      // Reduce the marketing budget by 10% if it is more than 1,000.
      final long maxMarketingBudget = 1000L;
      final float reduction = 0.1f;
      if (marketingBudget > maxMarketingBudget) {
        marketingBudget -= (long) (marketingBudget * reduction);
        connection.createStatement().execute("set spanner.statement_tag='reduce-marketing-budget'");
        try (PreparedStatement statement =
            connection.prepareStatement(
                "update albums set marketing_budget=? where singer_id=? AND album_id=?")) {
          int paramIndex = 0;
          statement.setLong(++paramIndex, marketingBudget);
          statement.setLong(++paramIndex, singerId);
          statement.setLong(++paramIndex, albumId);
          statement.executeUpdate();
        }
      }

      // Commit the current transaction.
      connection.commit();
      System.out.println("Reduced marketing budget");
    }
  }
}


import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func Tags(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	tx, err := conn.Begin(ctx)
	if err != nil {
		return err
	}

	// Set the TRANSACTION_TAG session variable to set a transaction tag
	// for the current transaction.
	_, _ = tx.Exec(ctx, "set spanner.transaction_tag='example-tx-tag'")

	// Set the STATEMENT_TAG session variable to set the request tag
	// that should be included with the next SQL statement.
	_, _ = tx.Exec(ctx, "set spanner.statement_tag='query-marketing-budget'")

	row := tx.QueryRow(ctx, "select marketing_budget "+
		"from albums "+
		"where singer_id=$1 and album_id=$2", 1, 1)
	var budget int64
	if err := row.Scan(&budget); err != nil {
		_ = tx.Rollback(ctx)
		return err
	}

	// Reduce the marketing budget by 10% if it is more than 1,000.
	if budget > 1000 {
		budget = int64(float64(budget) - float64(budget)*0.1)
		_, _ = tx.Exec(ctx, "set spanner.statement_tag='reduce-marketing-budget'")
		if _, err := tx.Exec(ctx, "update albums set marketing_budget=$1 where singer_id=$2 AND album_id=$3", budget, 1, 1); err != nil {
			_ = tx.Rollback(ctx)
			return err
		}
	}
	// Commit the current transaction.
	if err := tx.Commit(ctx); err != nil {
		return err
	}
	fmt.Println("Reduced marketing budget")

	return nil
}


import { Client } from 'pg';

async function tags(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  await connection.query("begin");
  // Set the TRANSACTION_TAG session variable to set a transaction tag
  // for the current transaction.
  await connection.query("set spanner.transaction_tag='example-tx-tag'");
  // Set the STATEMENT_TAG session variable to set the request tag
  // that should be included with the next SQL statement.
  await connection.query("set spanner.statement_tag='query-marketing-budget'");
  const budgetResult = await connection.query(
      "select marketing_budget " +
      "from albums " +
      "where singer_id=$1 and album_id=$2", [1, 1]);
  let budget = budgetResult.rows[0]["marketing_budget"];
  // Reduce the marketing budget by 10% if it is more than 1,000.
  if (budget > 1000) {
    budget = budget - budget * 0.1;
    await connection.query("set spanner.statement_tag='reduce-marketing-budget'");
    await connection.query("update albums set marketing_budget=$1 "
        + "where singer_id=$2 AND album_id=$3", [budget, 1, 1]);
  }
  // Commit the current transaction.
  await connection.query("commit");
  console.log("Reduced marketing budget");

  // Close the connection.
  await connection.end();
}


import psycopg


def tags(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        # Set autocommit=False to enable transactions.
        conn.autocommit = False
        with conn.cursor() as cur:
            # Set the TRANSACTION_TAG session variable to set a transaction tag
            # for the current transaction.
            cur.execute("set spanner.transaction_tag='example-tx-tag'")

            # Set the STATEMENT_TAG session variable to set the request tag
            # that should be included with the next SQL statement.
            cur.execute("set spanner.statement_tag='query-marketing-budget'")

            singer_id = 1
            album_id = 1
            cur.execute("select marketing_budget "
                        "from albums "
                        "where singer_id = %s "
                        "  and album_id  = %s",
                        (singer_id, album_id,))
            marketing_budget = cur.fetchone()[0]

            # Reduce the marketing budget by 10% if it is more than 1,000.
            max_marketing_budget = 1000
            reduction = 0.1
            if marketing_budget > max_marketing_budget:
                # Make sure the marketing_budget remains an int.
                marketing_budget -= int(marketing_budget * reduction)
                # Set a statement tag for the update statement.
                cur.execute(
                    "set spanner.statement_tag='reduce-marketing-budget'")
                cur.execute("update albums set marketing_budget = %s "
                            "where singer_id = %s "
                            "  and album_id  = %s",
                            (marketing_budget, singer_id, album_id,))
            else:
                print("Marketing budget already less than or equal to 1,000")
        # Commit the transaction.
        conn.commit()
        print("Reduced marketing budget")


using Npgsql;
using System.Data;

namespace dotnet_snippets;

public static class TagsSample
{
    public static void Tags(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        // Start a transaction with isolation level Serializable.
        // Spanner only supports this isolation level. Trying to use a lower
        // isolation level (including the default isolation level READ COMMITTED),
        // will result in an error.
        var transaction = connection.BeginTransaction(IsolationLevel.Serializable);

        // Create a command that uses the current transaction.
        using var cmd = connection.CreateCommand();
        cmd.Transaction = transaction;

        // Set the TRANSACTION_TAG session variable to set a transaction tag
        // for the current transaction.
        cmd.CommandText = "set spanner.transaction_tag='example-tx-tag'";
        cmd.ExecuteNonQuery();

        // Set the STATEMENT_TAG session variable to set the request tag
        // that should be included with the next SQL statement.
        cmd.CommandText = "set spanner.statement_tag='query-marketing-budget'";
        cmd.ExecuteNonQuery();

        // Get the marketing_budget of Album (1,1).
        cmd.CommandText = "select marketing_budget from albums where singer_id=$1 and album_id=$2";
        cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
        cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
        var marketingBudget = (long?)cmd.ExecuteScalar();

        // Reduce the marketing budget by 10% if it is more than 1,000.
        if (marketingBudget > 1000L)
        {
            marketingBudget -= (long) (marketingBudget * 0.1);

            // Set the statement tag to use for the update statement.
            // Clear the parameters of the previous query first.
            cmd.Parameters.Clear();
            cmd.CommandText = "set spanner.statement_tag='reduce-marketing-budget'";
            cmd.ExecuteNonQuery();

            cmd.CommandText = "update albums set marketing_budget=$1 where singer_id=$2 AND album_id=$3";
            cmd.Parameters.Add(new NpgsqlParameter { Value = marketingBudget });
            cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
            cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
            cmd.ExecuteNonQuery();
        }

        // Commit the current transaction.
        transaction.Commit();
        Console.WriteLine("Reduced marketing budget");
    }
}


function tags(string $host, string $port, string $database): void
{
    $dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
    $connection = new PDO($dsn);

    // Start a read/write transaction.
    $connection->beginTransaction();

    // Set the TRANSACTION_TAG session variable to set a transaction tag
    // for the current transaction.
    $connection->exec("set spanner.transaction_tag='example-tx-tag'");

    // Set the STATEMENT_TAG session variable to set the request tag
    // that should be included with the next SQL statement.
    $connection->exec("set spanner.statement_tag='query-marketing-budget'");

    $singer_id = 1;
    $album_id = 1;
    $statement = $connection->prepare(
        "select marketing_budget "
        ."from albums "
        ."where singer_id = ? "
        ."  and album_id  = ?"
    );
    $statement->execute([$singer_id, $album_id]);
    $marketing_budget = $statement->fetchAll()[0][0];

    // Reduce the marketing budget by 10% if it is more than 1,000.
    $max_marketing_budget = 1000;
    $reduction = 0.1;
    if ($marketing_budget > $max_marketing_budget) {
        // Make sure the marketing_budget remains an int.
        $marketing_budget -= intval($marketing_budget * $reduction);
        // Set a statement tag for the update statement.
        $connection->exec("set spanner.statement_tag='reduce-marketing-budget'");
        $update_statement = $connection->prepare(
            "update albums set marketing_budget = :budget "
            ."where singer_id = :singer_id "
            ."  and album_id  = :album_id"
        );
        $update_statement->execute([
            "budget" => $marketing_budget,
            "singer_id" => $singer_id,
            "album_id" => $album_id,
        ]);
    } else {
        print("Marketing budget already less than or equal to 1,000\n");
    }
    // Commit the transaction.
    $connection->commit();
    print("Reduced marketing budget\n");

    $connection = null;
}

Run the sample with the following command:


PGDATABASE=example-db ./


java -jar target/pgadapter-snippets/pgadapter-samples.jar tags example-db


go run sample_runner.go tags example-db


npm start tags example-db


python example-db


dotnet run tags example-db


php tags.php example-db

Retrieve data using read-only transactions

Suppose you want to execute more than one read at the same timestamp. Read-only transactions observe a consistent prefix of the transaction commit history, so your application always gets consistent data. Set the connection to read-only or use the SET TRANSACTION READ ONLY SQL statement to execute a read-only transaction.

The following shows how to run a query and perform a read in the same read-only transaction:
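
The order of statements matters here: SET TRANSACTION READ ONLY has to be the first statement after the transaction starts, and the transaction must still be ended explicitly. The sketch below, in Python, captures that order in one function. The names run_read_only and fake_execute are hypothetical, and the function assumes it is given a callable that sends one SQL string over a connection in autocommit mode and returns the resulting rows. A stub stands in for the real connection so the sequence can be inspected without a database.

```python
from typing import Callable, Iterable, List, Sequence


def run_read_only(execute: Callable[[str], Sequence[tuple]],
                  queries: Iterable[str]) -> List[Sequence[tuple]]:
    """Runs all queries inside one read-only transaction (illustrative helper)."""
    execute("begin")
    # Must be the first statement of the transaction.
    execute("set transaction read only")
    try:
        return [execute(sql) for sql in queries]
    finally:
        # Read-only transactions must also be ended; commit and rollback
        # are equivalent here.
        execute("commit")


# Stub that records the statements instead of sending them to PGAdapter.
sent: List[str] = []


def fake_execute(sql: str) -> Sequence[tuple]:
    sent.append(sql)
    return []


results = run_read_only(fake_execute, [
    "select singer_id, album_id, album_title from albums order by singer_id, album_id",
    "select singer_id, album_id, album_title from albums order by album_title",
])
for statement in sent:
    print(statement)
```

With a real driver, the callable would wrap whatever method your driver uses to execute a statement and fetch its rows.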



export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

psql << SQL
  -- Begin a transaction.
  begin;
  -- Change the current transaction to a read-only transaction.
  -- This statement can only be executed at the start of a transaction.
  set transaction read only;

  -- The following two queries use the same read-only transaction.
  select singer_id, album_id, album_title
  from albums
  order by singer_id, album_id;

  select singer_id, album_id, album_title
  from albums
  order by album_title;

  -- Read-only transactions must also be committed or rolled back to mark
  -- the end of the transaction. There is no semantic difference between
  -- rolling back or committing a read-only transaction.
  commit;
SQL


import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;

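class ReadOnlyTransaction {
  static void readOnlyTransaction(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      // Set AutoCommit=false to enable transactions.
      connection.setAutoCommit(false);
      // This SQL statement instructs the JDBC driver to use
      // a read-only transaction.
      connection.createStatement().execute("set transaction read only");

      try (ResultSet resultSet =
          connection.createStatement().executeQuery(
              "SELECT singer_id, album_id, album_title "
                  + "FROM albums "
                  + "ORDER BY singer_id, album_id")) {
        while (resultSet.next()) {
          System.out.printf(
              "%d %d %s\n",
              resultSet.getLong("singer_id"),
              resultSet.getLong("album_id"),
              resultSet.getString("album_title"));
        }
      }
      try (ResultSet resultSet =
          connection.createStatement().executeQuery(
              "SELECT singer_id, album_id, album_title "
                  + "FROM albums "
                  + "ORDER BY album_title")) {
        while (resultSet.next()) {
          System.out.printf(
              "%d %d %s\n",
              resultSet.getLong("singer_id"),
              resultSet.getLong("album_id"),
              resultSet.getString("album_title"));
        }
      }
      // End the read-only transaction by calling commit().
      connection.commit();
    }
  }
}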


import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func ReadOnlyTransaction(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	// Start a read-only transaction by supplying additional transaction options.
	tx, err := conn.BeginTx(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly})
	if err != nil {
		return err
	}

	albumsOrderedById, err := tx.Query(ctx, "SELECT singer_id, album_id, album_title FROM albums ORDER BY singer_id, album_id")
	if err != nil {
		return err
	}
	defer albumsOrderedById.Close()
	for albumsOrderedById.Next() {
		var singerId, albumId int64
		var title string
		err = albumsOrderedById.Scan(&singerId, &albumId, &title)
		if err != nil {
			return err
		}
		fmt.Printf("%v %v %v\n", singerId, albumId, title)
	}

	albumsOrderedTitle, err := tx.Query(ctx, "SELECT singer_id, album_id, album_title FROM albums ORDER BY album_title")
	if err != nil {
		return err
	}
	defer albumsOrderedTitle.Close()
	for albumsOrderedTitle.Next() {
		var singerId, albumId int64
		var title string
		err = albumsOrderedTitle.Scan(&singerId, &albumId, &title)
		if err != nil {
			return err
		}
		fmt.Printf("%v %v %v\n", singerId, albumId, title)
	}

	// End the read-only transaction by calling Commit().
	return tx.Commit(ctx)
}


import { Client } from 'pg';

async function readOnlyTransaction(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  // Start a transaction.
  await connection.query("begin");
  // This SQL statement instructs the PGAdapter to make it a read-only transaction.
  await connection.query("set transaction read only");

  const albumsOrderById = await connection.query(
      "SELECT singer_id, album_id, album_title "
      + "FROM albums "
      + "ORDER BY singer_id, album_id");
  for (const row of albumsOrderById.rows) {
    console.log(`${row["singer_id"]} ${row["album_id"]} ${row["album_title"]}`);
  }
  const albumsOrderByTitle = await connection.query(
      "SELECT singer_id, album_id, album_title "
      + "FROM albums "
      + "ORDER BY album_title");
  for (const row of albumsOrderByTitle.rows) {
    console.log(`${row["singer_id"]} ${row["album_id"]} ${row["album_title"]}`);
  }
  // End the read-only transaction by executing commit.
  await connection.query("commit");

  // Close the connection.
  await connection.end();
}


import psycopg


def read_only_transaction(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        # Set autocommit=False to enable transactions.
        conn.autocommit = False

        with conn.cursor() as cur:
            # Change the current transaction to a read-only transaction.
            # This statement can only be executed at the start of a transaction.
            cur.execute("set transaction read only")

            # The following two queries use the same read-only transaction.
            cur.execute("select singer_id, album_id, album_title "
                        "from albums "
                        "order by singer_id, album_id")
            for album in cur:
                print(album)

            cur.execute("select singer_id, album_id, album_title "
                        "from albums "
                        "order by album_title")
            for album in cur:
                print(album)

        # Read-only transactions must also be committed or rolled back to mark
        # the end of the transaction. There is no semantic difference between
        # rolling back or committing a read-only transaction.
        conn.commit()


using Npgsql;
using System.Data;

namespace dotnet_snippets;

public static class ReadOnlyTransactionSample
{
    public static void ReadOnlyTransaction(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        // Start a read-only transaction.
        // You must specify Serializable as the isolation level, as the npgsql driver
        // will otherwise automatically set the isolation level to read-committed.
        var transaction = connection.BeginTransaction(IsolationLevel.Serializable);
        using var cmd = connection.CreateCommand();
        cmd.Transaction = transaction;
        // This SQL statement instructs the npgsql driver to use
        // a read-only transaction.
        cmd.CommandText = "set transaction read only";
        cmd.ExecuteNonQuery();

        cmd.CommandText = "SELECT singer_id, album_id, album_title " +
                          "FROM albums " +
                          "ORDER BY singer_id, album_id";
        using (var reader = cmd.ExecuteReader())
        {
            while (reader.Read())
            {
                Console.WriteLine($"{reader["singer_id"]} {reader["album_id"]} {reader["album_title"]}");
            }
        }
        cmd.CommandText = "SELECT singer_id, album_id, album_title "
                          + "FROM albums "
                          + "ORDER BY album_title";
        using (var reader = cmd.ExecuteReader())
        {
            while (reader.Read())
            {
                Console.WriteLine($"{reader["singer_id"]} {reader["album_id"]} {reader["album_title"]}");
            }
        }
        // End the read-only transaction by calling commit().
        transaction.Commit();
    }
}


function read_only_transaction(string $host, string $port, string $database): void
{
    $dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
    $connection = new PDO($dsn);

    // Start a transaction.
    $connection->beginTransaction();
    // Change the current transaction to a read-only transaction.
    // This statement can only be executed at the start of a transaction.
    $connection->exec("set transaction read only");

    // The following two queries use the same read-only transaction.
    $statement = $connection->query(
        "select singer_id, album_id, album_title "
        ."from albums "
        ."order by singer_id, album_id"
    );
    $rows = $statement->fetchAll();
    foreach ($rows as $album) {
        printf("%s\t%s\t%s\n", $album["singer_id"], $album["album_id"], $album["album_title"]);
    }

    $statement = $connection->query(
        "select singer_id, album_id, album_title "
        ."from albums "
        ."order by album_title"
    );
    $rows = $statement->fetchAll();
    foreach ($rows as $album) {
        printf("%s\t%s\t%s\n", $album["singer_id"], $album["album_id"], $album["album_title"]);
    }

    // Read-only transactions must also be committed or rolled back to mark
    // the end of the transaction. There is no semantic difference between
    // rolling back or committing a read-only transaction.
    $connection->commit();

    $rows = null;
    $statement = null;
    $connection = null;
}

Run the sample with the following command:


PGDATABASE=example-db ./


java -jar target/pgadapter-snippets/pgadapter-samples.jar readonlytransaction example-db


go run sample_runner.go readonlytransaction example-db


npm start readonlytransaction example-db


python example-db


dotnet run readonlytransaction example-db


php read_only_transaction.php example-db

You should see output similar to the following:

    1 1 Total Junk
    1 2 Go, Go, Go
    2 1 Green
    2 2 Forever Hold Your Peace
    2 3 Terrified
    2 2 Forever Hold Your Peace
    1 2 Go, Go, Go
    2 1 Green
    2 3 Terrified
    1 1 Total Junk

Partitioned queries and Data Boost

The partitionQuery API divides a query into smaller pieces, or partitions, and uses multiple machines to fetch the partitions in parallel. Each partition is identified by a partition token. The partitionQuery API has higher latency than the standard query API, because it is only intended for bulk operations such as exporting or scanning the whole database.

Data Boost lets you execute analytics queries and data exports with near-zero impact on existing workloads on the provisioned Spanner instance. Data Boost only supports partitioned queries.
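
Through PGAdapter, both features are driven by plain SQL statements: a session variable that enables Data Boost, and a "run partitioned query" prefix on the query itself. The following Python sketch only assembles that statement sequence so the relationship between the two is explicit. The function name partitioned_query_statements is hypothetical, and the statement texts are the ones used in the samples in this section.

```python
from typing import List


def partitioned_query_statements(query: str, data_boost: bool = True) -> List[str]:
    """Returns the SQL statements to send, in order (illustrative helper)."""
    statements = []
    if data_boost:
        # Enables Data Boost for all partitioned queries on the connection.
        statements.append("set spanner.data_boost_enabled=true")
    # Prefix the query so that it is partitioned and each partition is executed.
    statements.append("run partitioned query " + query.strip().rstrip(";"))
    return statements


for statement in partitioned_query_statements(
        "select singer_id, first_name, last_name from singers"):
    print(statement)
```

Passing data_boost=False yields only the partitioned query statement, which matches the point that Data Boost is an option layered on top of partitioned queries.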



export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# 'set spanner.data_boost_enabled=true' enables Data Boost for
# all partitioned queries on this connection.

# 'run partitioned query' is a shortcut for partitioning the query
# that follows and executing each of the partitions that is returned
# by Spanner.

psql -c "set spanner.data_boost_enabled=true" \
     -c "run partitioned query
         select singer_id, first_name, last_name
         from singers"


import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;

class DataBoost {
  static void dataBoost(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      // This enables Data Boost for all partitioned queries on this connection.
      connection.createStatement().execute("set spanner.data_boost_enabled=true");

      // Run a partitioned query. This query will use Data Boost.
      try (ResultSet resultSet =
          connection.createStatement().executeQuery(
              "run partitioned query "
                  + "select singer_id, first_name, last_name "
                  + "from singers")) {
        while (resultSet.next()) {
          System.out.printf(
              "%d %s %s\n",
              resultSet.getLong("singer_id"),
              resultSet.getString("first_name"),
              resultSet.getString("last_name"));
        }
      }
    }
  }
}


import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func DataBoost(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	// This enables Data Boost for all partitioned queries on this connection.
	_, _ = conn.Exec(ctx, "set spanner.data_boost_enabled=true")

	// Run a partitioned query. This query will use Data Boost.
	rows, err := conn.Query(ctx, "run partitioned query select singer_id, first_name, last_name from singers")
	if err != nil {
		return err
	}
	defer rows.Close()
	for rows.Next() {
		var singerId int64
		var firstName, lastName string
		err = rows.Scan(&singerId, &firstName, &lastName)
		if err != nil {
			return err
		}
		fmt.Printf("%v %v %v\n", singerId, firstName, lastName)
	}

	return nil
}


import { Client } from 'pg';

async function dataBoost(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  // This enables Data Boost for all partitioned queries on this connection.
  await connection.query("set spanner.data_boost_enabled=true");

  // Run a partitioned query. This query will use Data Boost.
  const singers = await connection.query(
      "run partitioned query "
      + "select singer_id, first_name, last_name "
      + "from singers");
  for (const row of singers.rows) {
    console.log(`${row["singer_id"]} ${row["first_name"]} ${row["last_name"]}`);
  }

  // Close the connection.
  await connection.end();
}


import psycopg


def data_boost(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        # Set autocommit=True so each query uses a separate transaction.
        conn.autocommit = True

        with conn.cursor() as cur:
            # This enables Data Boost for all partitioned queries on this
            # connection.
            cur.execute("set spanner.data_boost_enabled=true")

            # Run a partitioned query. This query will use Data Boost.
            cur.execute("run partitioned query "
                        "select singer_id, first_name, last_name "
                        "from singers")
            for singer in cur:
                print(singer)


using Npgsql;

namespace dotnet_snippets;

public static class DataBoostSample
{
    public static void DataBoost(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        using var cmd = connection.CreateCommand();
        // This enables Data Boost for all partitioned queries on this connection.
        cmd.CommandText = "set spanner.data_boost_enabled=true";
        cmd.ExecuteNonQuery();

        // Run a partitioned query. This query will use Data Boost.
        cmd.CommandText = "run partitioned query "
                          + "select singer_id, first_name, last_name "
                          + "from singers";
        using var reader = cmd.ExecuteReader();
        while (reader.Read())
        {
            Console.WriteLine($"{reader["singer_id"]} {reader["first_name"]} {reader["last_name"]}");
        }
    }
}


function data_boost(string $host, string $port, string $database): void
{
    $dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
    $connection = new PDO($dsn);

    // This enables Data Boost for all partitioned queries on this
    // connection.
    $connection->exec("set spanner.data_boost_enabled=true");

    // Run a partitioned query. This query will use Data Boost.
    $statement = $connection->query(
        "run partitioned query "
        ."select singer_id, first_name, last_name "
        ."from singers"
    );
    $rows = $statement->fetchAll();
    foreach ($rows as $singer) {
        printf("%s\t%s\t%s\n", $singer["singer_id"], $singer["first_name"], $singer["last_name"]);
    }

    $rows = null;
    $statement = null;
    $connection = null;
}

Run the sample with the following command:


PGDATABASE=example-db ./


java -jar target/pgadapter-snippets/pgadapter-samples.jar databoost example-db


go run sample_runner.go databoost example-db


npm start databoost example-db


python example-db


dotnet run databoost example-db


php data_boost.php example-db

For more information about running partitioned queries and using Data Boost with PGAdapter, see: Data Boost and partitioned query statements

Partitioned DML

Partitioned Data Manipulation Language (DML) is designed for the following types of bulk updates and deletes:

  • Periodic cleanup and garbage collection.
  • Backfilling new columns with default values.
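
Since Partitioned DML targets bulk updates and deletes, an application may want to guard against sending other statement types in this mode. The Python sketch below shows one way to do that before switching the connection to Partitioned DML. The function name partitioned_dml_statements and the keyword check are illustrative assumptions, not PGAdapter behavior; the session variable and its value are the ones used in the samples in this section.

```python
from typing import List


def partitioned_dml_statements(dml: str) -> List[str]:
    """Returns the SQL statements to send, in order (illustrative helper)."""
    statement = dml.strip().rstrip(";")
    keyword = statement.split(None, 1)[0].lower() if statement else ""
    if keyword not in ("update", "delete"):
        raise ValueError(
            "Partitioned DML is meant for bulk UPDATE or DELETE statements")
    return [
        # Switch the connection's autocommit DML mode to Partitioned DML.
        "set spanner.autocommit_dml_mode='partitioned_non_atomic'",
        statement,
    ]


for sql in partitioned_dml_statements(
        "update albums set marketing_budget=0 where marketing_budget is null"):
    print(sql)
```

The backfill statement in the example corresponds to the second item in the list: it fills a default value into rows where the column is still null.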



export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# Change the DML mode that is used by this connection to Partitioned
# DML. Partitioned DML is designed for bulk updates and deletes.
# See for more
# information.
psql -c "set spanner.autocommit_dml_mode='partitioned_non_atomic'" \
     -c "update albums
         set marketing_budget=0
         where marketing_budget is null"

echo "Updated albums using Partitioned DML"


import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

class PartitionedDml {

  static void partitionedDml(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      // Enable Partitioned DML on this connection.
      connection
          .createStatement()
          .execute("set spanner.autocommit_dml_mode='partitioned_non_atomic'");
      // Back-fill a default value for the MarketingBudget column.
      long lowerBoundUpdateCount =
          connection
              .createStatement()
              .executeUpdate("update albums set marketing_budget=0 where marketing_budget is null");
      System.out.printf("Updated at least %d albums\n", lowerBoundUpdateCount);
    }
  }
}


import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func PartitionedDML(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	// Enable Partitioned DML on this connection.
	if _, err := conn.Exec(ctx, "set spanner.autocommit_dml_mode='partitioned_non_atomic'"); err != nil {
		return err
	}
	// Back-fill a default value for the MarketingBudget column.
	tag, err := conn.Exec(ctx, "update albums set marketing_budget=0 where marketing_budget is null")
	if err != nil {
		return err
	}
	fmt.Printf("Updated at least %v albums\n", tag.RowsAffected())

	return nil
}


import { Client } from 'pg';

async function partitionedDml(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  // Enable Partitioned DML on this connection.
  await connection.query("set spanner.autocommit_dml_mode='partitioned_non_atomic'");

  // Back-fill a default value for the MarketingBudget column.
  const lowerBoundUpdateCount = await connection.query(
      "update albums " +
      "set marketing_budget=0 " +
      "where marketing_budget is null");
  console.log(`Updated at least ${lowerBoundUpdateCount.rowCount} albums`);

  // Close the connection.
  await connection.end();
}


import psycopg


def execute_partitioned_dml(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host, port=port,
                                                  database=database)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            # Change the DML mode that is used by this connection to Partitioned
            # DML. Partitioned DML is designed for bulk updates and deletes.
            # See the Spanner Partitioned DML documentation for more
            # information.
            cur.execute(
                "set spanner.autocommit_dml_mode='partitioned_non_atomic'")

            # The following statement will use Partitioned DML.
            cur.execute("update albums "
                        "set marketing_budget=0 "
                        "where marketing_budget is null")
            print("Updated at least %d albums" % cur.rowcount)


using Npgsql;

namespace dotnet_snippets;

public static class PartitionedDmlSample
{
    public static void PartitionedDml(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        // Enable Partitioned DML on this connection.
        using var cmd = connection.CreateCommand();
        cmd.CommandText = "set spanner.autocommit_dml_mode='partitioned_non_atomic'";
        cmd.ExecuteNonQuery();

        // Back-fill a default value for the MarketingBudget column.
        cmd.CommandText = "update albums set marketing_budget=0 where marketing_budget is null";
        var lowerBoundUpdateCount = cmd.ExecuteNonQuery();

        Console.WriteLine($"Updated at least {lowerBoundUpdateCount} albums");
    }
}


function execute_partitioned_dml(string $host, string $port, string $database): void
{
    $dsn = sprintf("pgsql:host=%s;port=%s;dbname=%s", $host, $port, $database);
    $connection = new PDO($dsn);

    // Change the DML mode that is used by this connection to Partitioned
    // DML. Partitioned DML is designed for bulk updates and deletes.
    // See the Spanner Partitioned DML documentation for more
    // information.
    $connection->exec("set spanner.autocommit_dml_mode='partitioned_non_atomic'");

    // The following statement will use Partitioned DML.
    $rowcount = $connection->exec(
        "update albums "
        ."set marketing_budget=0 "
        ."where marketing_budget is null"
    );
    printf("Updated at least %d albums\n", $rowcount);

    // Release the connection.
    $connection = null;
}

Run the sample with the following command:


PGDATABASE=example-db ./


java -jar target/pgadapter-snippets/pgadapter-samples.jar partitioneddml example-db


go run sample_runner.go partitioneddml example-db


npm start partitioneddml example-db


python example-db


dotnet run partitioneddml example-db


php partitioned_dml.php example-db
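
The samples above leave the connection in Partitioned DML mode after the update. As a minimal sketch (not part of the official samples), the helper below switches the mode, runs one statement, and then sets the mode back to `transactional`, which is assumed here to be the default, so that later DML on the same connection is atomic again. The names `run_partitioned_dml` and `FakeCursor` are hypothetical; `FakeCursor` only records statements so that the sketch runs without a database. With a real connection you would pass a psycopg cursor opened on an autocommit connection, as in the Python sample. Because Partitioned DML reports only a lower bound on the number of updated rows and may apply a statement to some rows more than once, the statement should be idempotent, as the back-fill here is.

```python
PARTITIONED = "set spanner.autocommit_dml_mode='partitioned_non_atomic'"
TRANSACTIONAL = "set spanner.autocommit_dml_mode='transactional'"


def run_partitioned_dml(cur, statement):
    """Run one DML statement in Partitioned DML mode, then restore the mode.

    `cur` is any DB-API style cursor with execute() and rowcount, for
    example a psycopg cursor on an autocommit connection to PGAdapter.
    Returns the reported row count, which is only a lower bound.
    """
    cur.execute(PARTITIONED)
    try:
        cur.execute(statement)
        return cur.rowcount
    finally:
        # Always switch back so that later statements use atomic DML again.
        cur.execute(TRANSACTIONAL)


class FakeCursor:
    """Hypothetical stand-in that records statements instead of running them."""

    def __init__(self):
        self.executed = []
        self.rowcount = -1

    def execute(self, sql):
        self.executed.append(sql)
        if sql.lstrip().lower().startswith("update"):
            self.rowcount = 3  # made-up count, for illustration only


cur = FakeCursor()
count = run_partitioned_dml(
    cur, "update albums set marketing_budget=0 where marketing_budget is null")
print(f"Updated at least {count} albums")
```

The `try`/`finally` ensures that the mode is reset even if the update fails, which matters when the connection is reused for other statements or returned to a pool.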

Clean up

To avoid incurring additional charges to your Cloud Billing account for the resources used in this tutorial, drop the database and delete the instance that you created.

Delete the database

If you delete an instance, all databases within it are automatically deleted. This step shows how to delete a database without deleting the instance (you still incur charges for the instance).

On the command line

gcloud spanner databases delete example-db --instance=test-instance

Using the Google Cloud console

  1. Go to the Spanner Instances page in the Google Cloud console.

    Go to the Instances page

  2. Click the instance.

  3. Click the database that you want to delete.

  4. On the Database details page, click Delete.

  5. Confirm that you want to delete the database and click Delete.

Delete the instance

Deleting an instance automatically deletes all databases created in that instance.

On the command line

gcloud spanner instances delete test-instance
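
The two cleanup commands can also be scripted. The sketch below only builds the `gcloud` argument lists shown in this section and, when `dry_run` is false, runs them in order. The function names and the `dry_run` flag are hypothetical, and the sketch assumes that the `gcloud` CLI is installed and authenticated. By default it prints the commands without executing anything.

```python
import subprocess


def delete_database_cmd(database, instance):
    """Build the command that deletes one database but keeps the instance."""
    return ["gcloud", "spanner", "databases", "delete", database,
            f"--instance={instance}"]


def delete_instance_cmd(instance):
    """Build the command that deletes the instance and all its databases."""
    return ["gcloud", "spanner", "instances", "delete", instance]


def clean_up(instance, database=None, dry_run=True):
    """Delete the database (if given), then the instance.

    With dry_run=True the commands are only printed, not executed.
    Returns the list of commands in the order they would run.
    """
    commands = []
    if database is not None:
        commands.append(delete_database_cmd(database, instance))
    commands.append(delete_instance_cmd(instance))
    for cmd in commands:
        print(" ".join(cmd))
        if not dry_run:
            # gcloud asks for confirmation on the terminal before deleting.
            subprocess.run(cmd, check=True)
    return commands


commands = clean_up("test-instance", database="example-db")
```

Deleting the database first is optional, because deleting the instance removes its databases as well; pass `database=None` to skip that step.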

Using the Google Cloud console

  1. Go to the Spanner Instances page in the Google Cloud console.

    Go to the Instances page

  2. Click the instance.

  3. Click Delete.

  4. Confirm that you want to delete the instance and click Delete.

What's next