Getting started with Spanner and PGAdapter


Objectives

This tutorial walks you through the following steps using the Spanner PGAdapter local proxy for PostgreSQL drivers:

  • Create a Spanner instance and database.
  • Write, read, and execute SQL queries on data in the database.
  • Update the database schema.
  • Update data using a read-write transaction.
  • Add a secondary index to the database.
  • Use the index to read and execute SQL queries on data.
  • Retrieve data using a read-only transaction.

Costs

This tutorial uses Spanner, which is a billable component of Google Cloud. For information on the cost of using Spanner, see Pricing.

Before you begin

Complete the steps described in Set up, which cover creating and setting a default Google Cloud project, enabling billing, enabling the Cloud Spanner API, and setting up OAuth 2.0 to get authentication credentials to use the Cloud Spanner API.

In particular, make sure that you run gcloud auth application-default login to set up your local development environment with authentication credentials.

Prepare your local PGAdapter environment

You can use PostgreSQL drivers in combination with PGAdapter to connect to Spanner. PGAdapter is a local proxy that translates the PostgreSQL network protocol to the Spanner gRPC protocol.

PGAdapter requires either Java or Docker to run.

  1. Install Java or Docker on your development machine if neither is already installed.

  2. Clone the sample app repository to your local machine:

    git clone https://github.com/GoogleCloudPlatform/pgadapter.git
    
  3. Change to the directory that contains the Spanner sample code:

    psql

    cd pgadapter/samples/snippets/psql-snippets
    

    Java

    cd pgadapter/samples/snippets/java-snippets
    mvn package -DskipTests
    

    Go

    cd pgadapter/samples/snippets/golang-snippets
    

    Node.js

    cd pgadapter/samples/snippets/nodejs-snippets
    npm install
    

    Python

    cd pgadapter/samples/snippets/python-snippets
    python -m venv ./venv
    source ./venv/bin/activate
    pip install -r requirements.txt
    cd samples
    

    C#

    cd pgadapter/samples/snippets/dotnet-snippets
    

Create an instance

When you first use Spanner, you must create an instance, which is an allocation of resources that are used by Spanner databases. When you create an instance, you choose an instance configuration, which determines where your data is stored, and also the number of nodes to use, which determines the amount of serving and storage resources in your instance.

Execute the following command to create a Spanner instance in the region us-central1 with 1 node:

gcloud spanner instances create test-instance --config=regional-us-central1 \
    --description="Test Instance" --nodes=1

Note that this creates an instance with the following characteristics:

  • Instance ID test-instance
  • Display name Test Instance
  • Instance configuration regional-us-central1 (Regional configurations store data in one region, while multi-region configurations distribute data across multiple regions. For more information, see About instances.)
  • Node count of 1 (node_count corresponds to the amount of serving and storage resources available to databases in the instance. Learn more in Nodes and processing units.)

You should see:

Creating instance...done.

Look through sample files

The samples repository contains a sample that shows how to use Spanner with PGAdapter.

Take a look through the samples/snippets folder, which shows how to use Spanner. The code shows how to create and use a new database. The data uses the example schema shown in the Schema and data model page.

Start PGAdapter

Start PGAdapter on your local development machine and point it to the instance that you created.

The following commands assume that you have executed gcloud auth application-default login.

Java Application

wget https://storage.googleapis.com/pgadapter-jar-releases/pgadapter.tar.gz \
    && tar -xzvf pgadapter.tar.gz
java -jar pgadapter.jar -i test-instance

Docker

docker pull gcr.io/cloud-spanner-pg-adapter/pgadapter
docker run \
    --name pgadapter \
    --rm -d -p 5432:5432 \
    -v "$HOME/.config/gcloud":/gcloud:ro \
    --env CLOUDSDK_CONFIG=/gcloud \
    gcr.io/cloud-spanner-pg-adapter/pgadapter \
    -i test-instance -x

Emulator

docker pull gcr.io/cloud-spanner-pg-adapter/pgadapter-emulator
docker run \
    --name pgadapter-emulator \
    --rm -d \
    -p 5432:5432 \
    -p 9010:9010 \
    -p 9020:9020 \
    gcr.io/cloud-spanner-pg-adapter/pgadapter-emulator

This starts PGAdapter with an embedded Spanner emulator. This embedded emulator automatically creates any Spanner instance or database that you connect to without the need to manually create them beforehand.

We recommend that you run PGAdapter in production as either a sidecar container or an in-process dependency. For more information on deploying PGAdapter in production, see Choose a method for running PGAdapter.
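
Before you continue, it can help to confirm that something is listening on the PGAdapter port. The following is a minimal sketch that uses only the Python standard library. The function name wait_for_port and the 30-second default timeout are illustrative assumptions, and the check only proves that a TCP listener exists, not that it is PGAdapter or that your credentials are valid.

```python
import socket
import time


def wait_for_port(host: str = "localhost", port: int = 5432,
                  timeout_seconds: float = 30.0) -> bool:
    """Returns True once a TCP connection to host:port succeeds.

    Returns False if no connection could be made before the timeout.
    The name and the default values are assumptions for illustration.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        try:
            # Open and immediately close a plain TCP connection.
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            # Nothing is listening yet. Give up once the deadline passes.
            if time.monotonic() >= deadline:
                return False
            time.sleep(0.5)


if __name__ == "__main__":
    if wait_for_port():
        print("A listener is accepting connections on localhost:5432")
    else:
        print("Nothing is listening on localhost:5432")
```

If the check fails for the Docker variants, verify that the container is running and that port 5432 is published to the host.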

Create a database

Create a database called example-db in the instance called test-instance by running the following command:

gcloud spanner databases create example-db --instance=test-instance \
    --database-dialect=POSTGRESQL

You should see:

Creating database...done.

Create tables

The following code creates two tables in the database.

psql

#!/bin/bash

# Set the connection variables for psql.
# The following statements use the existing value of the variable if it has
# already been set, and otherwise assign a default value.
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# Create two tables in one batch.
psql << SQL
-- Create the singers table
CREATE TABLE singers (
  singer_id   bigint not null primary key,
  first_name  character varying(1024),
  last_name   character varying(1024),
  singer_info bytea,
  full_name   character varying(2048) GENERATED ALWAYS
          AS (first_name || ' ' || last_name) STORED
);

-- Create the albums table. This table is interleaved in the parent table
-- "singers".
CREATE TABLE albums (
  singer_id     bigint not null,
  album_id      bigint not null,
  album_title   character varying(1024),
  primary key (singer_id, album_id)
)
-- The 'interleave in parent' clause is a Spanner-specific extension to
-- open-source PostgreSQL.
INTERLEAVE IN PARENT singers ON DELETE CASCADE;
SQL

echo "Created Singers & Albums tables in database: [${PGDATABASE}]"

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

class CreateTables {
  static void createTables(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      try (Statement statement = connection.createStatement()) {
        // Create two tables in one batch.
        statement.addBatch(
            "create table singers ("
                + "  singer_id   bigint primary key not null,"
                + "  first_name  varchar(1024),"
                + "  last_name   varchar(1024),"
                + "  singer_info bytea,"
                + "  full_name   varchar(2048) generated always as (\n"
                + "      case when first_name is null then last_name\n"
                + "          when last_name  is null then first_name\n"
                + "          else first_name || ' ' || last_name\n"
                + "      end) stored"
                + ")");
        statement.addBatch(
            "create table albums ("
                + "  singer_id     bigint not null,"
                + "  album_id      bigint not null,"
                + "  album_title   varchar,"
                + "  primary key (singer_id, album_id)"
                + ") interleave in parent singers on delete cascade");
        statement.executeBatch();
        System.out.println("Created Singers & Albums tables in database: [" + database + "]");
      }
    }
  }
}

Go

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func CreateTables(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	// Create two tables in one batch on Spanner.
	br := conn.SendBatch(ctx, &pgx.Batch{QueuedQueries: []*pgx.QueuedQuery{
		{SQL: "create table singers (" +
			"  singer_id   bigint primary key not null," +
			"  first_name  character varying(1024)," +
			"  last_name   character varying(1024)," +
			"  singer_info bytea," +
			"  full_name   character varying(2048) generated " +
			"  always as (first_name || ' ' || last_name) stored" +
			")"},
		{SQL: "create table albums (" +
			"  singer_id     bigint not null," +
			"  album_id      bigint not null," +
			"  album_title   character varying(1024)," +
			"  primary key (singer_id, album_id)" +
			") interleave in parent singers on delete cascade"},
	}})
	cmd, err := br.Exec()
	if err != nil {
		return err
	}
	if cmd.String() != "CREATE" {
		return fmt.Errorf("unexpected command tag: %v", cmd.String())
	}
	if err := br.Close(); err != nil {
		return err
	}
	fmt.Printf("Created Singers & Albums tables in database: [%s]\n", database)

	return nil
}

Node.js

import { Client } from 'pg';

async function createTables(host: string, port: number, database: string): Promise<void> {
  // Connect to Spanner through PGAdapter.
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  // Create two tables in one batch.
  await connection.query("start batch ddl");
  await connection.query("create table singers (" +
      "  singer_id   bigint primary key not null," +
      "  first_name  character varying(1024)," +
      "  last_name   character varying(1024)," +
      "  singer_info bytea," +
      "  full_name   character varying(2048) generated " +
      "  always as (first_name || ' ' || last_name) stored" +
      ")");
  await connection.query("create table albums (" +
      "  singer_id     bigint not null," +
      "  album_id      bigint not null," +
      "  album_title   character varying(1024)," +
      "  primary key (singer_id, album_id)" +
      ") interleave in parent singers on delete cascade");
  await connection.query("run batch");
  console.log(`Created Singers & Albums tables in database: [${database}]`);

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def create_tables(host: str, port: int, database: str):
    # Connect to Cloud Spanner using psycopg3 through PGAdapter.
    with psycopg.connect("host={host} port={port} "
                         "dbname={database} "
                         "sslmode=disable".format(host=host, port=port,
                                                  database=database)) as conn:
        # Enable autocommit to execute DDL statements, as psycopg otherwise
        # tries to use a read/write transaction.
        conn.autocommit = True

        # Use a pipeline to execute multiple DDL statements in one batch.
        with conn.pipeline():
            conn.execute("create table singers ("
                         + "  singer_id   bigint primary key not null,"
                         + "  first_name  character varying(1024),"
                         + "  last_name   character varying(1024),"
                         + "  singer_info bytea,"
                         + "  full_name   character varying(2048) generated "
                         + "  always as (first_name || ' ' || last_name) stored"
                         + ")")
            conn.execute("create table albums ("
                         + "  singer_id     bigint not null,"
                         + "  album_id      bigint not null,"
                         + "  album_title   character varying(1024),"
                         + "  primary key (singer_id, album_id)"
                         + ") interleave in parent singers on delete cascade")
        print("Created Singers & Albums tables in database: [{database}]"
              .format(database=database))

C#

using Npgsql;

namespace dotnet_snippets;

public static class CreateTablesSample
{
    public static void CreateTables(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        // Create two tables in one batch.
        var batch = connection.CreateBatch();
        batch.BatchCommands.Add(new NpgsqlBatchCommand(
            "create table singers ("
            + "  singer_id   bigint primary key not null,"
            + "  first_name  varchar(1024),"
            + "  last_name   varchar(1024),"
            + "  singer_info bytea,"
            + "  full_name   varchar(2048) generated always as (\n"
            + "      case when first_name is null then last_name\n"
            + "          when last_name  is null then first_name\n"
            + "          else first_name || ' ' || last_name\n"
            + "      end) stored"
            + ")"));
        batch.BatchCommands.Add(new NpgsqlBatchCommand(
            "create table albums ("
            + "  singer_id     bigint not null,"
            + "  album_id      bigint not null,"
            + "  album_title   varchar,"
            + "  primary key (singer_id, album_id)"
            + ") interleave in parent singers on delete cascade"));
        batch.ExecuteNonQuery();
        Console.WriteLine($"Created Singers & Albums tables in database: [{database}]");
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./create_tables.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar createtables example-db

Go

go run sample_runner.go createtables example-db

Node.js

npm start createtables example-db

Python

python create_tables.py example-db

C#

dotnet run createtables example-db
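
The table definitions above compute full_name in two different ways. The psql, Go, Node.js, and Python samples use first_name || ' ' || last_name, while the Java and C# samples wrap the concatenation in a CASE expression. In PostgreSQL, concatenating with || yields NULL when either operand is NULL, so the two definitions differ for singers that have only one name. The following sketch mimics both expressions in plain Python, with None standing in for NULL. The function names are illustrative and not part of the samples.

```python
from typing import Optional


def full_name_concat(first_name: Optional[str],
                     last_name: Optional[str]) -> Optional[str]:
    """Mimics first_name || ' ' || last_name: NULL if either input is NULL."""
    if first_name is None or last_name is None:
        return None
    return first_name + " " + last_name


def full_name_case(first_name: Optional[str],
                   last_name: Optional[str]) -> Optional[str]:
    """Mimics the CASE expression used in the Java and C# samples."""
    if first_name is None:
        return last_name
    if last_name is None:
        return first_name
    return first_name + " " + last_name


if __name__ == "__main__":
    # Both variants agree when both names are present.
    print(full_name_concat("Melissa", "Garcia"))
    print(full_name_case("Melissa", "Garcia"))
    # They differ when one of the names is missing.
    print(full_name_concat("Melissa", None))
    print(full_name_case("Melissa", None))
```

If your data can contain singers with a missing first or last name, the CASE variant keeps full_name populated.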

The next steps are to connect to your database and write data to it.

Create a connection

Before you can do reads or writes, you must create a connection to PGAdapter. All of your interactions with Spanner must go through a Connection. The database name is specified in the connection string.

psql

#!/bin/bash

# Set the connection variables for psql.
# The following statements use the existing value of the variable if it has
# already been set, and otherwise assign a default value.
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# Connect to Cloud Spanner using psql through PGAdapter
# and execute a simple query.
psql -c "select 'Hello world!' as hello"

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;

class CreateConnection {
  static void createConnection(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      try (ResultSet resultSet =
          connection.createStatement().executeQuery("select 'Hello world!' as hello")) {
        while (resultSet.next()) {
          System.out.printf("Greeting from Cloud Spanner PostgreSQL: %s\n", resultSet.getString(1));
        }
      }
    }
  }
}

Go

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func CreateConnection(host string, port int, database string) error {
	ctx := context.Background()
	// Connect to Cloud Spanner using pgx through PGAdapter.
	// 'sslmode=disable' is optional, but adding it reduces the connection time,
	// as pgx will then skip first trying to create an SSL connection.
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	row := conn.QueryRow(ctx, "select 'Hello world!' as hello")
	var msg string
	if err := row.Scan(&msg); err != nil {
		return err
	}
	fmt.Printf("Greeting from Cloud Spanner PostgreSQL: %s\n", msg)

	return nil
}

Node.js

import { Client } from 'pg';

async function createConnection(host: string, port: number, database: string): Promise<void> {
  // Connect to Spanner through PGAdapter.
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  const result = await connection.query("select 'Hello world!' as hello");
  console.log(`Greeting from Cloud Spanner PostgreSQL: ${result.rows[0]['hello']}`);

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def create_connection(host: str, port: int, database: str):
    # Connect to Cloud Spanner using psycopg3 through PGAdapter.
    # 'sslmode=disable' is optional, but adding it reduces the connection time,
    # as psycopg3 will then skip first trying to create an SSL connection.
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute("select 'Hello world!' as hello")
            print("Greeting from Cloud Spanner PostgreSQL:", cur.fetchone()[0])

C#

using Npgsql;

namespace dotnet_snippets;

public static class CreateConnectionSample
{
    public static void CreateConnection(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        using var cmd = new NpgsqlCommand("select 'Hello world!' as hello", connection);
        using var reader = cmd.ExecuteReader();
        while (reader.Read())
        {
            var greeting = reader.GetString(0);
            Console.WriteLine($"Greeting from Cloud Spanner PostgreSQL: {greeting}");
        }
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./create_connection.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar createconnection example-db

Go

go run sample_runner.go createconnection example-db

Node.js

npm start createconnection example-db

Python

python create_connection.py example-db

C#

dotnet run createconnection example-db
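
The psql scripts read the host, port, and database from the PGHOST, PGPORT, and PGDATABASE environment variables, while the driver samples receive them as arguments. As a rough sketch, the same defaults could be applied in Python when building the connection string that the psycopg samples pass to psycopg.connect. The helper name pgadapter_conninfo is hypothetical, and the sketch assumes that the values contain no spaces or quotes, which would need escaping in a real connection string.

```python
import os
from typing import Mapping, Optional


def pgadapter_conninfo(environ: Optional[Mapping[str, str]] = None) -> str:
    """Builds a key/value connection string for a local PGAdapter.

    Falls back to the defaults used by the psql scripts when a variable
    is unset or empty. This helper is an illustration, not part of the
    samples.
    """
    env = os.environ if environ is None else environ
    host = env.get("PGHOST") or "localhost"
    port = env.get("PGPORT") or "5432"
    database = env.get("PGDATABASE") or "example-db"
    # sslmode=disable skips the initial attempt to negotiate SSL.
    return f"host={host} port={port} dbname={database} sslmode=disable"


if __name__ == "__main__":
    # The result could be passed to psycopg.connect(...).
    print(pgadapter_conninfo())
```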

Write data with DML

You can insert data using Data Manipulation Language (DML) in a read-write transaction.

These samples show how to execute a DML statement on Spanner using a PostgreSQL driver.

psql

#!/bin/bash

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

psql -c "INSERT INTO singers (singer_id, first_name, last_name) VALUES
                             (12, 'Melissa', 'Garcia'),
                             (13, 'Russel', 'Morales'),
                             (14, 'Jacqueline', 'Long'),
                             (15, 'Dylan', 'Shaw')"

echo "4 records inserted"

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;

class WriteDataWithDml {
  static class Singer {
    private final long singerId;
    private final String firstName;
    private final String lastName;

    Singer(final long id, final String first, final String last) {
      this.singerId = id;
      this.firstName = first;
      this.lastName = last;
    }
  }

  static void writeDataWithDml(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      // Add 4 rows in one statement.
      // JDBC always uses '?' as a parameter placeholder.
      try (PreparedStatement preparedStatement =
          connection.prepareStatement(
              "INSERT INTO singers (singer_id, first_name, last_name) VALUES "
                  + "(?, ?, ?), "
                  + "(?, ?, ?), "
                  + "(?, ?, ?), "
                  + "(?, ?, ?)")) {

        final List<Singer> singers =
            Arrays.asList(
                new Singer(/* SingerId = */ 12L, "Melissa", "Garcia"),
                new Singer(/* SingerId = */ 13L, "Russel", "Morales"),
                new Singer(/* SingerId = */ 14L, "Jacqueline", "Long"),
                new Singer(/* SingerId = */ 15L, "Dylan", "Shaw"));

        // Note that JDBC parameters start at index 1.
        int paramIndex = 0;
        for (Singer singer : singers) {
          preparedStatement.setLong(++paramIndex, singer.singerId);
          preparedStatement.setString(++paramIndex, singer.firstName);
          preparedStatement.setString(++paramIndex, singer.lastName);
        }

        int updateCount = preparedStatement.executeUpdate();
        System.out.printf("%d records inserted.\n", updateCount);
      }
    }
  }
}

Go

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func WriteDataWithDml(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	tag, err := conn.Exec(ctx,
		"INSERT INTO singers (singer_id, first_name, last_name) "+
			"VALUES ($1, $2, $3), ($4, $5, $6), "+
			"       ($7, $8, $9), ($10, $11, $12)",
		12, "Melissa", "Garcia",
		13, "Russel", "Morales",
		14, "Jacqueline", "Long",
		15, "Dylan", "Shaw")
	if err != nil {
		return err
	}
	fmt.Printf("%v records inserted\n", tag.RowsAffected())

	return nil
}

Node.js

import { Client } from 'pg';

async function writeDataWithDml(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  const result = await connection.query("INSERT INTO singers (singer_id, first_name, last_name) " +
      "VALUES ($1, $2, $3), ($4, $5, $6), " +
      "       ($7, $8, $9), ($10, $11, $12)",
       [12, "Melissa", "Garcia",
        13, "Russel", "Morales",
        14, "Jacqueline", "Long",
        15, "Dylan", "Shaw"])
  console.log(`${result.rowCount} records inserted`);

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def write_data_with_dml(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute("INSERT INTO singers (singer_id, first_name, last_name)"
                        " VALUES (%s, %s, %s), (%s, %s, %s), "
                        "        (%s, %s, %s), (%s, %s, %s)",
                        (12, "Melissa", "Garcia",
                         13, "Russel", "Morales",
                         14, "Jacqueline", "Long",
                         15, "Dylan", "Shaw",))
            print("%d records inserted" % cur.rowcount)

C#

using Npgsql;

namespace dotnet_snippets;

public static class WriteDataWithDmlSample
{
    readonly struct Singer
    {
        public Singer(long singerId, string firstName, string lastName)
        {
            SingerId = singerId;
            FirstName = firstName;
            LastName = lastName;
        }

        public long SingerId { get; }
        public string FirstName { get; }
        public string LastName { get; }
    }

    public static void WriteDataWithDml(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();
        // Add 4 rows in one statement.
        using var cmd = new NpgsqlCommand("INSERT INTO singers (singer_id, first_name, last_name) VALUES "
                                          + "($1, $2, $3), "
                                          + "($4, $5, $6), "
                                          + "($7, $8, $9), "
                                          + "($10, $11, $12)", connection);
        List<Singer> singers =
        [
            new Singer(/* SingerId = */ 12L, "Melissa", "Garcia"),
            new Singer(/* SingerId = */ 13L, "Russel", "Morales"),
            new Singer(/* SingerId = */ 14L, "Jacqueline", "Long"),
            new Singer(/* SingerId = */ 15L, "Dylan", "Shaw")
        ];
        foreach (var singer in singers)
        {
            cmd.Parameters.Add(new NpgsqlParameter { Value = singer.SingerId });
            cmd.Parameters.Add(new NpgsqlParameter { Value = singer.FirstName });
            cmd.Parameters.Add(new NpgsqlParameter { Value = singer.LastName });
        }
        var updateCount = cmd.ExecuteNonQuery();
        Console.WriteLine($"{updateCount} records inserted.");
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./write_data_with_dml.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar writeusingdml example-db

Go

go run sample_runner.go writeusingdml example-db

Node.js

npm start writeusingdml example-db

Python

python write_data_with_dml.py example-db

C#

dotnet run writeusingdml example-db

You should see a response similar to the following (some samples omit the trailing period):

4 records inserted.
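
The samples above spell out one placeholder group per row by hand. For a variable number of rows, the statement text and the flat parameter list could be generated instead. The following is a minimal sketch that uses psycopg-style %s placeholders. The helper name build_multi_row_insert is hypothetical, and the table and column names are inserted into the SQL text as-is, so they must come from trusted code and never from user input.

```python
from typing import List, Sequence, Tuple


def build_multi_row_insert(
        table: str, columns: Sequence[str],
        rows: Sequence[Sequence[object]]) -> Tuple[str, List[object]]:
    """Returns (sql, params) for one INSERT statement that adds all rows.

    Illustrative helper only. Table and column names are not escaped.
    """
    if not rows:
        raise ValueError("at least one row is required")
    width = len(columns)
    for row in rows:
        if len(row) != width:
            raise ValueError("each row needs one value per column")
    # One "(%s, %s, ...)" group per row.
    group = "(" + ", ".join(["%s"] * width) + ")"
    sql = "INSERT INTO {} ({}) VALUES {}".format(
        table, ", ".join(columns), ", ".join([group] * len(rows)))
    # Flatten the rows into one parameter list, in placeholder order.
    params = [value for row in rows for value in row]
    return sql, params


if __name__ == "__main__":
    sql, params = build_multi_row_insert(
        "singers", ["singer_id", "first_name", "last_name"],
        [(12, "Melissa", "Garcia"), (13, "Russel", "Morales")])
    print(sql)
    print(params)
```

The returned pair matches the shape of the arguments that the Python sample passes to cur.execute.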

Write data with a DML batch

PGAdapter supports executing DML batches. Sending multiple DML statements in one batch reduces the number of round-trips to Spanner and improves the performance of your application.
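
Drivers without a native batching API can mark the batch boundaries explicitly with the start batch dml and run batch statements, as the Node.js sample in this section does. The following sketch shows only that statement sequence. The execute parameter is a hypothetical stand-in for a driver call such as psycopg's conn.execute(query, params) on an autocommit connection. Using these statements through psycopg is an assumption here, and error handling is omitted.

```python
from typing import Callable, Iterable, Sequence


def run_dml_batch(execute: Callable[..., object], sql: str,
                  rows: Iterable[Sequence[object]]) -> int:
    """Queues one DML statement per row between the batch markers.

    `execute` is an assumed callable that takes a statement and,
    optionally, its parameters. Returns the number of queued statements.
    """
    queued = 0
    # Ask PGAdapter to start buffering DML statements.
    execute("start batch dml")
    for row in rows:
        execute(sql, tuple(row))
        queued += 1
    # Send the buffered statements to Spanner as a single batch.
    execute("run batch")
    return queued


if __name__ == "__main__":
    # Print the statements instead of sending them to a database.
    def show(statement: str, params: object = None) -> None:
        print(statement, params)

    run_dml_batch(
        show,
        "INSERT INTO singers (singer_id, first_name, last_name) "
        "VALUES (%s, %s, %s)",
        [(16, "Sarah", "Wilson"), (17, "Ethan", "Miller")])
```

Passing the driver call in as a parameter keeps the sketch independent of any one driver and lets you inspect the sequence without a running PGAdapter.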

psql

#!/bin/bash

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# Create a prepared insert statement and execute this prepared
# insert statement three times in one SQL string. The single
# SQL string with three insert statements will be executed as
# a single DML batch on Spanner.
psql -c "PREPARE insert_singer AS
           INSERT INTO singers (singer_id, first_name, last_name)
           VALUES (\$1, \$2, \$3)" \
     -c "EXECUTE insert_singer (16, 'Sarah', 'Wilson');
         EXECUTE insert_singer (17, 'Ethan', 'Miller');
         EXECUTE insert_singer (18, 'Maya', 'Patel');"

echo "3 records inserted"

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;

class WriteDataWithDmlBatch {
  static class Singer {
    private final long singerId;
    private final String firstName;
    private final String lastName;

    Singer(final long id, final String first, final String last) {
      this.singerId = id;
      this.firstName = first;
      this.lastName = last;
    }
  }

  static void writeDataWithDmlBatch(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      // Add multiple rows in one DML batch.
      // JDBC always uses '?' as a parameter placeholder.
      try (PreparedStatement preparedStatement =
          connection.prepareStatement(
              "INSERT INTO singers (singer_id, first_name, last_name) VALUES (?, ?, ?)")) {
        final List<Singer> singers =
            Arrays.asList(
                new Singer(/* SingerId = */ 16L, "Sarah", "Wilson"),
                new Singer(/* SingerId = */ 17L, "Ethan", "Miller"),
                new Singer(/* SingerId = */ 18L, "Maya", "Patel"));

        for (Singer singer : singers) {
          // Note that JDBC parameters start at index 1.
          int paramIndex = 0;
          preparedStatement.setLong(++paramIndex, singer.singerId);
          preparedStatement.setString(++paramIndex, singer.firstName);
          preparedStatement.setString(++paramIndex, singer.lastName);
          preparedStatement.addBatch();
        }

        int[] updateCounts = preparedStatement.executeBatch();
        System.out.printf("%d records inserted.\n", Arrays.stream(updateCounts).sum());
      }
    }
  }
}

Go

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func WriteDataWithDmlBatch(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	sql := "INSERT INTO singers (singer_id, first_name, last_name) " +
		"VALUES ($1, $2, $3)"
	batch := &pgx.Batch{}
	batch.Queue(sql, 16, "Sarah", "Wilson")
	batch.Queue(sql, 17, "Ethan", "Miller")
	batch.Queue(sql, 18, "Maya", "Patel")
	br := conn.SendBatch(ctx, batch)
	_, execErr := br.Exec()
	// Close the batch results before returning, even if Exec failed.
	if err := br.Close(); err != nil {
		return err
	}
	if execErr != nil {
		return execErr
	}
	fmt.Printf("%v records inserted\n", batch.Len())

	return nil
}

Node.js

import { Client } from 'pg';

async function writeDataWithDmlBatch(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  // node-postgres does not support PostgreSQL pipeline mode, so we must use the
  // `start batch dml` / `run batch` statements to execute a DML batch.
  const sql = "INSERT INTO singers (singer_id, first_name, last_name) VALUES ($1, $2, $3)";
  await connection.query("start batch dml");
  await connection.query(sql, [16, "Sarah", "Wilson"]);
  await connection.query(sql, [17, "Ethan", "Miller"]);
  await connection.query(sql, [18, "Maya", "Patel"]);
  const result = await connection.query("run batch");
  // RUN BATCH returns the update counts as an array of strings, with one element for each
  // DML statement in the batch. This calculates the total number of affected rows from that array.
  const updateCount = result.rows[0]["UPDATE_COUNTS"]
      .map((s: string) => parseInt(s))
      .reduce((c: number, current: number) => c + current, 0);
  console.log(`${updateCount} records inserted`);

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def write_data_with_dml_batch(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.executemany("INSERT INTO singers "
                            "(singer_id, first_name, last_name) "
                            "VALUES (%s, %s, %s)",
                            [(16, "Sarah", "Wilson",),
                             (17, "Ethan", "Miller",),
                             (18, "Maya", "Patel",), ])
            print("%d records inserted" % cur.rowcount)

C#

using Npgsql;

namespace dotnet_snippets;

public static class WriteDataWithDmlBatchSample
{
    readonly struct Singer
    {
        public Singer(long singerId, string firstName, string lastName)
        {
            SingerId = singerId;
            FirstName = firstName;
            LastName = lastName;
        }

        public long SingerId { get; }
        public string FirstName { get; }
        public string LastName { get; }
    }

    public static void WriteDataWithDmlBatch(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        // Add multiple rows in one DML batch.
        const string sql = "INSERT INTO singers (singer_id, first_name, last_name) VALUES ($1, $2, $3)";
        List<Singer> singers =
        [
            new Singer(/* SingerId = */ 16L, "Sarah", "Wilson"),
            new Singer(/* SingerId = */ 17L, "Ethan", "Miller"),
            new Singer(/* SingerId = */ 18L, "Maya", "Patel")
        ];
        using var batch = new NpgsqlBatch(connection);
        foreach (var singer in singers)
        {
            batch.BatchCommands.Add(new NpgsqlBatchCommand
            {
                CommandText = sql,
                Parameters =
                {
                    new NpgsqlParameter {Value = singer.SingerId},
                    new NpgsqlParameter {Value = singer.FirstName},
                    new NpgsqlParameter {Value = singer.LastName}
                }
            });
        }
        var updateCount = batch.ExecuteNonQuery();
        Console.WriteLine($"{updateCount} records inserted.");
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./write_data_with_dml_batch.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar writeusingdmlbatch example-db

Go

go run sample_runner.go writeusingdmlbatch example-db

Node.js

npm start writeusingdmlbatch example-db

Python

python write_data_with_dml_batch.py example-db

C#

dotnet run writeusingdmlbatch example-db

You should see:

3 records inserted.
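
The Node.js sample above adds up the update counts that RUN BATCH returns. As a minimal sketch, the same arithmetic could look like this in Python. The helper name `total_update_count` is hypothetical, and the input list stands in for the `UPDATE_COUNTS` value; the sketch assumes each element is a decimal string, as the Node.js comment describes.

```python
from typing import Iterable


def total_update_count(update_counts: Iterable[str]) -> int:
    """Sum per-statement update counts that arrive as decimal strings."""
    return sum(int(count) for count in update_counts)


# Three single-row INSERT statements yield one count per statement.
print(total_update_count(["1", "1", "1"]))
```

Drivers that support native batching, such as psycopg with executemany, report the total directly, so a helper like this is only useful with the START BATCH DML and RUN BATCH statements.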

Write data with mutations

You can also insert data using mutations.

PGAdapter translates the PostgreSQL COPY command to mutations. Using COPY is the most efficient way to quickly insert data in your Spanner database.

COPY operations are atomic by default. Atomic operations on Spanner are bound by the commit size limit. See CRUD limit for more information.

These examples show how to execute a non-atomic COPY operation. This allows the COPY operation to exceed the commit size limit.
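
The Python sample in this section doesn't set the non-atomic mode explicitly. The following is a minimal sketch of how it might be combined with a COPY operation in Python. The function name `copy_non_atomic` is hypothetical, and `conn` is assumed to be an open psycopg 3 connection to PGAdapter. The sketch also assumes that the setting only takes effect when the connection is in autocommit mode.

```python
def copy_non_atomic(conn, copy_sql: str, path: str, block_size: int = 1024) -> int:
    """Stream a text file into a COPY statement in non-atomic mode.

    `conn` is assumed to be an open psycopg 3 connection to PGAdapter.
    Returns the row count that the cursor reports after the COPY.
    """
    # Assumption: the mode only applies to statements in autocommit mode.
    conn.autocommit = True
    # Same setting that the Java, Go, Node.js, and C# samples execute.
    conn.execute("set spanner.autocommit_dml_mode='partitioned_non_atomic'")
    with conn.cursor() as cur:
        with open(path, "r") as f:
            with cur.copy(copy_sql) as copy:
                # Send the file in fixed-size blocks.
                while data := f.read(block_size):
                    copy.write(data)
        return cur.rowcount
```

A call might look like `copy_non_atomic(conn, "COPY albums FROM STDIN", "albums_data.txt")`. Because the operation is no longer atomic, a failure partway through can leave some rows committed.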

psql

#!/bin/bash

# Get the source directory of this script.
directory=${BASH_SOURCE%/*}/

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# Copy data to Spanner from a tab-separated text file using the COPY command.
psql -c "COPY singers (singer_id, first_name, last_name) FROM STDIN" \
  < "${directory}singers_data.txt"
psql -c "COPY albums FROM STDIN" \
  < "${directory}albums_data.txt"

echo "Copied singers and albums"

Java

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;

class WriteDataWithCopy {

  static void writeDataWithCopy(String host, int port, String database)
      throws SQLException, IOException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      // Unwrap the PostgreSQL JDBC connection interface to get access to
      // a CopyManager.
      PGConnection pgConnection = connection.unwrap(PGConnection.class);
      CopyManager copyManager = pgConnection.getCopyAPI();

      // Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
      // will succeed even if it exceeds Spanner's mutation limit per transaction.
      connection
          .createStatement()
          .execute("set spanner.autocommit_dml_mode='partitioned_non_atomic'");
      long numSingers =
          copyManager.copyIn(
              "COPY singers (singer_id, first_name, last_name) FROM STDIN",
              WriteDataWithCopy.class.getResourceAsStream("singers_data.txt"));
      System.out.printf("Copied %d singers\n", numSingers);

      long numAlbums =
          copyManager.copyIn(
              "COPY albums FROM STDIN",
              WriteDataWithCopy.class.getResourceAsStream("albums_data.txt"));
      System.out.printf("Copied %d albums\n", numAlbums);
    }
  }
}

Go

import (
	"context"
	"fmt"
	"os"

	"github.com/jackc/pgx/v5"
)

func WriteDataWithCopy(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	// Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
	// will succeed even if it exceeds Spanner's mutation limit per transaction.
	if _, err := conn.Exec(ctx, "set spanner.autocommit_dml_mode='partitioned_non_atomic'"); err != nil {
		return err
	}

	file, err := os.Open("samples/singers_data.txt")
	if err != nil {
		return err
	}
	defer file.Close()
	tag, err := conn.PgConn().CopyFrom(ctx, file,
		"copy singers (singer_id, first_name, last_name) from stdin")
	if err != nil {
		return err
	}
	fmt.Printf("Copied %v singers\n", tag.RowsAffected())

	file, err = os.Open("samples/albums_data.txt")
	if err != nil {
		return err
	}
	defer file.Close()
	tag, err = conn.PgConn().CopyFrom(ctx, file,
		"copy albums from stdin")
	if err != nil {
		return err
	}
	fmt.Printf("Copied %v albums\n", tag.RowsAffected())

	return nil
}

Node.js

import { Client } from 'pg';
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
import { from as copyFrom } from 'pg-copy-streams'
import path from "path";

async function writeDataWithCopy(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  // Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
  // will succeed even if it exceeds Spanner's mutation limit per transaction.
  await connection.query("set spanner.autocommit_dml_mode='partitioned_non_atomic'");
  // Copy data from a tab-separated text file to Spanner using the COPY command.
  // Note that even though the command says 'from stdin', the actual input comes from a file.
  const copySingersStream = copyFrom('copy singers (singer_id, first_name, last_name) from stdin');
  const ingestSingersStream = connection.query(copySingersStream);
  const sourceSingersStream = fs.createReadStream(path.join(__dirname, 'singers_data.txt'));
  await pipeline(sourceSingersStream, ingestSingersStream);
  console.log(`Copied ${copySingersStream.rowCount} singers`);

  const copyAlbumsStream = copyFrom('copy albums from stdin');
  const ingestAlbumsStream = connection.query(copyAlbumsStream);
  const sourceAlbumsStream = fs.createReadStream(path.join(__dirname, 'albums_data.txt'));
  await pipeline(sourceAlbumsStream, ingestAlbumsStream);
  console.log(`Copied ${copyAlbumsStream.rowCount} albums`);

  // Close the connection.
  await connection.end();
}

Python

import os
import psycopg


def write_data_with_copy(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:

        script_dir = os.path.dirname(os.path.abspath(__file__))
        singers_file_path = os.path.join(script_dir, "singers_data.txt")
        albums_file_path = os.path.join(script_dir, "albums_data.txt")

        conn.autocommit = True
        block_size = 1024
        with conn.cursor() as cur:
            with open(singers_file_path, "r") as f:
                with cur.copy("COPY singers (singer_id, first_name, last_name) "
                              "FROM STDIN") as copy:
                    while data := f.read(block_size):
                        copy.write(data)
            print("Copied %d singers" % cur.rowcount)

            with open(albums_file_path, "r") as f:
                with cur.copy("COPY albums "
                              "FROM STDIN") as copy:
                    while data := f.read(block_size):
                        copy.write(data)
            print("Copied %d albums" % cur.rowcount)

C#

using Npgsql;

namespace dotnet_snippets;

public static class WriteDataWithCopySample
{
    public static void WriteDataWithCopy(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        // Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
        // will succeed even if it exceeds Spanner's mutation limit per transaction.
        using var cmd = new NpgsqlCommand("set spanner.autocommit_dml_mode='partitioned_non_atomic'", connection);
        cmd.ExecuteNonQuery();

        var singerCount = 0;
        using var singerReader = new StreamReader("singers_data.txt");
        using (var singerWriter = connection.BeginTextImport("COPY singers (singer_id, first_name, last_name) FROM STDIN"))
        {
            while (singerReader.ReadLine() is { } line)
            {
                singerWriter.WriteLine(line);
                singerCount++;
            }
        }
        Console.WriteLine($"Copied {singerCount} singers");

        var albumCount = 0;
        using var albumReader = new StreamReader("albums_data.txt");
        using (var albumWriter = connection.BeginTextImport("COPY albums FROM STDIN"))
        {
            while (albumReader.ReadLine() is { } line)
            {
                albumWriter.WriteLine(line);
                albumCount++;
            }
        }
        Console.WriteLine($"Copied {albumCount} albums");
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./write_data_with_copy.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar write example-db

Go

go run sample_runner.go write example-db

Node.js

npm start write example-db

Python

python write_data_with_copy.py example-db

C#

dotnet run write example-db

You should see:

Copied 5 singers
Copied 5 albums
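
The samples above read their input from tab-separated text files. The following is a minimal sketch of producing input in that shape, assuming PostgreSQL's default COPY text format: tab-delimited columns, one row per line, and `\N` for NULL. The helper name `to_copy_text` and the rows are illustrative only and are not the contents of singers_data.txt.

```python
from typing import Iterable, Optional, Sequence


def to_copy_text(rows: Iterable[Sequence[Optional[object]]]) -> str:
    """Render rows in PostgreSQL's default COPY text format."""

    def encode(value: Optional[object]) -> str:
        if value is None:
            return "\\N"  # NULL marker in the text format
        text = str(value)
        # Escape the backslash first, then the delimiter and line breaks.
        return (text.replace("\\", "\\\\")
                    .replace("\t", "\\t")
                    .replace("\n", "\\n")
                    .replace("\r", "\\r"))

    return "".join("\t".join(encode(v) for v in row) + "\n" for row in rows)


# Illustrative rows; the second one has a NULL last name.
print(to_copy_text([(101, "Ada", "Example"), (102, "Bo", None)]), end="")
```

The resulting string can be written to a file and streamed to a `COPY ... FROM STDIN` statement in the same way the samples stream singers_data.txt.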

Query data using SQL

Spanner supports a SQL interface for reading data, which you can access on the command line using the Google Cloud CLI or programmatically using a PostgreSQL driver.

On the command line

Execute the following SQL statement to read the values of all columns from the albums table:

gcloud spanner databases execute-sql example-db --instance=test-instance \
    --sql='SELECT singer_id, album_id, album_title FROM albums'

The result should be:

singer_id  album_id  album_title
1          1         Total Junk
1          2         Go, Go, Go
2          1         Green
2          2         Forever Hold Your Peace
2          3         Terrified

Use a PostgreSQL driver

In addition to executing a SQL statement on the command line, you can issue the same SQL statement programmatically using a PostgreSQL driver.

psql

#!/bin/bash

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

psql -c "SELECT singer_id, album_id, album_title
         FROM albums"

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;

class QueryData {
  static void queryData(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      try (ResultSet resultSet =
          connection
              .createStatement()
              .executeQuery("SELECT singer_id, album_id, album_title FROM albums")) {
        while (resultSet.next()) {
          System.out.printf(
              "%d %d %s\n",
              resultSet.getLong("singer_id"),
              resultSet.getLong("album_id"),
              resultSet.getString("album_title"));
        }
      }
    }
  }
}

Go

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func QueryData(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	rows, err := conn.Query(ctx, "SELECT singer_id, album_id, album_title "+
		"FROM albums")
	if err != nil {
		return err
	}
	defer rows.Close()
	for rows.Next() {
		var singerId, albumId int64
		var title string
		err = rows.Scan(&singerId, &albumId, &title)
		if err != nil {
			return err
		}
		fmt.Printf("%v %v %v\n", singerId, albumId, title)
	}

	return rows.Err()
}

Node.js

import { Client } from 'pg';

async function queryData(host: string, port: number, database: string): Promise<void> {
  // Connect to Spanner through PGAdapter.
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  const result = await connection.query("SELECT singer_id, album_id, album_title " +
      "FROM albums");
  for (const row of result.rows) {
    console.log(`${row["singer_id"]} ${row["album_id"]} ${row["album_title"]}`);
  }

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def query_data(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute("SELECT singer_id, album_id, album_title "
                        "FROM albums")
            for album in cur:
                print(album)

C#

using Npgsql;

namespace dotnet_snippets;

public static class QueryDataSample
{
    public static void QueryData(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        using var cmd = new NpgsqlCommand("SELECT singer_id, album_id, album_title FROM albums", connection);
        using var reader = cmd.ExecuteReader();
        while (reader.Read())
        {
            Console.WriteLine($"{reader.GetInt64(0)} {reader.GetInt64(1)} {reader.GetString(2)}");
        }
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./query_data.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar query example-db

Go

go run sample_runner.go query example-db

Node.js

npm start query example-db

Python

python query_data.py example-db

C#

dotnet run query example-db

You should see the following result:

1 1 Total Junk
1 2 Go, Go, Go
2 1 Green
2 2 Forever Hold Your Peace
2 3 Terrified

Query using a SQL parameter

If your application has a frequently executed query, you can improve its performance by parameterizing it. The resulting parameterized query can be cached and reused, which reduces compilation costs. For more information, see Use query parameters to speed up frequently executed queries.

Here is an example of using a parameter in the WHERE clause to query records containing a specific value for last_name.

psql

#!/bin/bash

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# Create a prepared statement to use a query parameter.
# Using a prepared statement for executing the same SQL string multiple
# times increases the execution speed of the statement.
psql -c "PREPARE select_singer AS
         SELECT singer_id, first_name, last_name
         FROM singers
         WHERE last_name = \$1" \
     -c "EXECUTE select_singer ('Garcia')"

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

class QueryDataWithParameter {
  static void queryDataWithParameter(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      try (PreparedStatement statement =
          connection.prepareStatement(
              "SELECT singer_id, first_name, last_name "
                  + "FROM singers "
                  + "WHERE last_name = ?")) {
        statement.setString(1, "Garcia");
        try (ResultSet resultSet = statement.executeQuery()) {
          while (resultSet.next()) {
            System.out.printf(
                "%d %s %s\n",
                resultSet.getLong("singer_id"),
                resultSet.getString("first_name"),
                resultSet.getString("last_name"));
          }
        }
      }
    }
  }
}

Go

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func QueryDataWithParameter(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	rows, err := conn.Query(ctx,
		"SELECT singer_id, first_name, last_name "+
			"FROM singers "+
			"WHERE last_name = $1", "Garcia")
	if err != nil {
		return err
	}
	defer rows.Close()
	for rows.Next() {
		var singerId int64
		var firstName, lastName string
		err = rows.Scan(&singerId, &firstName, &lastName)
		if err != nil {
			return err
		}
		fmt.Printf("%v %v %v\n", singerId, firstName, lastName)
	}

	return rows.Err()
}

Node.js

import { Client } from 'pg';

async function queryWithParameter(host: string, port: number, database: string): Promise<void> {
  // Connect to Spanner through PGAdapter.
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  const result = await connection.query(
      "SELECT singer_id, first_name, last_name " +
      "FROM singers " +
      "WHERE last_name = $1", ["Garcia"]);
  for (const row of result.rows) {
    console.log(`${row["singer_id"]} ${row["first_name"]} ${row["last_name"]}`);
  }

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def query_data_with_parameter(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute("SELECT singer_id, first_name, last_name "
                        "FROM singers "
                        "WHERE last_name = %s", ("Garcia",))
            for singer in cur:
                print(singer)

C#

using Npgsql;

namespace dotnet_snippets;

public static class QueryDataWithParameterSample
{
    public static void QueryDataWithParameter(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        using var cmd = new NpgsqlCommand("SELECT singer_id, first_name, last_name "
                                          + "FROM singers "
                                          + "WHERE last_name = $1", connection);
        cmd.Parameters.Add(new NpgsqlParameter { Value = "Garcia" });
        using var reader = cmd.ExecuteReader();
        while (reader.Read())
        {
            Console.WriteLine($"{reader["singer_id"]} {reader["first_name"]} {reader["last_name"]}");
        }
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./query_data_with_parameter.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar querywithparameter example-db

Go

go run sample_runner.go querywithparameter example-db

Node.js

npm start querywithparameter example-db

Python

python query_data_with_parameter.py example-db

C#

dotnet run querywithparameter example-db

You should see the following result:

12 Melissa Garcia
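
The benefit of a parameterized query comes from reusing the same SQL text with different values. The following is a minimal sketch of that pattern in Python. The function name `find_singers` is hypothetical, and `cur` is assumed to be a psycopg 3 cursor on a connection to PGAdapter; only its `execute()` and `fetchall()` methods are used.

```python
from typing import Iterable, List, Tuple

# The SQL text is constant, so it can be cached and reused.
SQL = ("SELECT singer_id, first_name, last_name "
       "FROM singers "
       "WHERE last_name = %s")


def find_singers(cur, last_names: Iterable[str]) -> List[Tuple]:
    """Run one parameterized statement once for each last name.

    `cur` is assumed to be a psycopg 3 cursor.
    """
    singers: List[Tuple] = []
    for last_name in last_names:
        # Only the bound value changes between executions.
        cur.execute(SQL, (last_name,))
        singers.extend(cur.fetchall())
    return singers
```

Building the statement by concatenating each value into the SQL string would produce a different query text on every call, which defeats the caching described above.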

Update the database schema

Assume you need to add a new column called marketing_budget to the albums table. Adding a new column to an existing table requires an update to your database schema. Spanner supports schema updates to a database while the database continues to serve traffic. Schema updates don't require taking the database offline, and they don't lock entire tables or columns; you can continue writing data to the database during the schema update. Read more about supported schema updates and schema change performance in Make schema updates.

Add a column

You can add a column on the command line using the Google Cloud CLI or programmatically using a PostgreSQL driver.

On the command line

Use the following ALTER TABLE command to add the new column to the table:

gcloud spanner databases ddl update example-db --instance=test-instance \
    --ddl='ALTER TABLE albums ADD COLUMN marketing_budget BIGINT'

You should see:

Schema updating...done.

Use a PostgreSQL driver

Execute the DDL statement using a PostgreSQL driver to modify the schema:

psql

#!/bin/bash

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

psql -c "ALTER TABLE albums ADD COLUMN marketing_budget bigint"
echo "Added marketing_budget column"

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

class AddColumn {
  static void addColumn(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      connection.createStatement().execute("alter table albums add column marketing_budget bigint");
      System.out.println("Added marketing_budget column");
    }
  }
}

Go

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func AddColumn(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	_, err = conn.Exec(ctx,
		"ALTER TABLE albums "+
			"ADD COLUMN marketing_budget bigint")
	if err != nil {
		return err
	}
	fmt.Println("Added marketing_budget column")

	return nil
}

Node.js

import { Client } from 'pg';

async function addColumn(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  await connection.query(
      "ALTER TABLE albums " +
      "ADD COLUMN marketing_budget bigint");
  console.log("Added marketing_budget column");

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def add_column(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        # DDL can only be executed when autocommit=True.
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute("ALTER TABLE albums "
                        "ADD COLUMN marketing_budget bigint")
            print("Added marketing_budget column")

C#

using Npgsql;

namespace dotnet_snippets;

public static class AddColumnSample
{
    public static void AddColumn(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        using var cmd = connection.CreateCommand();
        cmd.CommandText = "alter table albums add column marketing_budget bigint";
        cmd.ExecuteNonQuery();
        Console.WriteLine("Added marketing_budget column");
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./add_column.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar addmarketingbudget example-db

Go

go run sample_runner.go addmarketingbudget example-db

Node.js

npm start addmarketingbudget example-db

Python

python add_column.py example-db

C#

dotnet run addmarketingbudget example-db

You should see:

Added marketing_budget column

Execute a DDL batch

Executing multiple schema modifications in one batch is more efficient than executing each statement individually, so batch your DDL statements where you can. You can execute multiple DDL statements in one batch in any of the following ways: by using the built-in batching feature of your PostgreSQL driver, by submitting all the DDL statements as one SQL string separated by semicolons, or by using the START BATCH DDL and RUN BATCH statements.
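
As a minimal sketch, two of these options can be expressed as plain string handling in Python, independent of any driver. The helper names `ddl_batch_statements` and `semicolon_script` are hypothetical, and the shortened CREATE TABLE statements are for illustration only.

```python
from typing import Iterable, List


def ddl_batch_statements(ddl: Iterable[str]) -> List[str]:
    """Wrap DDL statements in START BATCH DDL and RUN BATCH."""
    return ["start batch ddl", *ddl, "run batch"]


def semicolon_script(ddl: Iterable[str]) -> str:
    """Join DDL statements into one semicolon-separated SQL string."""
    return ";\n".join(s.strip().rstrip(";") for s in ddl) + ";"


# Shortened statements, for illustration only.
ddl = [
    "CREATE TABLE venues (venue_id bigint not null primary key)",
    "CREATE TABLE concerts (concert_id bigint not null primary key)",
]
for statement in ddl_batch_statements(ddl):
    print(statement)
print(semicolon_script(ddl))
```

With the first helper, each element would be sent as its own statement on the same connection, as the Node.js sample does. With the second, the whole string would be sent in a single call.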

psql

#!/bin/bash

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# Use a single SQL command to batch multiple statements together.
# Executing multiple DDL statements as one batch is more efficient
# than executing each statement individually.
# Separate the statements with semicolons.
psql << SQL

CREATE TABLE venues (
  venue_id    bigint not null primary key,
  name        varchar(1024),
  description jsonb
);

CREATE TABLE concerts (
  concert_id bigint not null primary key ,
  venue_id   bigint not null,
  singer_id  bigint not null,
  start_time timestamptz,
  end_time   timestamptz,
  constraint fk_concerts_venues foreign key
    (venue_id) references venues (venue_id),
  constraint fk_concerts_singers foreign key
    (singer_id) references singers (singer_id)
);

SQL

echo "Added venues and concerts tables"

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

class DdlBatch {
  static void ddlBatch(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      try (Statement statement = connection.createStatement()) {
        // Create two new tables in one batch.
        statement.addBatch(
            "CREATE TABLE venues ("
                + "  venue_id    bigint not null primary key,"
                + "  name        varchar(1024),"
                + "  description jsonb"
                + ")");
        statement.addBatch(
            "CREATE TABLE concerts ("
                + "  concert_id bigint not null primary key ,"
                + "  venue_id   bigint not null,"
                + "  singer_id  bigint not null,"
                + "  start_time timestamptz,"
                + "  end_time   timestamptz,"
                + "  constraint fk_concerts_venues foreign key"
                + "    (venue_id) references venues (venue_id),"
                + "  constraint fk_concerts_singers foreign key"
                + "    (singer_id) references singers (singer_id)"
                + ")");
        statement.executeBatch();
      }
      System.out.println("Added venues and concerts tables");
    }
  }
}

Go

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func DdlBatch(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	// Executing multiple DDL statements as one batch is
	// more efficient than executing each statement
	// individually.
	br := conn.SendBatch(ctx, &pgx.Batch{QueuedQueries: []*pgx.QueuedQuery{
		{SQL: "CREATE TABLE venues (" +
			"  venue_id    bigint not null primary key," +
			"  name        varchar(1024)," +
			"  description jsonb" +
			")"},
		{SQL: "CREATE TABLE concerts (" +
			"  concert_id bigint not null primary key ," +
			"  venue_id   bigint not null," +
			"  singer_id  bigint not null," +
			"  start_time timestamptz," +
			"  end_time   timestamptz," +
			"  constraint fk_concerts_venues foreign key" +
			"    (venue_id) references venues (venue_id)," +
			"  constraint fk_concerts_singers foreign key" +
			"    (singer_id) references singers (singer_id)" +
			")"},
	}})
	if _, err := br.Exec(); err != nil {
		return err
	}
	if err := br.Close(); err != nil {
		return err
	}
	fmt.Println("Added venues and concerts tables")

	return nil
}

Node.js

import { Client } from 'pg';

async function ddlBatch(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  // Executing multiple DDL statements as one batch is
  // more efficient than executing each statement
  // individually.
  await connection.query("start batch ddl");
  await connection.query("CREATE TABLE venues (" +
      "  venue_id    bigint not null primary key," +
      "  name        varchar(1024)," +
      "  description jsonb" +
      ")");
  await connection.query("CREATE TABLE concerts (" +
      "  concert_id bigint not null primary key ," +
      "  venue_id   bigint not null," +
      "  singer_id  bigint not null," +
      "  start_time timestamptz," +
      "  end_time   timestamptz," +
      "  constraint fk_concerts_venues foreign key" +
      "    (venue_id) references venues (venue_id)," +
      "  constraint fk_concerts_singers foreign key" +
      "    (singer_id) references singers (singer_id)" +
      ")");
  await connection.query("run batch");
  console.log("Added venues and concerts tables");

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def ddl_batch(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        # DDL can only be executed when autocommit=True.
        conn.autocommit = True
        # Use a pipeline to batch multiple statements together.
        # Executing multiple DDL statements as one batch is
        # more efficient than executing each statement
        # individually.
        with conn.pipeline():
            # The following statements are buffered on PGAdapter
            # until the pipeline ends.
            conn.execute("CREATE TABLE venues ("
                         "  venue_id    bigint not null primary key,"
                         "  name        varchar(1024),"
                         "  description jsonb"
                         ")")
            conn.execute("CREATE TABLE concerts ("
                         "  concert_id bigint not null primary key ,"
                         "  venue_id   bigint not null,"
                         "  singer_id  bigint not null,"
                         "  start_time timestamptz,"
                         "  end_time   timestamptz,"
                         "  constraint fk_concerts_venues foreign key"
                         "    (venue_id) references venues (venue_id),"
                         "  constraint fk_concerts_singers foreign key"
                         "    (singer_id) references singers (singer_id)"
                         ")")
        print("Added venues and concerts tables")

C#

using Npgsql;

namespace dotnet_snippets;

public static class DdlBatchSample
{
    public static void DdlBatch(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        // Create two new tables in one batch.
        var batch = connection.CreateBatch();
        batch.BatchCommands.Add(new NpgsqlBatchCommand(
            "CREATE TABLE venues ("
            + "  venue_id    bigint not null primary key,"
            + "  name        varchar(1024),"
            + "  description jsonb"
            + ")"));
        batch.BatchCommands.Add(new NpgsqlBatchCommand(
            "CREATE TABLE concerts ("
            + "  concert_id bigint not null primary key,"
            + "  venue_id   bigint not null,"
            + "  singer_id  bigint not null,"
            + "  start_time timestamptz,"
            + "  end_time   timestamptz,"
            + "  constraint fk_concerts_venues foreign key"
            + "    (venue_id) references venues (venue_id),"
            + "  constraint fk_concerts_singers foreign key"
            + "    (singer_id) references singers (singer_id)"
            + ")"));
        batch.ExecuteNonQuery();
        Console.WriteLine("Added venues and concerts tables");
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./ddl_batch.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar ddlbatch example-db

Go

go run sample_runner.go ddlbatch example-db

Node.js

npm start ddlbatch example-db

Python

python ddl_batch.py example-db

C#

dotnet run ddlbatch example-db

You should see:

Added venues and concerts tables

Write data to the new column

The following code writes data to the new column. It sets marketing_budget to 100000 for the row keyed by albums(1, 1) and to 500000 for the row keyed by albums(2, 2).

PGAdapter translates the PostgreSQL COPY command to mutations. By default, COPY commands are translated to Insert mutations. Execute set spanner.copy_upsert=true to translate COPY commands to InsertOrUpdate mutations instead, which lets you use COPY to update existing rows in Spanner.
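The difference between the two mutation types can be pictured with a toy in-memory model. This is only a sketch of the semantics, not PGAdapter or Spanner code: the apply_copy function, the AlreadyExists exception, and the album_title column are hypothetical names chosen for illustration. It assumes that an Insert fails when the key already exists, and that an InsertOrUpdate overwrites only the columns that are written.

```python
class AlreadyExists(Exception):
    """Raised by the toy Insert when a primary key is already present."""


def apply_copy(table: dict, rows: list, upsert: bool) -> int:
    """Apply COPY-style rows to an in-memory table keyed by primary key.

    Each row is a (key, columns) pair, where columns maps a column name
    to the value that is written. Returns the number of rows applied.
    """
    if not upsert:
        # Insert semantics: writing a key that already exists is an error.
        # Check every key first so that a failure leaves the table unchanged.
        for key, _ in rows:
            if key in table:
                raise AlreadyExists(f"row {key} already exists")
    for key, columns in rows:
        # InsertOrUpdate semantics: create the row if it is missing and
        # overwrite only the written columns; other columns keep their values.
        table.setdefault(key, {}).update(columns)
    return len(rows)


if __name__ == "__main__":
    albums = {(1, 1): {"album_title": "Example", "marketing_budget": None}}
    new_rows = [((1, 1), {"marketing_budget": 100000})]
    try:
        apply_copy(albums, new_rows, upsert=False)
    except AlreadyExists as e:
        print("Insert failed:", e)
    print("Upserted rows:", apply_copy(albums, new_rows, upsert=True))
```

In this model the album_title value survives the upsert, which is why COPY with spanner.copy_upsert=true can fill in a single new column for rows that already exist.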

psql

#!/bin/bash

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# Instruct PGAdapter to use insert-or-update for COPY statements.
# This enables us to use COPY to update data.
psql -c "set spanner.copy_upsert=true" \
     -c "COPY albums (singer_id, album_id, marketing_budget) FROM STDIN
         WITH (DELIMITER ';')" \
<< DATA
1;1;100000
2;2;500000
DATA

echo "Copied albums using upsert"

Java

import java.io.IOException;
import java.io.StringReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;

class UpdateDataWithCopy {

  static void updateDataWithCopy(String host, int port, String database)
      throws SQLException, IOException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      // Unwrap the PostgreSQL JDBC connection interface to get access to
      // a CopyManager.
      PGConnection pgConnection = connection.unwrap(PGConnection.class);
      CopyManager copyManager = pgConnection.getCopyAPI();

      // Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
      // will succeed even if it exceeds Spanner's mutation limit per transaction.
      connection
          .createStatement()
          .execute("set spanner.autocommit_dml_mode='partitioned_non_atomic'");

      // Instruct PGAdapter to use insert-or-update for COPY statements.
      // This enables us to use COPY to update existing data.
      connection.createStatement().execute("set spanner.copy_upsert=true");

      // COPY uses mutations to insert or update existing data in Spanner.
      long numAlbums =
          copyManager.copyIn(
              "COPY albums (singer_id, album_id, marketing_budget) FROM STDIN",
              new StringReader("1\t1\t100000\n" + "2\t2\t500000\n"));
      System.out.printf("Updated %d albums\n", numAlbums);
    }
  }
}

Go

import (
	"context"
	"fmt"
	"io"

	"github.com/jackc/pgx/v5"
)

func UpdateDataWithCopy(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	// Enable non-atomic mode. This makes the COPY operation non-atomic,
	// and allows it to exceed the Spanner mutation limit.
	if _, err := conn.Exec(ctx,
		"set spanner.autocommit_dml_mode='partitioned_non_atomic'"); err != nil {
		return err
	}
	// Instruct PGAdapter to use insert-or-update for COPY statements.
	// This enables us to use COPY to update data.
	if _, err := conn.Exec(ctx, "set spanner.copy_upsert=true"); err != nil {
		return err
	}

	// Create a pipe that can be used to manually write the data that we want to copy.
	reader, writer := io.Pipe()
	// Write the data to the pipe using a separate goroutine. This allows us to stream the data
	// to the COPY operation row-by-row.
	go func() {
		for _, record := range []string{"1\t1\t100000\n", "2\t2\t500000\n"} {
			if _, err := writer.Write([]byte(record)); err != nil {
				// Propagate the write error to the reading side of the pipe.
				writer.CloseWithError(err)
				return
			}
		}
		// Closing the writer signals the end of the data to the COPY operation.
		writer.Close()
	}()
	tag, err := conn.PgConn().CopyFrom(ctx, reader, "COPY albums (singer_id, album_id, marketing_budget) FROM STDIN")
	if err != nil {
		return err
	}
	fmt.Printf("Updated %v albums\n", tag.RowsAffected())

	return nil
}

Node.js

import { Client } from 'pg';
import { pipeline } from 'node:stream/promises';
import { from as copyFrom } from 'pg-copy-streams';
import { Readable } from 'node:stream';

async function updateDataWithCopy(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  // Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
  // will succeed even if it exceeds Spanner's mutation limit per transaction.
  await connection.query("set spanner.autocommit_dml_mode='partitioned_non_atomic'");

  // Instruct PGAdapter to use insert-or-update for COPY statements.
  // This enables us to use COPY to update existing data.
  await connection.query("set spanner.copy_upsert=true");

  // Copy data to Spanner using the COPY command.
  const copyStream = copyFrom('COPY albums (singer_id, album_id, marketing_budget) FROM STDIN');
  const ingestStream = connection.query(copyStream);

  // Create a source stream and attach the source to the destination.
  const sourceStream = new Readable();
  const operation = pipeline(sourceStream, ingestStream);
  // Manually push data to the source stream to write data to Spanner.
  sourceStream.push("1\t1\t100000\n");
  sourceStream.push("2\t2\t500000\n");
  // Push a 'null' to indicate the end of the stream.
  sourceStream.push(null);
  // Wait for the copy operation to finish.
  await operation;
  console.log(`Updated ${copyStream.rowCount} albums`);

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def update_data_with_copy(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            # Instruct PGAdapter to use insert-or-update for COPY statements.
            # This enables us to use COPY to update data.
            cur.execute("set spanner.copy_upsert=true")

            # COPY uses mutations to insert or update existing data in Spanner.
            with cur.copy("COPY albums (singer_id, album_id, marketing_budget) "
                          "FROM STDIN") as copy:
                copy.write_row((1, 1, 100000))
                copy.write_row((2, 2, 500000))
            print("Updated %d albums" % cur.rowcount)

C#

using Npgsql;

namespace dotnet_snippets;

public static class UpdateDataWithCopySample
{
    public static void UpdateDataWithCopy(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        // Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
        // will succeed even if it exceeds Spanner's mutation limit per transaction.
        using var cmd = connection.CreateCommand();
        cmd.CommandText = "set spanner.autocommit_dml_mode='partitioned_non_atomic'";
        cmd.ExecuteNonQuery();

        // Instruct PGAdapter to use insert-or-update for COPY statements.
        // This enables us to use COPY to update existing data.
        cmd.CommandText = "set spanner.copy_upsert=true";
        cmd.ExecuteNonQuery();

        // COPY uses mutations to insert or update existing data in Spanner.
        using (var albumWriter = connection.BeginTextImport(
                   "COPY albums (singer_id, album_id, marketing_budget) FROM STDIN"))
        {
            albumWriter.WriteLine("1\t1\t100000");
            albumWriter.WriteLine("2\t2\t500000");
        }
        Console.WriteLine("Updated 2 albums");
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./update_data_with_copy.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar update example-db

Go

go run sample_runner.go update example-db

Node.js

npm start update example-db

Python

python update_data_with_copy.py example-db

C#

dotnet run update example-db

You should see:

Updated 2 albums

You can also execute a SQL query to fetch the values that you just wrote.

Here's the code to execute the query:

psql

#!/bin/bash

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

psql -c "SELECT singer_id, album_id, marketing_budget
         FROM albums
         ORDER BY singer_id, album_id"

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;

class QueryDataWithNewColumn {
  static void queryDataWithNewColumn(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      try (ResultSet resultSet =
          connection
              .createStatement()
              .executeQuery(
                  "SELECT singer_id, album_id, marketing_budget "
                      + "FROM albums "
                      + "ORDER BY singer_id, album_id")) {
        while (resultSet.next()) {
          System.out.printf(
              "%d %d %s\n",
              resultSet.getLong("singer_id"),
              resultSet.getLong("album_id"),
              resultSet.getString("marketing_budget"));
        }
      }
    }
  }
}

Go

import (
	"context"
	"database/sql"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func QueryDataWithNewColumn(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	rows, err := conn.Query(ctx, "SELECT singer_id, album_id, marketing_budget "+
		"FROM albums "+
		"ORDER BY singer_id, album_id")
	if err != nil {
		return err
	}
	// Defer closing the rows only after verifying that the query started successfully.
	defer rows.Close()
	for rows.Next() {
		var singerId, albumId int64
		var marketingBudget sql.NullString
		err = rows.Scan(&singerId, &albumId, &marketingBudget)
		if err != nil {
			return err
		}
		var budget string
		if marketingBudget.Valid {
			budget = marketingBudget.String
		} else {
			budget = "NULL"
		}
		fmt.Printf("%v %v %v\n", singerId, albumId, budget)
	}

	return rows.Err()
}

Node.js

import { Client } from 'pg';

async function queryDataWithNewColumn(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  const result = await connection.query(
      "SELECT singer_id, album_id, marketing_budget "
      + "FROM albums "
      + "ORDER BY singer_id, album_id"
  );
  for (const row of result.rows) {
    console.log(`${row["singer_id"]} ${row["album_id"]} ${row["marketing_budget"]}`);
  }

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def query_data_with_new_column(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute("SELECT singer_id, album_id, marketing_budget "
                        "FROM albums "
                        "ORDER BY singer_id, album_id")
            for album in cur:
                print(album)

C#

using Npgsql;

namespace dotnet_snippets;

public static class QueryDataWithNewColumnSample
{
    public static void QueryWithNewColumnData(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        using var cmd = new NpgsqlCommand("SELECT singer_id, album_id, marketing_budget "
                                          + "FROM albums "
                                          + "ORDER BY singer_id, album_id", connection);
        using var reader = cmd.ExecuteReader();
        while (reader.Read())
        {
            Console.WriteLine($"{reader["singer_id"]} {reader["album_id"]} {reader["marketing_budget"]}");
        }
    }
}

Run the query with this command:

psql

PGDATABASE=example-db ./query_data_with_new_column.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar querymarketingbudget example-db

Go

go run sample_runner.go querymarketingbudget example-db

Node.js

npm start querymarketingbudget example-db

Python

python query_data_with_new_column.py example-db

C#

dotnet run querymarketingbudget example-db

You should see:

1 1 100000
1 2 null
2 1 null
2 2 500000
2 3 null
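
The Python sample prints each row as a tuple, so a NULL budget is shown as None rather than null. If you want Python output in the same shape as the listing above, a small helper along these lines could be used. This is a sketch, and format_row is a hypothetical name that is not part of the sample repository.

```python
def format_row(row: tuple) -> str:
    """Render a result row as space-separated text, showing NULL as 'null'."""
    return " ".join("null" if value is None else str(value) for value in row)


if __name__ == "__main__":
    # Rows shaped like the (singer_id, album_id, marketing_budget) query result.
    for row in [(1, 1, 100000), (1, 2, None)]:
        print(format_row(row))
```

In the sample, the loop body print(album) would become print(format_row(album)).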

Update data

You can update data using DML in a read-write transaction.
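
The transfer pattern used by the samples in this section can be tried out without a Spanner instance. The sketch below is a stand-in under stated assumptions: it uses Python's built-in sqlite3 module instead of PGAdapter, and SQLite's locking and isolation behavior differs from Spanner's. The transfer_budget and demo function names are hypothetical. It only illustrates the shape of the logic: two conditional UPDATE statements that are committed together or not at all.

```python
import sqlite3


def transfer_budget(conn: sqlite3.Connection, amount: int) -> None:
    """Move `amount` from album (2, 2) to album (1, 1) in one transaction."""
    # `with conn` commits if the block succeeds and rolls back on an exception.
    with conn:
        # Increase album 1 only if album 2 has enough budget.
        conn.execute(
            "UPDATE albums SET marketing_budget = marketing_budget + ? "
            "WHERE singer_id = 1 AND album_id = 1 AND EXISTS ("
            "  SELECT 1 FROM albums "
            "  WHERE singer_id = 2 AND album_id = 2 AND marketing_budget >= ?)",
            (amount, amount),
        )
        # Decrease album 2 under the same condition.
        conn.execute(
            "UPDATE albums SET marketing_budget = marketing_budget - ? "
            "WHERE singer_id = 2 AND album_id = 2 AND marketing_budget >= ?",
            (amount, amount),
        )


def demo() -> list:
    """Build a two-row albums table in memory and run one transfer."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE albums (singer_id INTEGER, album_id INTEGER, "
        "marketing_budget INTEGER, PRIMARY KEY (singer_id, album_id))"
    )
    conn.executemany(
        "INSERT INTO albums VALUES (?, ?, ?)",
        [(1, 1, 100000), (2, 2, 500000)],
    )
    conn.commit()
    transfer_budget(conn, 200000)
    rows = conn.execute(
        "SELECT singer_id, album_id, marketing_budget FROM albums "
        "ORDER BY singer_id, album_id"
    ).fetchall()
    conn.close()
    return rows


if __name__ == "__main__":
    print(demo())
```

Because both statements check the same condition inside one transaction, either both budgets change or neither does.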

psql

#!/bin/bash

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

psql << SQL
  -- Transfer marketing budget from one album to another.
  -- We do it in a transaction to ensure that the transfer is atomic.
  -- Begin a read/write transaction.
  begin;

  -- Increase the marketing budget of album 1 if album 2 has enough budget.
  -- The condition that album 2 has enough budget is guaranteed for the
  -- duration of the transaction, as read/write transactions in Spanner are
  -- serializable and externally consistent by default.
  update albums set
    marketing_budget = marketing_budget + 200000
  where singer_id = 1
    and  album_id = 1
    and exists (
      select album_id
      from albums
      where singer_id = 2
        and  album_id = 2
        and marketing_budget > 200000
      );

  -- Decrease the marketing budget of album 2.
  update albums set
    marketing_budget = marketing_budget - 200000
  where singer_id = 2
    and  album_id = 2
    and marketing_budget > 200000;

  -- Commit the transaction to make the changes to both marketing budgets
  -- durably stored in the database and visible to other transactions.
  commit;
SQL

echo "Transferred marketing budget from Album 2 to Album 1"

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

class UpdateDataWithTransaction {

  static void writeWithTransactionUsingDml(String host, int port, String database)
      throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      // Set AutoCommit=false to enable transactions.
      connection.setAutoCommit(false);

      // Transfer marketing budget from one album to another. We do it in a
      // transaction to ensure that the transfer is atomic. There is no need
      // to explicitly start the transaction. The first statement on the
      // connection will start a transaction when AutoCommit=false.
      String selectMarketingBudgetSql =
          "SELECT marketing_budget from albums WHERE singer_id = ? and album_id = ?";
      long album2Budget = 0;
      try (PreparedStatement selectMarketingBudgetStatement =
          connection.prepareStatement(selectMarketingBudgetSql)) {
        // Bind the query parameters to SingerId=2 and AlbumId=2.
        selectMarketingBudgetStatement.setLong(1, 2);
        selectMarketingBudgetStatement.setLong(2, 2);
        try (ResultSet resultSet = selectMarketingBudgetStatement.executeQuery()) {
          while (resultSet.next()) {
            album2Budget = resultSet.getLong("marketing_budget");
          }
        }
        // The transaction will only be committed if this condition still holds
        // at the time of commit. Otherwise, the transaction will be aborted.
        final long transfer = 200000;
        if (album2Budget >= transfer) {
          long album1Budget = 0;
          // Re-use the existing PreparedStatement for selecting the
          // marketing_budget to get the budget for Album 1.
          // Bind the query parameters to SingerId=1 and AlbumId=1.
          selectMarketingBudgetStatement.setLong(1, 1);
          selectMarketingBudgetStatement.setLong(2, 1);
          try (ResultSet resultSet = selectMarketingBudgetStatement.executeQuery()) {
            while (resultSet.next()) {
              album1Budget = resultSet.getLong("marketing_budget");
            }
          }

          // Transfer part of the marketing budget of Album 2 to Album 1.
          album1Budget += transfer;
          album2Budget -= transfer;
          String updateSql =
              "UPDATE albums "
                  + "SET marketing_budget = ? "
                  + "WHERE singer_id = ? and album_id = ?";
          try (PreparedStatement updateStatement = connection.prepareStatement(updateSql)) {
            // Update Album 1.
            int paramIndex = 0;
            updateStatement.setLong(++paramIndex, album1Budget);
            updateStatement.setLong(++paramIndex, 1);
            updateStatement.setLong(++paramIndex, 1);
            // Create a DML batch by calling addBatch
            // on the current PreparedStatement.
            updateStatement.addBatch();

            // Update Album 2 in the same DML batch.
            paramIndex = 0;
            updateStatement.setLong(++paramIndex, album2Budget);
            updateStatement.setLong(++paramIndex, 2);
            updateStatement.setLong(++paramIndex, 2);
            updateStatement.addBatch();

            // Execute both DML statements in one batch.
            updateStatement.executeBatch();
          }
        }
      }
      // Commit the current transaction.
      connection.commit();
      System.out.println("Transferred marketing budget from Album 2 to Album 1");
    }
  }
}

Go

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func WriteWithTransactionUsingDml(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	// Transfer marketing budget from one album to another. We do it in a
	// transaction to ensure that the transfer is atomic.
	tx, err := conn.Begin(ctx)
	if err != nil {
		return err
	}
	const selectSql = "SELECT marketing_budget " +
		"from albums " +
		"WHERE singer_id = $1 and album_id = $2"
	// Get the marketing_budget of singer 2 / album 2.
	row := tx.QueryRow(ctx, selectSql, 2, 2)
	var budget2 int64
	if err := row.Scan(&budget2); err != nil {
		tx.Rollback(ctx)
		return err
	}
	const transfer = 200000
	// The transaction will only be committed if this condition still holds
	// at the time of commit. Otherwise, the transaction will be aborted.
	if budget2 >= transfer {
		// Get the marketing_budget of singer 1 / album 1.
		row := tx.QueryRow(ctx, selectSql, 1, 1)
		var budget1 int64
		if err := row.Scan(&budget1); err != nil {
			tx.Rollback(ctx)
			return err
		}
		// Transfer part of the marketing budget of Album 2 to Album 1.
		budget1 += transfer
		budget2 -= transfer
		const updateSql = "UPDATE albums " +
			"SET marketing_budget = $1 " +
			"WHERE singer_id = $2 and album_id = $3"
		// Start a DML batch and execute it as part of the current transaction.
		batch := &pgx.Batch{}
		batch.Queue(updateSql, budget1, 1, 1)
		batch.Queue(updateSql, budget2, 2, 2)
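		br := tx.SendBatch(ctx, batch)
		// Check the result of the first statement in the batch.
		if _, err := br.Exec(); err != nil {
			br.Close()
			tx.Rollback(ctx)
			return err
		}
		// Closing the batch results also surfaces errors from any statement
		// whose result has not been read yet.
		if err := br.Close(); err != nil {
			tx.Rollback(ctx)
			return err
		}
	}
	// Commit the current transaction and report any commit failure.
	if err := tx.Commit(ctx); err != nil {
		return err
	}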
	fmt.Println("Transferred marketing budget from Album 2 to Album 1")

	return nil
}

Node.js

import { Client } from 'pg';

async function writeWithTransactionUsingDml(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  // Transfer marketing budget from one album to another. We do it in a