Getting started with Spanner and PGAdapter


Objectives

This tutorial walks you through the following steps using PGAdapter, the local Spanner proxy for PostgreSQL drivers:

  • Create a Spanner instance and database.
  • Write, read, and execute SQL queries on data in the database.
  • Update the database schema.
  • Update data using a read-write transaction.
  • Add a secondary index to the database.
  • Use the index to read and execute SQL queries on data.
  • Retrieve data using a read-only transaction.

Costs

This tutorial uses Spanner, which is a billable component of Google Cloud. For information about the cost of using Spanner, see Pricing.

Before you begin

Complete the steps described in Set up, which cover creating and setting a default Google Cloud project, enabling billing, enabling the Cloud Spanner API, and setting up OAuth 2.0 to get authentication credentials to use the Cloud Spanner API.

In particular, make sure that you run gcloud auth application-default login to set up your local development environment with authentication credentials.

Prepare the local PGAdapter environment

You can use PostgreSQL drivers in combination with PGAdapter to connect to Spanner. PGAdapter is a local proxy that translates the PostgreSQL network protocol to the Spanner gRPC protocol.

PGAdapter requires either Java or Docker to run.

  1. Install Java or Docker on your development machine if neither is already installed.

  2. Clone the sample app repository to your local machine:

    git clone https://github.com/GoogleCloudPlatform/pgadapter.git
    
  3. Change to the directory that contains the Spanner sample code:

    psql

    cd pgadapter/samples/snippets/psql-snippets
    

    Java

    cd pgadapter/samples/snippets/java-snippets
    mvn package -DskipTests
    

    Go

    cd pgadapter/samples/snippets/golang-snippets
    

    Node.js

    cd pgadapter/samples/snippets/nodejs-snippets
    npm install
    

    Python

    cd pgadapter/samples/snippets/python-snippets
    python -m venv ./venv
    pip install -r requirements.txt
    cd samples
    

    C#

    cd pgadapter/samples/snippets/dotnet-snippets
    

Create an instance

When you first use Spanner, you must create an instance, which is an allocation of resources that are used by Spanner databases. When you create an instance, you choose an instance configuration, which determines where your data is stored, and the number of nodes to use, which determines the amount of serving and storage resources in the instance.

Run the following command to create a Spanner instance in the us-central1 region with 1 node:

gcloud spanner instances create test-instance --config=regional-us-central1 \
    --description="Test Instance" --nodes=1

The instance that you created has the following properties:

  • Instance ID: test-instance
  • Display name: Test Instance
  • Instance configuration: regional-us-central1. Regional configurations store data in one region, while multi-region configurations distribute data across multiple regions. For more information, see About instances.
  • Node count: 1. The node count (node_count) corresponds to the amount of serving and storage resources available to databases in the instance. Learn more in Nodes and processing units.

You should see:

Creating instance...done.

Look through the sample files

The samples repository contains a sample that shows how to use Spanner with PGAdapter.

Take a look through the samples/snippets folder, which shows how to use Spanner. The code shows how to create and use a new database. The data uses the example schema shown in the Schema and data model page.

Start PGAdapter

Start PGAdapter on your local development machine and point it to the instance that you created.

The following commands assume that you have run gcloud auth application-default login.

Java application

wget https://storage.googleapis.com/pgadapter-jar-releases/pgadapter.tar.gz \
    && tar -xzvf pgadapter.tar.gz
java -jar pgadapter.jar -i test-instance

Docker

docker pull gcr.io/cloud-spanner-pg-adapter/pgadapter
docker run \
    --name pgadapter \
    --rm -d -p 5432:5432 \
    -v "$HOME/.config/gcloud":/gcloud:ro \
    --env CLOUDSDK_CONFIG=/gcloud \
    gcr.io/cloud-spanner-pg-adapter/pgadapter \
    -i test-instance -x

Emulator

docker pull gcr.io/cloud-spanner-pg-adapter/pgadapter-emulator
docker run \
    --name pgadapter-emulator \
    --rm -d \
    -p 5432:5432 \
    -p 9010:9010 \
    -p 9020:9020 \
    gcr.io/cloud-spanner-pg-adapter/pgadapter-emulator

This starts PGAdapter with an embedded Spanner emulator. The embedded emulator automatically creates any Spanner instance or database that you connect to, so you don't need to create them manually.

We recommend that you run PGAdapter in production as a side-car container or as an in-process dependency. For more information about deploying PGAdapter in production, see Choose a method for running PGAdapter.
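Before moving on, it can be useful to confirm that something is accepting connections on the port that PGAdapter was started on. The following is a minimal Python sketch and is not part of the PGAdapter samples. The function name port_is_open is hypothetical, and localhost:5432 is assumed only because the Docker commands above publish that port. The check only shows that a TCP connection can be opened; it does not prove that the listener is PGAdapter.

```python
import socket


def port_is_open(host: str = "localhost", port: int = 5432,
                 timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened.

    The defaults are assumptions that match the port mapping used in
    this tutorial; adjust them if PGAdapter listens elsewhere.
    """
    try:
        # create_connection raises OSError (for example, connection
        # refused or timeout) when nothing is listening.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Calling port_is_open() with no arguments checks localhost:5432. A False result usually means that PGAdapter has not finished starting or that it was started on a different port.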

Create a database

Create a database named example-db in the instance named test-instance by running the following at the command line:

gcloud spanner databases create example-db --instance=test-instance \
    --database-dialect=POSTGRESQL

You should see:

Creating database...done.

Create tables

The following code creates two tables in the database.

psql

#!/bin/bash

# Set the connection variables for psql.
# The following statements use the existing value of the variable if it has
# already been set, and otherwise assigns a default value.
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# Create two tables in one batch.
psql << SQL
-- Create the singers table
CREATE TABLE singers (
  singer_id   bigint not null primary key,
  first_name  character varying(1024),
  last_name   character varying(1024),
  singer_info bytea,
  full_name   character varying(2048) GENERATED ALWAYS
          AS (first_name || ' ' || last_name) STORED
);

-- Create the albums table. This table is interleaved in the parent table
-- "singers".
CREATE TABLE albums (
  singer_id     bigint not null,
  album_id      bigint not null,
  album_title   character varying(1024),
  primary key (singer_id, album_id)
)
-- The 'interleave in parent' clause is a Spanner-specific extension to
-- open-source PostgreSQL.
INTERLEAVE IN PARENT singers ON DELETE CASCADE;
SQL

echo "Created Singers & Albums tables in database: [${PGDATABASE}]"

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

class CreateTables {
  static void createTables(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      try (Statement statement = connection.createStatement()) {
        // Create two tables in one batch.
        statement.addBatch(
            "create table singers ("
                + "  singer_id   bigint primary key not null,"
                + "  first_name  varchar(1024),"
                + "  last_name   varchar(1024),"
                + "  singer_info bytea,"
                + "  full_name   varchar(2048) generated always as (\n"
                + "      case when first_name is null then last_name\n"
                + "          when last_name  is null then first_name\n"
                + "          else first_name || ' ' || last_name\n"
                + "      end) stored"
                + ")");
        statement.addBatch(
            "create table albums ("
                + "  singer_id     bigint not null,"
                + "  album_id      bigint not null,"
                + "  album_title   varchar,"
                + "  primary key (singer_id, album_id)"
                + ") interleave in parent singers on delete cascade");
        statement.executeBatch();
        System.out.println("Created Singers & Albums tables in database: [" + database + "]");
      }
    }
  }
}

Go

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func CreateTables(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	// Create two tables in one batch on Spanner.
	br := conn.SendBatch(ctx, &pgx.Batch{QueuedQueries: []*pgx.QueuedQuery{
		{SQL: "create table singers (" +
			"  singer_id   bigint primary key not null," +
			"  first_name  character varying(1024)," +
			"  last_name   character varying(1024)," +
			"  singer_info bytea," +
			"  full_name   character varying(2048) generated " +
			"  always as (first_name || ' ' || last_name) stored" +
			")"},
		{SQL: "create table albums (" +
			"  singer_id     bigint not null," +
			"  album_id      bigint not null," +
			"  album_title   character varying(1024)," +
			"  primary key (singer_id, album_id)" +
			") interleave in parent singers on delete cascade"},
	}})
	cmd, err := br.Exec()
	if err != nil {
		return err
	}
	if cmd.String() != "CREATE" {
		return fmt.Errorf("unexpected command tag: %v", cmd.String())
	}
	if err := br.Close(); err != nil {
		return err
	}
	fmt.Printf("Created Singers & Albums tables in database: [%s]\n", database)

	return nil
}

Node.js

import { Client } from 'pg';

async function createTables(host: string, port: number, database: string): Promise<void> {
  // Connect to Spanner through PGAdapter.
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  // Create two tables in one batch.
  await connection.query("start batch ddl");
  await connection.query("create table singers (" +
      "  singer_id   bigint primary key not null," +
      "  first_name  character varying(1024)," +
      "  last_name   character varying(1024)," +
      "  singer_info bytea," +
      "  full_name   character varying(2048) generated " +
      "  always as (first_name || ' ' || last_name) stored" +
      ")");
  await connection.query("create table albums (" +
      "  singer_id     bigint not null," +
      "  album_id      bigint not null," +
      "  album_title   character varying(1024)," +
      "  primary key (singer_id, album_id)" +
      ") interleave in parent singers on delete cascade");
  await connection.query("run batch");
  console.log(`Created Singers & Albums tables in database: [${database}]`);

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def create_tables(host: str, port: int, database: str):
    # Connect to Cloud Spanner using psycopg3 through PGAdapter.
    with psycopg.connect("host={host} port={port} "
                         "dbname={database} "
                         "sslmode=disable".format(host=host, port=port,
                                                  database=database)) as conn:
        # Enable autocommit to execute DDL statements, as psycopg otherwise
        # tries to use a read/write transaction.
        conn.autocommit = True

        # Use a pipeline to execute multiple DDL statements in one batch.
        with conn.pipeline():
            conn.execute("create table singers ("
                         + "  singer_id   bigint primary key not null,"
                         + "  first_name  character varying(1024),"
                         + "  last_name   character varying(1024),"
                         + "  singer_info bytea,"
                         + "  full_name   character varying(2048) generated "
                         + "  always as (first_name || ' ' || last_name) stored"
                         + ")")
            conn.execute("create table albums ("
                         + "  singer_id     bigint not null,"
                         + "  album_id      bigint not null,"
                         + "  album_title   character varying(1024),"
                         + "  primary key (singer_id, album_id)"
                         + ") interleave in parent singers on delete cascade")
        print("Created Singers & Albums tables in database: [{database}]"
              .format(database=database))

C#

using Npgsql;

namespace dotnet_snippets;

public static class CreateTablesSample
{
    public static void CreateTables(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        // Create two tables in one batch.
        var batch = connection.CreateBatch();
        batch.BatchCommands.Add(new NpgsqlBatchCommand(
            "create table singers ("
            + "  singer_id   bigint primary key not null,"
            + "  first_name  varchar(1024),"
            + "  last_name   varchar(1024),"
            + "  singer_info bytea,"
            + "  full_name   varchar(2048) generated always as (\n"
            + "      case when first_name is null then last_name\n"
            + "          when last_name  is null then first_name\n"
            + "          else first_name || ' ' || last_name\n"
            + "      end) stored"
            + ")"));
        batch.BatchCommands.Add(new NpgsqlBatchCommand(
            "create table albums ("
            + "  singer_id     bigint not null,"
            + "  album_id      bigint not null,"
            + "  album_title   varchar,"
            + "  primary key (singer_id, album_id)"
            + ") interleave in parent singers on delete cascade"));
        batch.ExecuteNonQuery();
        Console.WriteLine($"Created Singers & Albums tables in database: [{database}]");
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./create_tables.sh example-db

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar createtables example-db

Go

go run sample_runner.go createtables example-db

Node.js

npm start createtables example-db

Python

python create_tables.py example-db

C#

dotnet run createtables example-db

The next step is to write data to your database.

Create a connection

Before you can do reads or writes, you must create a connection to PGAdapter. All of your interactions with Spanner must go through a Connection. The database name is specified in the connection string.
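The samples in this section all build their connection string from the same three values: host, port, and database name. The psql scripts read these values from PGHOST, PGPORT, and PGDATABASE and fall back to defaults. As a rough illustration, the Python sketch below mirrors that defaulting and produces the three connection string shapes that appear in the samples. All function names are hypothetical, and the uid:pwd placeholder is copied from the Go sample rather than being a required credential.

```python
import os


def connection_settings(env=None):
    """Read PGHOST, PGPORT and PGDATABASE with the tutorial's defaults.

    Like the shell form ${VAR:-default}, an unset or empty variable
    falls back to the default value.
    """
    env = os.environ if env is None else env
    return {
        "host": env.get("PGHOST") or "localhost",
        "port": int(env.get("PGPORT") or "5432"),
        "database": env.get("PGDATABASE") or "example-db",
    }


def libpq_conninfo(host, port, database):
    # Keyword/value form, as used by the psycopg sample.
    return f"host={host} port={port} dbname={database} sslmode=disable"


def postgres_url(host, port, database):
    # URL form, as used by the pgx (Go) sample.
    return f"postgres://uid:pwd@{host}:{port}/{database}?sslmode=disable"


def jdbc_url(host, port, database):
    # JDBC form, as used by the Java sample.
    return f"jdbc:postgresql://{host}:{port}/{database}"
```

For example, libpq_conninfo(**connection_settings()) yields a string that can be passed to psycopg.connect, assuming PGAdapter is running on the resulting host and port.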

psql

#!/bin/bash

# Set the connection variables for psql.
# The following statements use the existing value of the variable if it has
# already been set, and otherwise assigns a default value.
export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# Connect to Cloud Spanner using psql through PGAdapter
# and execute a simple query.
psql -c "select 'Hello world!' as hello"

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;

class CreateConnection {
  static void createConnection(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      try (ResultSet resultSet =
          connection.createStatement().executeQuery("select 'Hello world!' as hello")) {
        while (resultSet.next()) {
          System.out.printf("Greeting from Cloud Spanner PostgreSQL: %s\n", resultSet.getString(1));
        }
      }
    }
  }
}

Go

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func CreateConnection(host string, port int, database string) error {
	ctx := context.Background()
	// Connect to Cloud Spanner using pgx through PGAdapter.
	// 'sslmode=disable' is optional, but adding it reduces the connection time,
	// as pgx will then skip first trying to create an SSL connection.
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	row := conn.QueryRow(ctx, "select 'Hello world!' as hello")
	var msg string
	if err := row.Scan(&msg); err != nil {
		return err
	}
	fmt.Printf("Greeting from Cloud Spanner PostgreSQL: %s\n", msg)

	return nil
}

Node.js

import { Client } from 'pg';

async function createConnection(host: string, port: number, database: string): Promise<void> {
  // Connect to Spanner through PGAdapter.
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  const result = await connection.query("select 'Hello world!' as hello");
  console.log(`Greeting from Cloud Spanner PostgreSQL: ${result.rows[0]['hello']}`);

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def create_connection(host: str, port: int, database: str):
    # Connect to Cloud Spanner using psycopg3 through PGAdapter.
    # 'sslmode=disable' is optional, but adding it reduces the connection time,
    # as psycopg3 will then skip first trying to create an SSL connection.
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute("select 'Hello world!' as hello")
            print("Greeting from Cloud Spanner PostgreSQL:", cur.fetchone()[0])

C#

using Npgsql;

namespace dotnet_snippets;

public static class CreateConnectionSample
{
    public static void CreateConnection(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        using var cmd = new NpgsqlCommand("select 'Hello World!' as hello", connection);
        using var reader = cmd.ExecuteReader();
        while (reader.Read())
        {
            var greeting = reader.GetString(0);
            Console.WriteLine($"Greeting from Cloud Spanner PostgreSQL: {greeting}");
        }
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./create_connection.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar createconnection example-db

Go

go run sample_runner.go createconnection example-db

Node.js

npm start createconnection example-db

Python

python create_connection.py example-db

C#

dotnet run createconnection example-db

Write data with DML

You can insert data using Data Manipulation Language (DML) in a read-write transaction.

These samples show how to execute a DML statement on Spanner using a PostgreSQL driver.
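Several of the samples below insert four rows with a single statement that uses numbered placeholders ($1 through $12) and one flat list of parameter values. The Python sketch below shows one way such a statement and its parameter list could be generated for any number of rows. The helper name build_multi_row_insert is hypothetical and does not come from the samples. The table and column names are interpolated directly into the SQL text, so this sketch assumes they are trusted identifiers; only the row values are passed as parameters.

```python
from typing import Iterable, List, Sequence, Tuple


def build_multi_row_insert(
    table: str,
    columns: Sequence[str],
    rows: Iterable[Sequence[object]],
) -> Tuple[str, List[object]]:
    """Build one INSERT with $1, $2, ... placeholders and flat parameters."""
    params: List[object] = []
    groups: List[str] = []
    for row in rows:
        if len(row) != len(columns):
            raise ValueError("each row must have one value per column")
        # Placeholder numbering continues across rows: the second row
        # of a three-column insert uses $4, $5 and $6.
        start = len(params)
        placeholders = ", ".join(
            f"${start + i + 1}" for i in range(len(columns)))
        groups.append(f"({placeholders})")
        params.extend(row)
    if not groups:
        raise ValueError("at least one row is required")
    sql = (f"INSERT INTO {table} ({', '.join(columns)}) "
           f"VALUES {', '.join(groups)}")
    return sql, params
```

The $n placeholder style matches the Go, Node.js, and C# samples. The Java sample uses ? and the Python sample uses %s, so the placeholder format would need to change for those drivers.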

psql

#!/bin/bash

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

psql -c "INSERT INTO singers (singer_id, first_name, last_name) VALUES
                             (12, 'Melissa', 'Garcia'),
                             (13, 'Russel', 'Morales'),
                             (14, 'Jacqueline', 'Long'),
                             (15, 'Dylan', 'Shaw')"

echo "4 records inserted"

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;

class WriteDataWithDml {
  static class Singer {
    private final long singerId;
    private final String firstName;
    private final String lastName;

    Singer(final long id, final String first, final String last) {
      this.singerId = id;
      this.firstName = first;
      this.lastName = last;
    }
  }

  static void writeDataWithDml(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      // Add 4 rows in one statement.
      // JDBC always uses '?' as a parameter placeholder.
      try (PreparedStatement preparedStatement =
          connection.prepareStatement(
              "INSERT INTO singers (singer_id, first_name, last_name) VALUES "
                  + "(?, ?, ?), "
                  + "(?, ?, ?), "
                  + "(?, ?, ?), "
                  + "(?, ?, ?)")) {

        final List<Singer> singers =
            Arrays.asList(
                new Singer(/* SingerId = */ 12L, "Melissa", "Garcia"),
                new Singer(/* SingerId = */ 13L, "Russel", "Morales"),
                new Singer(/* SingerId = */ 14L, "Jacqueline", "Long"),
                new Singer(/* SingerId = */ 15L, "Dylan", "Shaw"));

        // Note that JDBC parameters start at index 1.
        int paramIndex = 0;
        for (Singer singer : singers) {
          preparedStatement.setLong(++paramIndex, singer.singerId);
          preparedStatement.setString(++paramIndex, singer.firstName);
          preparedStatement.setString(++paramIndex, singer.lastName);
        }

        int updateCount = preparedStatement.executeUpdate();
        System.out.printf("%d records inserted.\n", updateCount);
      }
    }
  }
}

Go

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func WriteDataWithDml(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	tag, err := conn.Exec(ctx,
		"INSERT INTO singers (singer_id, first_name, last_name) "+
			"VALUES ($1, $2, $3), ($4, $5, $6), "+
			"       ($7, $8, $9), ($10, $11, $12)",
		12, "Melissa", "Garcia",
		13, "Russel", "Morales",
		14, "Jacqueline", "Long",
		15, "Dylan", "Shaw")
	if err != nil {
		return err
	}
	fmt.Printf("%v records inserted\n", tag.RowsAffected())

	return nil
}

Node.js

import { Client } from 'pg';

async function writeDataWithDml(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  const result = await connection.query("INSERT INTO singers (singer_id, first_name, last_name) " +
      "VALUES ($1, $2, $3), ($4, $5, $6), " +
      "       ($7, $8, $9), ($10, $11, $12)",
       [12, "Melissa", "Garcia",
        13, "Russel", "Morales",
        14, "Jacqueline", "Long",
        15, "Dylan", "Shaw"])
  console.log(`${result.rowCount} records inserted`);

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def write_data_with_dml(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute("INSERT INTO singers (singer_id, first_name, last_name)"
                        " VALUES (%s, %s, %s), (%s, %s, %s), "
                        "        (%s, %s, %s), (%s, %s, %s)",
                        (12, "Melissa", "Garcia",
                         13, "Russel", "Morales",
                         14, "Jacqueline", "Long",
                         15, "Dylan", "Shaw",))
            print("%d records inserted" % cur.rowcount)

C#

using Npgsql;

namespace dotnet_snippets;

public static class WriteDataWithDmlSample
{
    readonly struct Singer
    {
        public Singer(long singerId, string firstName, string lastName)
        {
            SingerId = singerId;
            FirstName = firstName;
            LastName = lastName;
        }

        public long SingerId { get; }
        public string FirstName { get; }
        public string LastName { get; }
    }

    public static void WriteDataWithDml(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();
        // Add 4 rows in one statement.
        using var cmd = new NpgsqlCommand("INSERT INTO singers (singer_id, first_name, last_name) VALUES "
                                          + "($1, $2, $3), "
                                          + "($4, $5, $6), "
                                          + "($7, $8, $9), "
                                          + "($10, $11, $12)", connection);
        List<Singer> singers =
        [
            new Singer(/* SingerId = */ 12L, "Melissa", "Garcia"),
            new Singer(/* SingerId = */ 13L, "Russel", "Morales"),
            new Singer(/* SingerId = */ 14L, "Jacqueline", "Long"),
            new Singer(/* SingerId = */ 15L, "Dylan", "Shaw")
        ];
        foreach (var singer in singers)
        {
            cmd.Parameters.Add(new NpgsqlParameter { Value = singer.SingerId });
            cmd.Parameters.Add(new NpgsqlParameter { Value = singer.FirstName });
            cmd.Parameters.Add(new NpgsqlParameter { Value = singer.LastName });
        }
        var updateCount = cmd.ExecuteNonQuery();
        Console.WriteLine($"{updateCount} records inserted.");
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./write_data_with_dml.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar writeusingdml example-db

Go

go run sample_runner.go writeusingdml example-db

Node.js

npm start writeusingdml example-db

Python

python write_data_with_dml.py example-db

C#

dotnet run writeusingdml example-db

You should see the following response:

 4 records inserted.

Write data with a DML batch

PGAdapter supports executing DML batches. Sending multiple DML statements in one batch reduces the number of round trips to Spanner and improves the performance of your application.
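A batch produces one update count per statement, and some of the samples below add these up to report the total number of inserted rows. The Node.js sample notes that RUN BATCH returns the counts as an array of strings. The following is a small Python sketch of that aggregation; total_update_count is a hypothetical helper name, and accepting both integers and decimal strings is an assumption made here for convenience.

```python
from typing import Iterable, Union


def total_update_count(update_counts: Iterable[Union[int, str]]) -> int:
    """Add up per-statement update counts given as ints or decimal strings."""
    # int() accepts both an int and a decimal string such as "1".
    return sum(int(count) for count in update_counts)
```

For the three single-row inserts in this section, a result of ["1", "1", "1"] would add up to 3.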

psql

#!/bin/bash

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# Create a prepared insert statement and execute this prepared
# insert statement three times in one SQL string. The single
# SQL string with three insert statements will be executed as
# a single DML batch on Spanner.
psql -c "PREPARE insert_singer AS
           INSERT INTO singers (singer_id, first_name, last_name)
           VALUES (\$1, \$2, \$3)" \
     -c "EXECUTE insert_singer (16, 'Sarah', 'Wilson');
         EXECUTE insert_singer (17, 'Ethan', 'Miller');
         EXECUTE insert_singer (18, 'Maya', 'Patel');"

echo "3 records inserted"

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;

class WriteDataWithDmlBatch {
  static class Singer {
    private final long singerId;
    private final String firstName;
    private final String lastName;

    Singer(final long id, final String first, final String last) {
      this.singerId = id;
      this.firstName = first;
      this.lastName = last;
    }
  }

  static void writeDataWithDmlBatch(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      // Add multiple rows in one DML batch.
      // JDBC always uses '?' as a parameter placeholder.
      try (PreparedStatement preparedStatement =
          connection.prepareStatement(
              "INSERT INTO singers (singer_id, first_name, last_name) VALUES (?, ?, ?)")) {
        final List<Singer> singers =
            Arrays.asList(
                new Singer(/* SingerId = */ 16L, "Sarah", "Wilson"),
                new Singer(/* SingerId = */ 17L, "Ethan", "Miller"),
                new Singer(/* SingerId = */ 18L, "Maya", "Patel"));

        for (Singer singer : singers) {
          // Note that JDBC parameters start at index 1.
          int paramIndex = 0;
          preparedStatement.setLong(++paramIndex, singer.singerId);
          preparedStatement.setString(++paramIndex, singer.firstName);
          preparedStatement.setString(++paramIndex, singer.lastName);
          preparedStatement.addBatch();
        }

        int[] updateCounts = preparedStatement.executeBatch();
        System.out.printf("%d records inserted.\n", Arrays.stream(updateCounts).sum());
      }
    }
  }
}

Go

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func WriteDataWithDmlBatch(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	sql := "INSERT INTO singers (singer_id, first_name, last_name) " +
		"VALUES ($1, $2, $3)"
	batch := &pgx.Batch{}
	batch.Queue(sql, 16, "Sarah", "Wilson")
	batch.Queue(sql, 17, "Ethan", "Miller")
	batch.Queue(sql, 18, "Maya", "Patel")
	br := conn.SendBatch(ctx, batch)
	_, err = br.Exec()
	if err := br.Close(); err != nil {
		return err
	}

	if err != nil {
		return err
	}
	fmt.Printf("%v records inserted\n", batch.Len())

	return nil
}

Node.js

import { Client } from 'pg';

async function writeDataWithDmlBatch(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  // node-postgres does not support PostgreSQL pipeline mode, so we must use the
  // `start batch dml` / `run batch` statements to execute a DML batch.
  const sql = "INSERT INTO singers (singer_id, first_name, last_name) VALUES ($1, $2, $3)";
  await connection.query("start batch dml");
  await connection.query(sql, [16, "Sarah", "Wilson"]);
  await connection.query(sql, [17, "Ethan", "Miller"]);
  await connection.query(sql, [18, "Maya", "Patel"]);
  const result = await connection.query("run batch");
  // RUN BATCH returns the update counts as an array of strings, with one element for each
  // DML statement in the batch. This calculates the total number of affected rows from that array.
  const updateCount = result.rows[0]["UPDATE_COUNTS"]
      .map((s: string) => parseInt(s))
      .reduce((c: number, current: number) => c + current, 0);
  console.log(`${updateCount} records inserted`);

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def write_data_with_dml_batch(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.executemany("INSERT INTO singers "
                            "(singer_id, first_name, last_name) "
                            "VALUES (%s, %s, %s)",
                            [(16, "Sarah", "Wilson",),
                             (17, "Ethan", "Miller",),
                             (18, "Maya", "Patel",), ])
            print("%d records inserted" % cur.rowcount)

C#

using Npgsql;

namespace dotnet_snippets;

public static class WriteDataWithDmlBatchSample
{
    readonly struct Singer
    {
        public Singer(long singerId, string firstName, string lastName)
        {
            SingerId = singerId;
            FirstName = firstName;
            LastName = lastName;
        }

        public long SingerId { get; }
        public string FirstName { get; }
        public string LastName { get; }
    }

    public static void WriteDataWithDmlBatch(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        // Add multiple rows in one DML batch.
        const string sql = "INSERT INTO singers (singer_id, first_name, last_name) VALUES ($1, $2, $3)";
        List<Singer> singers =
        [
            new Singer(/* SingerId = */ 16L, "Sarah", "Wilson"),
            new Singer(/* SingerId = */ 17L, "Ethan", "Miller"),
            new Singer(/* SingerId = */ 18L, "Maya", "Patel")
        ];
        using var batch = new NpgsqlBatch(connection);
        foreach (var singer in singers)
        {
            batch.BatchCommands.Add(new NpgsqlBatchCommand
            {
                CommandText = sql,
                Parameters =
                {
                    new NpgsqlParameter {Value = singer.SingerId},
                    new NpgsqlParameter {Value = singer.FirstName},
                    new NpgsqlParameter {Value = singer.LastName}
                }
            });
        }
        var updateCount = batch.ExecuteNonQuery();
        Console.WriteLine($"{updateCount} records inserted.");
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./write_data_with_dml_batch.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar writeusingdmlbatch example-db

Go

go run sample_runner.go writeusingdmlbatch example-db

Node.js

npm start writeusingdmlbatch example-db

Python

python write_data_with_dml_batch.py example-db

C#

dotnet run writeusingdmlbatch example-db

You should see:

3 records inserted.
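
The Node.js sample above totals the update counts that RUN BATCH returns as an array of strings. The same calculation might look like this in Python. This is a minimal sketch: the helper name total_update_count and the sample values are assumptions for illustration, and how a given driver surfaces the UPDATE_COUNTS column may differ.

```python
def total_update_count(update_counts):
    """Sum the per-statement update counts returned by RUN BATCH.

    Each element may be a string (as in the Node.js sample) or an int.
    """
    return sum(int(count) for count in update_counts)


# Hypothetical UPDATE_COUNTS value for a batch of three single-row inserts.
print(f"{total_update_count(['1', '1', '1'])} records inserted")
```

Converting each element with int() keeps the helper usable whether the driver returns the counts as text or as numbers.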

Write data using mutations

You can also insert data using mutations.

PGAdapter translates the PostgreSQL COPY command into mutations. Using COPY is the most efficient way to quickly insert data into your Spanner database.

COPY operations are atomic by default. Atomic operations on Spanner are bound by the commit size limit. For more information, see CRUD limit.

These samples show how to execute a non-atomic COPY operation. This allows the COPY operation to exceed the commit size limit.

psql

#!/bin/bash

# Get the source directory of this script.
directory=${BASH_SOURCE%/*}/

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# Copy data to Spanner from a tab-separated text file using the COPY command.
psql -c "COPY singers (singer_id, first_name, last_name) FROM STDIN" \
  < "${directory}singers_data.txt"
psql -c "COPY albums FROM STDIN" \
  < "${directory}albums_data.txt"

echo "Copied singers and albums"

Java

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;

class WriteDataWithCopy {

  static void writeDataWithCopy(String host, int port, String database)
      throws SQLException, IOException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      // Unwrap the PostgreSQL JDBC connection interface to get access to
      // a CopyManager.
      PGConnection pgConnection = connection.unwrap(PGConnection.class);
      CopyManager copyManager = pgConnection.getCopyAPI();

      // Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
      // will succeed even if it exceeds Spanner's mutation limit per transaction.
      connection
          .createStatement()
          .execute("set spanner.autocommit_dml_mode='partitioned_non_atomic'");
      long numSingers =
          copyManager.copyIn(
              "COPY singers (singer_id, first_name, last_name) FROM STDIN",
              WriteDataWithCopy.class.getResourceAsStream("singers_data.txt"));
      System.out.printf("Copied %d singers\n", numSingers);

      long numAlbums =
          copyManager.copyIn(
              "COPY albums FROM STDIN",
              WriteDataWithCopy.class.getResourceAsStream("albums_data.txt"));
      System.out.printf("Copied %d albums\n", numAlbums);
    }
  }
}

Go

import (
	"context"
	"fmt"
	"os"

	"github.com/jackc/pgx/v5"
)

func WriteDataWithCopy(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	// Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
	// will succeed even if it exceeds Spanner's mutation limit per transaction.
	if _, err := conn.Exec(ctx, "set spanner.autocommit_dml_mode='partitioned_non_atomic'"); err != nil {
		return err
	}

	file, err := os.Open("samples/singers_data.txt")
	if err != nil {
		return err
	}
	// Close the input file when the function returns.
	defer file.Close()
	tag, err := conn.PgConn().CopyFrom(ctx, file,
		"copy singers (singer_id, first_name, last_name) from stdin")
	if err != nil {
		return err
	}
	fmt.Printf("Copied %v singers\n", tag.RowsAffected())

	file, err = os.Open("samples/albums_data.txt")
	if err != nil {
		return err
	}
	// Close the input file when the function returns.
	defer file.Close()
	tag, err = conn.PgConn().CopyFrom(ctx, file,
		"copy albums from stdin")
	if err != nil {
		return err
	}
	fmt.Printf("Copied %v albums\n", tag.RowsAffected())

	return nil
}

Node.js

import { Client } from 'pg';
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
import { from as copyFrom } from 'pg-copy-streams'
import path from "path";

async function writeDataWithCopy(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  // Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
  // will succeed even if it exceeds Spanner's mutation limit per transaction.
  await connection.query("set spanner.autocommit_dml_mode='partitioned_non_atomic'");
  // Copy data from a tab-separated text file to Spanner using the COPY command.
  // Note that even though the command says 'from stdin', the actual input comes from a file.
  const copySingersStream = copyFrom('copy singers (singer_id, first_name, last_name) from stdin');
  const ingestSingersStream = connection.query(copySingersStream);
  const sourceSingersStream = fs.createReadStream(path.join(__dirname, 'singers_data.txt'));
  await pipeline(sourceSingersStream, ingestSingersStream);
  console.log(`Copied ${copySingersStream.rowCount} singers`);

  const copyAlbumsStream = copyFrom('copy albums from stdin');
  const ingestAlbumsStream = connection.query(copyAlbumsStream);
  const sourceAlbumsStream = fs.createReadStream(path.join(__dirname, 'albums_data.txt'));
  await pipeline(sourceAlbumsStream, ingestAlbumsStream);
  console.log(`Copied ${copyAlbumsStream.rowCount} albums`);

  // Close the connection.
  await connection.end();
}

Python

import os
import psycopg


def write_data_with_copy(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:

        script_dir = os.path.dirname(os.path.abspath(__file__))
        singers_file_path = os.path.join(script_dir, "singers_data.txt")
        albums_file_path = os.path.join(script_dir, "albums_data.txt")

        conn.autocommit = True
        block_size = 1024
        with conn.cursor() as cur:
            with open(singers_file_path, "r") as f:
                with cur.copy("COPY singers (singer_id, first_name, last_name) "
                              "FROM STDIN") as copy:
                    while data := f.read(block_size):
                        copy.write(data)
            print("Copied %d singers" % cur.rowcount)

            with open(albums_file_path, "r") as f:
                with cur.copy("COPY albums "
                              "FROM STDIN") as copy:
                    while data := f.read(block_size):
                        copy.write(data)
            print("Copied %d albums" % cur.rowcount)

C#

using Npgsql;

namespace dotnet_snippets;

public static class WriteDataWithCopySample
{
    public static void WriteDataWithCopy(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        // Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
        // will succeed even if it exceeds Spanner's mutation limit per transaction.
        using var cmd = new NpgsqlCommand("set spanner.autocommit_dml_mode='partitioned_non_atomic'", connection);
        cmd.ExecuteNonQuery();

        var singerCount = 0;
        using var singerReader = new StreamReader("singers_data.txt");
        using (var singerWriter = connection.BeginTextImport("COPY singers (singer_id, first_name, last_name) FROM STDIN"))
        {
            while (singerReader.ReadLine() is { } line)
            {
                singerWriter.WriteLine(line);
                singerCount++;
            }
        }
        Console.WriteLine($"Copied {singerCount} singers");

        var albumCount = 0;
        using var albumReader = new StreamReader("albums_data.txt");
        using (var albumWriter = connection.BeginTextImport("COPY albums FROM STDIN"))
        {
            while (albumReader.ReadLine() is { } line)
            {
                albumWriter.WriteLine(line);
                albumCount++;
            }
        }
        Console.WriteLine($"Copied {albumCount} albums");
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./write_data_with_copy.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar write example-db

Go

go run sample_runner.go write example-db

Node.js

npm start write example-db

Python

python write_data_with_copy.py example-db

C#

dotnet run write example-db

You should see:

Copied 5 singers
Copied 5 albums
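
The COPY samples above read their input from tab-separated text files such as singers_data.txt. The sketch below shows one way such input could be produced in PostgreSQL's default COPY text format: tab-delimited columns, \N for NULL, and backslash escapes. The helper names and the sample rows are hypothetical. The actual contents of the sample data files are not shown in this tutorial.

```python
def copy_text_field(value):
    """Encode one value for the COPY text format."""
    if value is None:
        return r"\N"  # NULL marker in the default text format
    text = str(value)
    # Escape the backslash first so later escapes are not doubled.
    return (text.replace("\\", "\\\\")
                .replace("\t", "\\t")
                .replace("\n", "\\n")
                .replace("\r", "\\r"))


def copy_text_line(row):
    """Encode one row as a tab-separated line ending in a newline."""
    return "\t".join(copy_text_field(value) for value in row) + "\n"


# Hypothetical rows for singers (singer_id, first_name, last_name).
rows = [(101, "Ada", "Lovelace"), (102, "Tab\tName", None)]
data = "".join(copy_text_line(row) for row in rows)
print(data, end="")
```

A string built this way could be passed to copy.write() in the Python sample in place of the blocks read from a file.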

Query data using SQL

Spanner supports a SQL interface for reading data, which you can access on the command line using the Google Cloud CLI or programmatically using a PostgreSQL driver.

On the command line

Execute the following SQL statement to read the values of all columns from the albums table:

gcloud spanner databases execute-sql example-db --instance=test-instance \
    --sql='SELECT singer_id, album_id, album_title FROM albums'

The result should be:

singer_id album_id album_title
1         1        Total Junk
1         2        Go, Go, Go
2         1        Green
2         2        Forever Hold Your Peace
2         3        Terrified

Use a PostgreSQL driver

In addition to executing a SQL statement on the command line, you can issue the same SQL statement programmatically using a PostgreSQL driver.

psql

#!/bin/bash

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

psql -c "SELECT singer_id, album_id, album_title
         FROM albums"

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;

class QueryData {
  static void queryData(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      try (ResultSet resultSet =
          connection
              .createStatement()
              .executeQuery("SELECT singer_id, album_id, album_title FROM albums")) {
        while (resultSet.next()) {
          System.out.printf(
              "%d %d %s\n",
              resultSet.getLong("singer_id"),
              resultSet.getLong("album_id"),
              resultSet.getString("album_title"));
        }
      }
    }
  }
}

Go

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func QueryData(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	rows, err := conn.Query(ctx, "SELECT singer_id, album_id, album_title "+
		"FROM albums")
	if err != nil {
		return err
	}
	defer rows.Close()
	for rows.Next() {
		var singerId, albumId int64
		var title string
		err = rows.Scan(&singerId, &albumId, &title)
		if err != nil {
			return err
		}
		fmt.Printf("%v %v %v\n", singerId, albumId, title)
	}

	return rows.Err()
}

Node.js

import { Client } from 'pg';

async function queryData(host: string, port: number, database: string): Promise<void> {
  // Connect to Spanner through PGAdapter.
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  const result = await connection.query("SELECT singer_id, album_id, album_title " +
      "FROM albums");
  for (const row of result.rows) {
    console.log(`${row["singer_id"]} ${row["album_id"]} ${row["album_title"]}`);
  }

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def query_data(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute("SELECT singer_id, album_id, album_title "
                        "FROM albums")
            for album in cur:
                print(album)

C#

using Npgsql;

namespace dotnet_snippets;

public static class QueryDataSample
{
    public static void QueryData(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        using var cmd = new NpgsqlCommand("SELECT singer_id, album_id, album_title FROM albums", connection);
        using var reader = cmd.ExecuteReader();
        while (reader.Read())
        {
            Console.WriteLine($"{reader.GetInt64(0)} {reader.GetInt64(1)} {reader.GetString(2)}");
        }
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./query_data.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar query example-db

Go

go run sample_runner.go query example-db

Node.js

npm start query example-db

Python

python query_data.py example-db

C#

dotnet run query example-db

You should see the following result:

1 1 Total Junk
1 2 Go, Go, Go
2 1 Green
2 2 Forever Hold Your Peace
2 3 Terrified
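
The Python sample above prints each row as a tuple, for example (1, 1, 'Total Junk'), while the other samples print space-separated values. To get the space-separated form shown here, a small helper along these lines could be used. The name format_row is an assumption for illustration and is not part of psycopg.

```python
def format_row(row):
    """Join the values of one result row with single spaces."""
    return " ".join(str(value) for value in row)


# A row shaped like the ones returned by the albums query.
print(format_row((1, 1, "Total Junk")))  # prints: 1 1 Total Junk
```

In the Python sample, print(format_row(album)) would replace print(album).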

Query using a SQL parameter

If your application has a frequently executed query, you can improve its performance by parameterizing it. The resulting parametric query can be cached and reused, which reduces compilation costs. For more information, see Use query parameters to speed up frequently executed queries.

Here is an example of using a parameter in the WHERE clause to query records containing a specific value for last_name.

psql

#!/bin/bash

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# Create a prepared statement to use a query parameter.
# Using a prepared statement for executing the same SQL string multiple
# times increases the execution speed of the statement.
psql -c "PREPARE select_singer AS
         SELECT singer_id, first_name, last_name
         FROM singers
         WHERE last_name = \$1" \
     -c "EXECUTE select_singer ('Garcia')"

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

class QueryDataWithParameter {
  static void queryDataWithParameter(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      try (PreparedStatement statement =
          connection.prepareStatement(
              "SELECT singer_id, first_name, last_name "
                  + "FROM singers "
                  + "WHERE last_name = ?")) {
        statement.setString(1, "Garcia");
        try (ResultSet resultSet = statement.executeQuery()) {
          while (resultSet.next()) {
            System.out.printf(
                "%d %s %s\n",
                resultSet.getLong("singer_id"),
                resultSet.getString("first_name"),
                resultSet.getString("last_name"));
          }
        }
      }
    }
  }
}

Go

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func QueryDataWithParameter(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	rows, err := conn.Query(ctx,
		"SELECT singer_id, first_name, last_name "+
			"FROM singers "+
			"WHERE last_name = $1", "Garcia")
	if err != nil {
		return err
	}
	defer rows.Close()
	for rows.Next() {
		var singerId int64
		var firstName, lastName string
		err = rows.Scan(&singerId, &firstName, &lastName)
		if err != nil {
			return err
		}
		fmt.Printf("%v %v %v\n", singerId, firstName, lastName)
	}

	return rows.Err()
}

Node.js

import { Client } from 'pg';

async function queryWithParameter(host: string, port: number, database: string): Promise<void> {
  // Connect to Spanner through PGAdapter.
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  const result = await connection.query(
      "SELECT singer_id, first_name, last_name " +
      "FROM singers " +
      "WHERE last_name = $1", ["Garcia"]);
  for (const row of result.rows) {
    console.log(`${row["singer_id"]} ${row["first_name"]} ${row["last_name"]}`);
  }

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def query_data_with_parameter(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute("SELECT singer_id, first_name, last_name "
                        "FROM singers "
                        "WHERE last_name = %s", ("Garcia",))
            for singer in cur:
                print(singer)

C#

using Npgsql;

namespace dotnet_snippets;

public static class QueryDataWithParameterSample
{
    public static void QueryDataWithParameter(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        using var cmd = new NpgsqlCommand("SELECT singer_id, first_name, last_name "
                                          + "FROM singers "
                                          + "WHERE last_name = $1", connection);
        cmd.Parameters.Add(new NpgsqlParameter { Value = "Garcia" });
        using var reader = cmd.ExecuteReader();
        while (reader.Read())
        {
            Console.WriteLine($"{reader["singer_id"]} {reader["first_name"]} {reader["last_name"]}");
        }
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./query_data_with_parameter.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar querywithparameter example-db

Go

go run sample_runner.go querywithparameter example-db

Node.js

npm start querywithparameter example-db

Python

python query_data_with_parameter.py example-db

C#

dotnet run querywithparameter example-db

You should see the following result:

12 Melissa Garcia
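
The samples above write the same parameter in three placeholder styles: $1 (psql PREPARE, pgx, node-postgres, and Npgsql), ? (JDBC), and %s (psycopg). When porting a query between drivers, the JDBC style could be converted to the numbered style as sketched below. The helper name qmark_to_numbered is hypothetical, and the sketch only skips single-quoted string literals. It does not handle comments, quoted identifiers, or dollar-quoted strings.

```python
def qmark_to_numbered(sql):
    """Replace each '?' outside single-quoted literals with $1, $2, ..."""
    out = []
    index = 0
    in_literal = False
    for ch in sql:
        if ch == "'":
            # Toggle on every quote; a doubled quote ('') toggles twice
            # and therefore leaves the state unchanged.
            in_literal = not in_literal
            out.append(ch)
        elif ch == "?" and not in_literal:
            index += 1
            out.append(f"${index}")
        else:
            out.append(ch)
    return "".join(out)


print(qmark_to_numbered(
    "SELECT singer_id, first_name, last_name "
    "FROM singers WHERE last_name = ?"))
```

The SQL text stays identical across executions and only the bound value changes, which is what allows the parameterized query to be cached and reused.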

Update the database schema

Assume you need to add a new column called marketing_budget to the albums table. Adding a new column to an existing table requires an update to your database schema. Spanner supports schema updates to a database while the database continues to serve traffic. Schema updates don't require taking the database offline and they don't lock entire tables or columns; you can continue writing data to the database during the schema update. Read more about supported schema updates and schema change performance in Make schema updates.

Add a column

You can add a column on the command line using the Google Cloud CLI or programmatically using a PostgreSQL driver.

On the command line

Use the following ALTER TABLE command to add the new column to the table:

gcloud spanner databases ddl update example-db --instance=test-instance \
    --ddl='ALTER TABLE albums ADD COLUMN marketing_budget BIGINT'

You should see:

Schema updating...done.

Use a PostgreSQL driver

Execute the DDL statement using a PostgreSQL driver to modify the schema:

psql

#!/bin/bash

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

psql -c "ALTER TABLE albums ADD COLUMN marketing_budget bigint"
echo "Added marketing_budget column"

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

class AddColumn {
  static void addColumn(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      connection.createStatement().execute("alter table albums add column marketing_budget bigint");
      System.out.println("Added marketing_budget column");
    }
  }
}

Go

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func AddColumn(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	_, err = conn.Exec(ctx,
		"ALTER TABLE albums "+
			"ADD COLUMN marketing_budget bigint")
	if err != nil {
		return err
	}
	fmt.Println("Added marketing_budget column")

	return nil
}

Node.js

import { Client } from 'pg';

async function addColumn(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  await connection.query(
      "ALTER TABLE albums " +
      "ADD COLUMN marketing_budget bigint");
  console.log("Added marketing_budget column");

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def add_column(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        # DDL can only be executed when autocommit=True.
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute("ALTER TABLE albums "
                        "ADD COLUMN marketing_budget bigint")
            print("Added marketing_budget column")

C#

using Npgsql;

namespace dotnet_snippets;

public static class AddColumnSample
{
    public static void AddColumn(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        using var cmd = connection.CreateCommand();
        cmd.CommandText = "alter table albums add column marketing_budget bigint";
        cmd.ExecuteNonQuery();
        Console.WriteLine("Added marketing_budget column");
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./add_column.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar addmarketingbudget example-db

Go

go run sample_runner.go addmarketingbudget example-db

Node.js

npm start addmarketingbudget example-db

Python

python add_column.py example-db

C#

dotnet run addmarketingbudget example-db

You should see:

Added marketing_budget column
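
The samples above hard-code the ALTER TABLE statement. Table and column names in DDL cannot be supplied as query parameters, so if they come from configuration, the statement text has to be assembled carefully. The sketch below builds it after validating every part. The helper name add_column_ddl and the validation patterns are assumptions for illustration and are deliberately strict.

```python
import re

# Hypothetical, deliberately strict patterns: lowercase unquoted identifiers
# and simple type names with an optional length, such as varchar(1024).
_IDENTIFIER = re.compile(r"[a-z_][a-z0-9_]*")
_DATA_TYPE = re.compile(r"[a-z][a-z0-9_]*(\(\d+\))?")


def add_column_ddl(table, column, data_type):
    """Build an ALTER TABLE ... ADD COLUMN statement from validated parts."""
    for name in (table, column):
        if not _IDENTIFIER.fullmatch(name):
            raise ValueError(f"unsupported identifier: {name!r}")
    if not _DATA_TYPE.fullmatch(data_type):
        raise ValueError(f"unsupported data type: {data_type!r}")
    return f"ALTER TABLE {table} ADD COLUMN {column} {data_type}"


print(add_column_ddl("albums", "marketing_budget", "bigint"))
```

The returned string can then be executed with any of the drivers shown above.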

Execute a DDL batch

We recommend executing multiple schema modifications in one batch. You can execute multiple DDL statements in one batch by using the built-in batching feature of your PostgreSQL driver, by submitting all the DDL statements as one SQL string separated by semicolons, or by using the START BATCH DDL and RUN BATCH statements.
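
Two of these options only change the SQL text that is sent to PGAdapter. As a rough sketch, a list of DDL statements could be turned into either form as follows. The helper names as_single_string and as_batch_statements and the shortened table definitions are hypothetical.

```python
def as_single_string(ddl_statements):
    """Join DDL statements into one semicolon-separated SQL string."""
    cleaned = [stmt.strip().rstrip(";") for stmt in ddl_statements]
    return ";\n".join(cleaned) + ";"


def as_batch_statements(ddl_statements):
    """Wrap DDL statements in START BATCH DDL / RUN BATCH."""
    return ["start batch ddl", *ddl_statements, "run batch"]


# Shortened, hypothetical table definitions.
ddl = [
    "CREATE TABLE venues (venue_id bigint not null primary key)",
    "CREATE TABLE concerts (concert_id bigint not null primary key)",
]
print(as_single_string(ddl))
for statement in as_batch_statements(ddl):
    print(statement)
```

With the second form, each element of the list is sent as a separate statement on the same connection.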

psql

#!/bin/bash

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# Use a single SQL command to batch multiple statements together.
# Executing multiple DDL statements as one batch is more efficient
# than executing each statement individually.
# Separate the statements with semicolons.
psql << SQL

CREATE TABLE venues (
  venue_id    bigint not null primary key,
  name        varchar(1024),
  description jsonb
);

CREATE TABLE concerts (
  concert_id bigint not null primary key ,
  venue_id   bigint not null,
  singer_id  bigint not null,
  start_time timestamptz,
  end_time   timestamptz,
  constraint fk_concerts_venues foreign key
    (venue_id) references venues (venue_id),
  constraint fk_concerts_singers foreign key
    (singer_id) references singers (singer_id)
);

SQL

echo "Added venues and concerts tables"

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

class DdlBatch {
  static void ddlBatch(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      try (Statement statement = connection.createStatement()) {
        // Create two new tables in one batch.
        statement.addBatch(
            "CREATE TABLE venues ("
                + "  venue_id    bigint not null primary key,"
                + "  name        varchar(1024),"
                + "  description jsonb"
                + ")");
        statement.addBatch(
            "CREATE TABLE concerts ("
                + "  concert_id bigint not null primary key ,"
                + "  venue_id   bigint not null,"
                + "  singer_id  bigint not null,"
                + "  start_time timestamptz,"
                + "  end_time   timestamptz,"
                + "  constraint fk_concerts_venues foreign key"
                + "    (venue_id) references venues (venue_id),"
                + "  constraint fk_concerts_singers foreign key"
                + "    (singer_id) references singers (singer_id)"
                + ")");
        statement.executeBatch();
      }
      System.out.println("Added venues and concerts tables");
    }
  }
}

Go

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func DdlBatch(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	// Executing multiple DDL statements as one batch is
	// more efficient than executing each statement
	// individually.
	br := conn.SendBatch(ctx, &pgx.Batch{QueuedQueries: []*pgx.QueuedQuery{
		{SQL: "CREATE TABLE venues (" +
			"  venue_id    bigint not null primary key," +
			"  name        varchar(1024)," +
			"  description jsonb" +
			")"},
		{SQL: "CREATE TABLE concerts (" +
			"  concert_id bigint not null primary key ," +
			"  venue_id   bigint not null," +
			"  singer_id  bigint not null," +
			"  start_time timestamptz," +
			"  end_time   timestamptz," +
			"  constraint fk_concerts_venues foreign key" +
			"    (venue_id) references venues (venue_id)," +
			"  constraint fk_concerts_singers foreign key" +
			"    (singer_id) references singers (singer_id)" +
			")"},
	}})
	if _, err := br.Exec(); err != nil {
		return err
	}
	if err := br.Close(); err != nil {
		return err
	}
	fmt.Println("Added venues and concerts tables")

	return nil
}

Node.js

import { Client } from 'pg';

async function ddlBatch(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  // Executing multiple DDL statements as one batch is
  // more efficient than executing each statement
  // individually.
  await connection.query("start batch ddl");
  await connection.query("CREATE TABLE venues (" +
      "  venue_id    bigint not null primary key," +
      "  name        varchar(1024)," +
      "  description jsonb" +
      ")");
  await connection.query("CREATE TABLE concerts (" +
      "  concert_id bigint not null primary key ," +
      "  venue_id   bigint not null," +
      "  singer_id  bigint not null," +
      "  start_time timestamptz," +
      "  end_time   timestamptz," +
      "  constraint fk_concerts_venues foreign key" +
      "    (venue_id) references venues (venue_id)," +
      "  constraint fk_concerts_singers foreign key" +
      "    (singer_id) references singers (singer_id)" +
      ")");
  await connection.query("run batch");
  console.log("Added venues and concerts tables");

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def ddl_batch(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        # DDL can only be executed when autocommit=True.
        conn.autocommit = True
        # Use a pipeline to batch multiple statements together.
        # Executing multiple DDL statements as one batch is
        # more efficient than executing each statement
        # individually.
        with conn.pipeline():
            # The following statements are buffered on PGAdapter
            # until the pipeline ends.
            conn.execute("CREATE TABLE venues ("
                         "  venue_id    bigint not null primary key,"
                         "  name        varchar(1024),"
                         "  description jsonb"
                         ")")
            conn.execute("CREATE TABLE concerts ("
                         "  concert_id bigint not null primary key ,"
                         "  venue_id   bigint not null,"
                         "  singer_id  bigint not null,"
                         "  start_time timestamptz,"
                         "  end_time   timestamptz,"
                         "  constraint fk_concerts_venues foreign key"
                         "    (venue_id) references venues (venue_id),"
                         "  constraint fk_concerts_singers foreign key"
                         "    (singer_id) references singers (singer_id)"
                         ")")
        print("Added venues and concerts tables")

C#

using Npgsql;

namespace dotnet_snippets;

public static class DdlBatchSample
{
    public static void DdlBatch(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        // Create two new tables in one batch.
        var batch = connection.CreateBatch();
        batch.BatchCommands.Add(new NpgsqlBatchCommand(
            "CREATE TABLE venues ("
            + "  venue_id    bigint not null primary key,"
            + "  name        varchar(1024),"
            + "  description jsonb"
            + ")"));
        batch.BatchCommands.Add(new NpgsqlBatchCommand(
            "CREATE TABLE concerts ("
            + "  concert_id bigint not null primary key ,"
            + "  venue_id   bigint not null,"
            + "  singer_id  bigint not null,"
            + "  start_time timestamptz,"
            + "  end_time   timestamptz,"
            + "  constraint fk_concerts_venues foreign key"
            + "    (venue_id) references venues (venue_id),"
            + "  constraint fk_concerts_singers foreign key"
            + "    (singer_id) references singers (singer_id)"
            + ")"));
        batch.ExecuteNonQuery();
        Console.WriteLine("Added venues and concerts tables");
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./ddl_batch.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar ddlbatch example-db

Go

go run sample_runner.go ddlbatch example-db

Node.js

npm start ddlbatch example-db

Python

python ddl_batch.py example-db

C#

dotnet run ddlbatch example-db

You should see:

Added venues and concerts tables
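
The explicit batching pattern from the Node.js sample above can be sketched independently of any driver. This is a minimal sketch in Python, assuming a hypothetical `execute` callable that sends one SQL string over an open connection; `run_ddl_batch` is an illustrative name and not part of any library. Only the `start batch ddl` and `run batch` statements are taken from the samples.

```python
from typing import Callable, Iterable, List


def run_ddl_batch(execute: Callable[[str], None],
                  ddl_statements: Iterable[str]) -> List[str]:
    """Sends DDL statements wrapped in 'start batch ddl' / 'run batch'.

    `execute` is a hypothetical callable that sends a single SQL string
    (for example, the execute method of a driver cursor).
    Returns the statements in the order in which they were sent.
    """
    sent = []
    for sql in ["start batch ddl", *ddl_statements, "run batch"]:
        execute(sql)
        sent.append(sql)
    return sent
```

Passing `print` as the `execute` argument shows the statements that would be sent without connecting to a database.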

Write data to the new column

The following code writes data to the new column. It sets marketing_budget to 100000 for the row keyed by albums(1, 1) and to 500000 for the row keyed by albums(2, 2).

PGAdapter translates the PostgreSQL COPY command into mutations. By default, COPY commands are translated into Insert mutations. Execute set spanner.copy_upsert=true to translate COPY commands into InsertOrUpdate mutations instead. This lets you use COPY to update existing data in Spanner.

psql

#!/bin/bash

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# Instruct PGAdapter to use insert-or-update for COPY statements.
# This enables us to use COPY to update data.
psql -c "set spanner.copy_upsert=true" \
     -c "COPY albums (singer_id, album_id, marketing_budget) FROM STDIN
         WITH (DELIMITER ';')" \
<< DATA
1;1;100000
2;2;500000
DATA

echo "Copied albums using upsert"

Java

import java.io.IOException;
import java.io.StringReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;

class UpdateDataWithCopy {

  static void updateDataWithCopy(String host, int port, String database)
      throws SQLException, IOException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      // Unwrap the PostgreSQL JDBC connection interface to get access to
      // a CopyManager.
      PGConnection pgConnection = connection.unwrap(PGConnection.class);
      CopyManager copyManager = pgConnection.getCopyAPI();

      // Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
      // will succeed even if it exceeds Spanner's mutation limit per transaction.
      connection
          .createStatement()
          .execute("set spanner.autocommit_dml_mode='partitioned_non_atomic'");

      // Instruct PGAdapter to use insert-or-update for COPY statements.
      // This enables us to use COPY to update existing data.
      connection.createStatement().execute("set spanner.copy_upsert=true");

      // COPY uses mutations to insert or update existing data in Spanner.
      long numAlbums =
          copyManager.copyIn(
              "COPY albums (singer_id, album_id, marketing_budget) FROM STDIN",
              new StringReader("1\t1\t100000\n" + "2\t2\t500000\n"));
      System.out.printf("Updated %d albums\n", numAlbums);
    }
  }
}

Go

import (
	"context"
	"fmt"
	"io"

	"github.com/jackc/pgx/v5"
)

func UpdateDataWithCopy(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	// Enable non-atomic mode. This makes the COPY operation non-atomic,
	// and allows it to exceed the Spanner mutation limit.
	if _, err := conn.Exec(ctx,
		"set spanner.autocommit_dml_mode='partitioned_non_atomic'"); err != nil {
		return err
	}
	// Instruct PGAdapter to use insert-or-update for COPY statements.
	// This enables us to use COPY to update data.
	if _, err := conn.Exec(ctx, "set spanner.copy_upsert=true"); err != nil {
		return err
	}

	// Create a pipe that can be used to write the data manually that we want to copy.
	reader, writer := io.Pipe()
	// Write the data to the pipe using a separate goroutine. This allows us to stream the data
	// to the COPY operation row-by-row.
	go func() {
		for _, record := range []string{"1\t1\t100000\n", "2\t2\t500000\n"} {
			if _, err := writer.Write([]byte(record)); err != nil {
				// Propagate the write error to the reader side of the pipe.
				writer.CloseWithError(err)
				return
			}
		}
		// Closing the writer signals the end of the data to the reader.
		writer.Close()
	}()
	tag, err := conn.PgConn().CopyFrom(ctx, reader, "COPY albums (singer_id, album_id, marketing_budget) FROM STDIN")
	if err != nil {
		return err
	}
	fmt.Printf("Updated %v albums\n", tag.RowsAffected())

	return nil
}

Node.js

import { Client } from 'pg';
import { pipeline } from 'node:stream/promises';
import { from as copyFrom } from 'pg-copy-streams';
import { Readable } from 'node:stream';

async function updateDataWithCopy(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  // Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
  // will succeed even if it exceeds Spanner's mutation limit per transaction.
  await connection.query("set spanner.autocommit_dml_mode='partitioned_non_atomic'");

  // Instruct PGAdapter to use insert-or-update for COPY statements.
  // This enables us to use COPY to update existing data.
  await connection.query("set spanner.copy_upsert=true");

  // Copy data to Spanner using the COPY command.
  const copyStream = copyFrom('COPY albums (singer_id, album_id, marketing_budget) FROM STDIN');
  const ingestStream = connection.query(copyStream);

  // Create a source stream and attach the source to the destination.
  const sourceStream = new Readable();
  const operation = pipeline(sourceStream, ingestStream);
  // Manually push data to the source stream to write data to Spanner.
  sourceStream.push("1\t1\t100000\n");
  sourceStream.push("2\t2\t500000\n");
  // Push a 'null' to indicate the end of the stream.
  sourceStream.push(null);
  // Wait for the copy operation to finish.
  await operation;
  console.log(`Updated ${copyStream.rowCount} albums`);

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def update_data_with_copy(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            # Instruct PGAdapter to use insert-or-update for COPY statements.
            # This enables us to use COPY to update data.
            cur.execute("set spanner.copy_upsert=true")

            # COPY uses mutations to insert or update existing data in Spanner.
            with cur.copy("COPY albums (singer_id, album_id, marketing_budget) "
                          "FROM STDIN") as copy:
                copy.write_row((1, 1, 100000))
                copy.write_row((2, 2, 500000))
            print("Updated %d albums" % cur.rowcount)

C#

using Npgsql;

namespace dotnet_snippets;

public static class UpdateDataWithCopySample
{
    public static void UpdateDataWithCopy(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        // Enable 'partitioned_non_atomic' mode. This ensures that the COPY operation
        // will succeed even if it exceeds Spanner's mutation limit per transaction.
        using var cmd = connection.CreateCommand();
        cmd.CommandText = "set spanner.autocommit_dml_mode='partitioned_non_atomic'";
        cmd.ExecuteNonQuery();

        // Instruct PGAdapter to use insert-or-update for COPY statements.
        // This enables us to use COPY to update existing data.
        cmd.CommandText = "set spanner.copy_upsert=true";
        cmd.ExecuteNonQuery();

        // COPY uses mutations to insert or update existing data in Spanner.
        using (var albumWriter = connection.BeginTextImport(
                   "COPY albums (singer_id, album_id, marketing_budget) FROM STDIN"))
        {
            albumWriter.WriteLine("1\t1\t100000");
            albumWriter.WriteLine("2\t2\t500000");
        }
        Console.WriteLine("Updated 2 albums");
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./update_data_with_copy.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar update example-db

Go

go run sample_runner.go update example-db

Node.js

npm start update example-db

Python

python update_data_with_copy.py example-db

C#

dotnet run update example-db

You should see:

Updated 2 albums
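
Most of the samples above send tab-delimited lines to COPY, which is the default PostgreSQL COPY text format (tab as the delimiter, `\N` for NULL). The following minimal sketch in Python shows how such a payload could be built from rows. `to_copy_text` is a hypothetical helper and not part of psycopg or PGAdapter, and it assumes that the values contain no delimiter, backslash, or newline characters, because it performs no escaping.

```python
from typing import Iterable, Optional, Sequence


def to_copy_text(rows: Iterable[Sequence[Optional[object]]],
                 delimiter: str = "\t",
                 null: str = "\\N") -> str:
    """Encodes rows as COPY text-format lines, one line per row.

    Values are converted with str() and are NOT escaped, so this sketch
    is only suitable for simple values such as the integers used in this
    tutorial. None is written as the NULL marker.
    """
    lines = []
    for row in rows:
        fields = [null if value is None else str(value) for value in row]
        lines.append(delimiter.join(fields) + "\n")
    return "".join(lines)


# The two rows that the samples above write to the albums table.
payload = to_copy_text([(1, 1, 100000), (2, 2, 500000)])
```

Calling the helper with `delimiter=";"` produces the semicolon-separated form that the psql sample feeds to COPY with the DELIMITER option.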

You can also execute a SQL query to fetch the values that you just wrote.

Here's the code to execute the query:

psql

#!/bin/bash

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

psql -c "SELECT singer_id, album_id, marketing_budget
         FROM albums
         ORDER BY singer_id, album_id"

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;

class QueryDataWithNewColumn {
  static void queryDataWithNewColumn(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      try (ResultSet resultSet =
          connection
              .createStatement()
              .executeQuery(
                  "SELECT singer_id, album_id, marketing_budget "
                      + "FROM albums "
                      + "ORDER BY singer_id, album_id")) {
        while (resultSet.next()) {
          System.out.printf(
              "%d %d %s\n",
              resultSet.getLong("singer_id"),
              resultSet.getLong("album_id"),
              resultSet.getString("marketing_budget"));
        }
      }
    }
  }
}

Go

import (
	"context"
	"database/sql"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func QueryDataWithNewColumn(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	rows, err := conn.Query(ctx, "SELECT singer_id, album_id, marketing_budget "+
		"FROM albums "+
		"ORDER BY singer_id, album_id")
	if err != nil {
		return err
	}
	defer rows.Close()
	for rows.Next() {
		var singerId, albumId int64
		var marketingBudget sql.NullString
		err = rows.Scan(&singerId, &albumId, &marketingBudget)
		if err != nil {
			return err
		}
		var budget string
		if marketingBudget.Valid {
			budget = marketingBudget.String
		} else {
			budget = "NULL"
		}
		fmt.Printf("%v %v %v\n", singerId, albumId, budget)
	}

	return rows.Err()
}

Node.js

import { Client } from 'pg';

async function queryDataWithNewColumn(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  const result = await connection.query(
      "SELECT singer_id, album_id, marketing_budget "
      + "FROM albums "
      + "ORDER BY singer_id, album_id"
  );
  for (const row of result.rows) {
    console.log(`${row["singer_id"]} ${row["album_id"]} ${row["marketing_budget"]}`);
  }

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def query_data_with_new_column(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute("SELECT singer_id, album_id, marketing_budget "
                        "FROM albums "
                        "ORDER BY singer_id, album_id")
            for album in cur:
                print(album)

C#

using Npgsql;

namespace dotnet_snippets;

public static class QueryDataWithNewColumnSample
{
    public static void QueryWithNewColumnData(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        using var cmd = new NpgsqlCommand("SELECT singer_id, album_id, marketing_budget "
                                          + "FROM albums "
                                          + "ORDER BY singer_id, album_id", connection);
        using var reader = cmd.ExecuteReader();
        while (reader.Read())
        {
            Console.WriteLine($"{reader["singer_id"]} {reader["album_id"]} {reader["marketing_budget"]}");
        }
    }
}

Run the query with this command:

psql

PGDATABASE=example-db ./query_data_with_new_column.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar querymarketingbudget example-db

Go

go run sample_runner.go querymarketingbudget example-db

Node.js

npm start querymarketingbudget example-db

Python

python query_data_with_new_column.py example-db

C#

dotnet run querymarketingbudget example-db

You should see:

1 1 100000
1 2 null
2 1 null
2 2 500000
2 3 null

Update data

You can update data using DML in a read/write transaction.

psql

#!/bin/bash

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

psql << SQL
  -- Transfer marketing budget from one album to another.
  -- We do it in a transaction to ensure that the transfer is atomic.
  -- Begin a read/write transaction.
  begin;

  -- Increase the marketing budget of album 1 if album 2 has enough budget.
  -- The condition that album 2 has enough budget is guaranteed for the
  -- duration of the transaction, as read/write transactions in Spanner use
  -- external consistency as the default isolation level.
  update albums set
    marketing_budget = marketing_budget + 200000
  where singer_id = 1
    and  album_id = 1
    and exists (
      select album_id
      from albums
      where singer_id = 2
        and  album_id = 2
        and marketing_budget > 200000
      );

  -- Decrease the marketing budget of album 2.      
  update albums set
    marketing_budget = marketing_budget - 200000
  where singer_id = 2
    and  album_id = 2
    and marketing_budget > 200000;

  -- Commit the transaction to make the changes to both marketing budgets
  -- durably stored in the database and visible to other transactions.
  commit;  
SQL

echo "Transferred marketing budget from Album 2 to Album 1"

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

class UpdateDataWithTransaction {

  static void writeWithTransactionUsingDml(String host, int port, String database)
      throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      // Set AutoCommit=false to enable transactions.
      connection.setAutoCommit(false);

      // Transfer marketing budget from one album to another. We do it in a
      // transaction to ensure that the transfer is atomic. There is no need
      // to explicitly start the transaction. The first statement on the
      // connection will start a transaction when AutoCommit=false.
      String selectMarketingBudgetSql =
          "SELECT marketing_budget from albums WHERE singer_id = ? and album_id = ?";
      long album2Budget = 0;
      try (PreparedStatement selectMarketingBudgetStatement =
          connection.prepareStatement(selectMarketingBudgetSql)) {
        // Bind the query parameters to SingerId=2 and AlbumId=2.
        selectMarketingBudgetStatement.setLong(1, 2);
        selectMarketingBudgetStatement.setLong(2, 2);
        try (ResultSet resultSet = selectMarketingBudgetStatement.executeQuery()) {
          while (resultSet.next()) {
            album2Budget = resultSet.getLong("marketing_budget");
          }
        }
        // The transaction will only be committed if this condition still holds
        // at the time of commit. Otherwise, the transaction will be aborted.
        final long transfer = 200000;
        if (album2Budget >= transfer) {
          long album1Budget = 0;
          // Re-use the existing PreparedStatement for selecting the
          // marketing_budget to get the budget for Album 1.
          // Bind the query parameters to SingerId=1 and AlbumId=1.
          selectMarketingBudgetStatement.setLong(1, 1);
          selectMarketingBudgetStatement.setLong(2, 1);
          try (ResultSet resultSet = selectMarketingBudgetStatement.executeQuery()) {
            while (resultSet.next()) {
              album1Budget = resultSet.getLong("marketing_budget");
            }
          }

          // Transfer part of the marketing budget of Album 2 to Album 1.
          album1Budget += transfer;
          album2Budget -= transfer;
          String updateSql =
              "UPDATE albums "
                  + "SET marketing_budget = ? "
                  + "WHERE singer_id = ? and album_id = ?";
          try (PreparedStatement updateStatement = connection.prepareStatement(updateSql)) {
            // Update Album 1.
            int paramIndex = 0;
            updateStatement.setLong(++paramIndex, album1Budget);
            updateStatement.setLong(++paramIndex, 1);
            updateStatement.setLong(++paramIndex, 1);
            // Create a DML batch by calling addBatch
            // on the current PreparedStatement.
            updateStatement.addBatch();

            // Update Album 2 in the same DML batch.
            paramIndex = 0;
            updateStatement.setLong(++paramIndex, album2Budget);
            updateStatement.setLong(++paramIndex, 2);
            updateStatement.setLong(++paramIndex, 2);
            updateStatement.addBatch();

            // Execute both DML statements in one batch.
            updateStatement.executeBatch();
          }
        }
      }
      // Commit the current transaction.
      connection.commit();
      System.out.println("Transferred marketing budget from Album 2 to Album 1");
    }
  }
}

Go

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func WriteWithTransactionUsingDml(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	// Transfer marketing budget from one album to another. We do it in a
	// transaction to ensure that the transfer is atomic.
	tx, err := conn.Begin(ctx)
	if err != nil {
		return err
	}
	const selectSql = "SELECT marketing_budget " +
		"from albums " +
		"WHERE singer_id = $1 and album_id = $2"
	// Get the marketing_budget of singer 2 / album 2.
	row := tx.QueryRow(ctx, selectSql, 2, 2)
	var budget2 int64
	if err := row.Scan(&budget2); err != nil {
		tx.Rollback(ctx)
		return err
	}
	const transfer = 200000
	// The transaction will only be committed if this condition still holds
	// at the time of commit. Otherwise, the transaction will be aborted.
	if budget2 >= transfer {
		// Get the marketing_budget of singer 1 / album 1.
		row := tx.QueryRow(ctx, selectSql, 1, 1)
		var budget1 int64
		if err := row.Scan(&budget1); err != nil {
			tx.Rollback(ctx)
			return err
		}
		// Transfer part of the marketing budget of Album 2 to Album 1.
		budget1 += transfer
		budget2 -= transfer
		const updateSql = "UPDATE albums " +
			"SET marketing_budget = $1 " +
			"WHERE singer_id = $2 and album_id = $3"
		// Start a DML batch and execute it as part of the current transaction.
		batch := &pgx.Batch{}
		batch.Queue(updateSql, budget1, 1, 1)
		batch.Queue(updateSql, budget2, 2, 2)
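		br := tx.SendBatch(ctx, batch)
		if _, err := br.Exec(); err != nil {
			br.Close()
			tx.Rollback(ctx)
			return err
		}
		// Closing the batch results reads any remaining results and
		// returns the first error that occurred, if any.
		if err := br.Close(); err != nil {
			tx.Rollback(ctx)
			return err
		}
	}
	// Commit the current transaction.
	if err := tx.Commit(ctx); err != nil {
		return err
	}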
	fmt.Println("Transferred marketing budget from Album 2 to Album 1")

	return nil
}

Node.js

import { Client } from 'pg';

async function writeWithTransactionUsingDml(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  // Transfer marketing budget from one album to another. We do it in a
  // transaction to ensure that the transfer is atomic. node-postgres
  // requires you to explicitly start the transaction by executing 'begin'.
  await connection.query("begin");
  const selectMarketingBudgetSql = "SELECT marketing_budget " +
      "from albums " +
      "WHERE singer_id = $1 and album_id = $2";
  // Get the marketing_budget of singer 2 / album 2.
  // node-postgres returns bigint (int8) values as strings by default, so
  // convert the value to a number before doing arithmetic with it.
  const album2BudgetResult = await connection.query(selectMarketingBudgetSql, [2, 2]);
  let album2Budget = Number(album2BudgetResult.rows[0]["marketing_budget"]);
  const transfer = 200000;
  // The transaction will only be committed if this condition still holds
  // at the time of commit. Otherwise, the transaction will be aborted.
  if (album2Budget >= transfer) {
    // Get the marketing budget of singer 1 / album 1.
    const album1BudgetResult = await connection.query(selectMarketingBudgetSql, [1, 1]);
    let album1Budget = Number(album1BudgetResult.rows[0]["marketing_budget"]);
    // Transfer part of the marketing budget of Album 2 to Album 1.
    album1Budget += transfer;
    album2Budget -= transfer;
    const updateSql = "UPDATE albums " +
        "SET marketing_budget = $1 " +
        "WHERE singer_id = $2 and album_id = $3";
    // Start a DML batch. This batch will become part of the current transaction.
    // TODO: Enable when https://github.com/googleapis/java-spanner/pull/3114 has been merged
    // await connection.query("start batch dml");
    // Update the marketing budget of both albums.
    await connection.query(updateSql, [album1Budget, 1, 1]);
    await connection.query(updateSql, [album2Budget, 2, 2]);
    // TODO: Enable when https://github.com/googleapis/java-spanner/pull/3114 has been merged
    // await connection.query("run batch");
  }
  // Commit the current transaction.
  await connection.query("commit");
  console.log("Transferred marketing budget from Album 2 to Album 1");

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def update_data_with_transaction(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        # Set autocommit=False to use transactions.
        # The first statement that is executed starts the transaction.
        conn.autocommit = False
        with conn.cursor() as cur:
            # Transfer marketing budget from one album to another.
            # We do it in a transaction to ensure that the transfer is atomic.
            # There is no need to explicitly start the transaction. The first
            # statement on the connection will start a transaction when
            # AutoCommit=false.
            select_marketing_budget_sql = ("SELECT marketing_budget "
                                           "from albums "
                                           "WHERE singer_id = %s "
                                           "and album_id = %s")
            # Get the marketing budget of Album #2.
            cur.execute(select_marketing_budget_sql, (2, 2))
            album2_budget = cur.fetchone()[0]
            transfer = 200000
            if album2_budget > transfer:
                # Get the marketing budget of Album #1.
                cur.execute(select_marketing_budget_sql, (1, 1))
                album1_budget = cur.fetchone()[0]
                # Transfer the marketing budgets and write the update back
                # to the database.
                album1_budget += transfer
                album2_budget -= transfer
                update_sql = ("update albums "
                              "set marketing_budget = %s "
                              "where singer_id = %s "
                              "and   album_id = %s")
                # Use a pipeline to execute two DML statements in one batch.
                with conn.pipeline():
                    cur.execute(update_sql, (album1_budget, 1, 1,))
                    cur.execute(update_sql, (album2_budget, 2, 2,))
            else:
                print("Insufficient budget to transfer")
        # Commit the transaction.
        conn.commit()
        print("Transferred marketing budget from Album 2 to Album 1")

C#

using Npgsql;
using System.Data;

namespace dotnet_snippets;

public static class TagsSample
{
    public static void Tags(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        // Start a transaction with isolation level Serializable.
        // Spanner only supports this isolation level. Trying to use a lower
        // isolation level (including the default isolation level READ COMMITTED),
        // will result in an error.
        var transaction = connection.BeginTransaction(IsolationLevel.Serializable);

        // Create a command that uses the current transaction.
        using var cmd = connection.CreateCommand();
        cmd.Transaction = transaction;

        // Set the TRANSACTION_TAG session variable to set a transaction tag
        // for the current transaction.
        cmd.CommandText = "set spanner.transaction_tag='example-tx-tag'";
        cmd.ExecuteNonQuery();

        // Set the STATEMENT_TAG session variable to set the request tag
        // that should be included with the next SQL statement.
        cmd.CommandText = "set spanner.statement_tag='query-marketing-budget'";
        cmd.ExecuteNonQuery();

        // Get the marketing_budget of Album (1,1).
        cmd.CommandText = "select marketing_budget from albums where singer_id=$1 and album_id=$2";
        cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
        cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
        var marketingBudget = (long?)cmd.ExecuteScalar();

        // Reduce the marketing budget by 10% if it is more than 1,000.
        if (marketingBudget > 1000L)
        {
            marketingBudget -= (long) (marketingBudget * 0.1);

            // Set the statement tag to use for the update statement.
            cmd.Parameters.Clear();
            cmd.CommandText = "set spanner.statement_tag='reduce-marketing-budget'";
            cmd.ExecuteNonQuery();

            cmd.CommandText = "update albums set marketing_budget=$1 where singer_id=$2 AND album_id=$3";
            cmd.Parameters.Add(new NpgsqlParameter { Value = marketingBudget });
            cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
            cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
            cmd.ExecuteNonQuery();
        }

        // Commit the current transaction.
        transaction.Commit();
        Console.WriteLine("Reduced marketing budget");
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./update_data_with_transaction.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar writewithtransactionusingdml example-db

Go

go run sample_runner.go writewithtransactionusingdml example-db

Node.js

npm start writewithtransactionusingdml example-db

Python

python update_data_with_transaction.py example-db

C#

dotnet run writewithtransactionusingdml example-db

You should see:

Transferred marketing budget from Album 2 to Album 1
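
To make the effect of the transfer concrete, the check-then-update logic of the samples can be mirrored in memory. This is a minimal sketch in Python that does not talk to a database and gives no transactional guarantees; `transfer_budget` is a hypothetical helper, the starting budgets are the values written in the previous step, and the comparison uses `>=` as in the Java sample.

```python
from typing import Dict, Tuple

AlbumKey = Tuple[int, int]  # (singer_id, album_id)


def transfer_budget(budgets: Dict[AlbumKey, int],
                    source: AlbumKey,
                    target: AlbumKey,
                    amount: int) -> bool:
    """Moves `amount` from source to target if source has enough budget.

    Returns True if the transfer was applied and False otherwise.
    """
    if budgets[source] < amount:
        return False
    budgets[source] -= amount
    budgets[target] += amount
    return True


# Budgets after the COPY step: album (1, 1) has 100000, album (2, 2) has 500000.
budgets = {(1, 1): 100000, (2, 2): 500000}
transferred = transfer_budget(budgets, (2, 2), (1, 1), 200000)
# Both albums now have a budget of 300000.
```

In the database samples, the read/write transaction is what keeps the check and the two updates atomic; the sketch only illustrates the arithmetic.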

Transaction tags and request tags

Use transaction tags and request tags to troubleshoot transactions and queries in Spanner. You can set transaction tags and request tags with the SPANNER.TRANSACTION_TAG and SPANNER.STATEMENT_TAG session variables.
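
Because tags are set with plain SET statements, an application may want to build those statements in one place. The following is a minimal sketch in Python, assuming that tag values are passed as standard SQL string literals in which a single quote is escaped by doubling it; `set_tag_sql` is a hypothetical helper and not part of PGAdapter or any driver.

```python
def set_tag_sql(variable: str, tag: str) -> str:
    """Builds a SET statement for one of the Spanner tag session variables.

    `variable` must be 'transaction_tag' or 'statement_tag'. Single quotes
    in the tag are doubled so that the value stays one SQL string literal.
    """
    if variable not in ("transaction_tag", "statement_tag"):
        raise ValueError("unsupported tag variable: " + variable)
    escaped = tag.replace("'", "''")
    return f"set spanner.{variable}='{escaped}'"


# The statements that the samples in this section execute.
tx_tag_sql = set_tag_sql("transaction_tag", "example-tx-tag")
stmt_tag_sql = set_tag_sql("statement_tag", "query-marketing-budget")
```

The resulting strings can be executed on the connection before the statement that they should apply to.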

psql

#!/bin/bash

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

psql << SQL
  -- Start a transaction.
  begin;
  -- Set the TRANSACTION_TAG session variable to set a transaction tag
  -- for the current transaction. This can only be executed at the start
  -- of the transaction.
  set spanner.transaction_tag='example-tx-tag';

  -- Set the STATEMENT_TAG session variable to set the request tag
  -- that should be included with the next SQL statement.
  set spanner.statement_tag='query-marketing-budget';

  select marketing_budget
  from albums
  where singer_id = 1
    and album_id  = 1;

  -- Reduce the marketing budget by 10% if it is more than 1,000.
  -- Set a statement tag for the update statement.
  set spanner.statement_tag='reduce-marketing-budget';

  update albums
    set marketing_budget = marketing_budget - (marketing_budget * 0.1)::bigint
  where singer_id = 1
    and album_id  = 1
    and marketing_budget > 1000;

  commit;  
SQL

echo "Reduced marketing budget"

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

class Tags {

  static void tags(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      // Set AutoCommit=false to enable transactions.
      connection.setAutoCommit(false);
      // Set the TRANSACTION_TAG session variable to set a transaction tag
      // for the current transaction.
      connection.createStatement().execute("set spanner.transaction_tag='example-tx-tag'");

      // Set the STATEMENT_TAG session variable to set the request tag
      // that should be included with the next SQL statement.
      connection.createStatement().execute("set spanner.statement_tag='query-marketing-budget'");
      long marketingBudget = 0L;
      long singerId = 1L;
      long albumId = 1L;
      try (PreparedStatement statement =
          connection.prepareStatement(
              "select marketing_budget from albums where singer_id=? and album_id=?")) {
        statement.setLong(1, singerId);
        statement.setLong(2, albumId);
        try (ResultSet albumResultSet = statement.executeQuery()) {
          while (albumResultSet.next()) {
            marketingBudget = albumResultSet.getLong(1);
          }
        }
      }
      // Reduce the marketing budget by 10% if it is more than 1,000.
      final long maxMarketingBudget = 1000L;
      final float reduction = 0.1f;
      if (marketingBudget > maxMarketingBudget) {
        marketingBudget -= (long) (marketingBudget * reduction);
        connection.createStatement().execute("set spanner.statement_tag='reduce-marketing-budget'");
        try (PreparedStatement statement =
            connection.prepareStatement(
                "update albums set marketing_budget=? where singer_id=? AND album_id=?")) {
          int paramIndex = 0;
          statement.setLong(++paramIndex, marketingBudget);
          statement.setLong(++paramIndex, singerId);
          statement.setLong(++paramIndex, albumId);
          statement.executeUpdate();
        }
      }

      // Commit the current transaction.
      connection.commit();
      System.out.println("Reduced marketing budget");
    }
  }
}

Go

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func Tags(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	tx, err := conn.Begin(ctx)
	if err != nil {
		return err
	}

	// Set the TRANSACTION_TAG session variable to set a transaction tag
	// for the current transaction.
	if _, err := tx.Exec(ctx, "set spanner.transaction_tag='example-tx-tag'"); err != nil {
		tx.Rollback(ctx)
		return err
	}

	// Set the STATEMENT_TAG session variable to set the request tag
	// that should be included with the next SQL statement.
	if _, err := tx.Exec(ctx, "set spanner.statement_tag='query-marketing-budget'"); err != nil {
		tx.Rollback(ctx)
		return err
	}

	row := tx.QueryRow(ctx, "select marketing_budget "+
		"from albums "+
		"where singer_id=$1 and album_id=$2", 1, 1)
	var budget int64
	if err := row.Scan(&budget); err != nil {
		tx.Rollback(ctx)
		return err
	}

	// Reduce the marketing budget by 10% if it is more than 1,000.
	if budget > 1000 {
		budget = int64(float64(budget) - float64(budget)*0.1)
		if _, err := tx.Exec(ctx, "set spanner.statement_tag='reduce-marketing-budget'"); err != nil {
			tx.Rollback(ctx)
			return err
		}
		if _, err := tx.Exec(ctx, "update albums set marketing_budget=$1 where singer_id=$2 AND album_id=$3", budget, 1, 1); err != nil {
			tx.Rollback(ctx)
			return err
		}
	}
	// Commit the current transaction and report any commit error.
	if err := tx.Commit(ctx); err != nil {
		return err
	}
	fmt.Println("Reduced marketing budget")

	return nil
}

Node.js

import { Client } from 'pg';

async function tags(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  await connection.query("begin");
  // Set the TRANSACTION_TAG session variable to set a transaction tag
  // for the current transaction.
  await connection.query("set spanner.transaction_tag='example-tx-tag'");
  // Set the STATEMENT_TAG session variable to set the request tag
  // that should be included with the next SQL statement.
  await connection.query("set spanner.statement_tag='query-marketing-budget'");
  const budgetResult = await connection.query(
      "select marketing_budget " +
      "from albums " +
      "where singer_id=$1 and album_id=$2", [1, 1])
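  // node-postgres returns bigint values as strings by default, so convert
  // the value to a number before doing arithmetic with it.
  let budget = Number(budgetResult.rows[0]["marketing_budget"]);
  // Reduce the marketing budget by 10% if it is more than 1,000.
  if (budget > 1000) {
    // Keep the new budget a whole number, because the column is a bigint.
    budget = budget - Math.floor(budget * 0.1);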
    await connection.query("set spanner.statement_tag='reduce-marketing-budget'");
    await connection.query("update albums set marketing_budget=$1 "
        + "where singer_id=$2 AND album_id=$3", [budget, 1, 1]);
  }
  // Commit the current transaction.
  await connection.query("commit");
  console.log("Reduced marketing budget");

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def tags(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        # Set autocommit=False to enable transactions.
        conn.autocommit = False
        with conn.cursor() as cur:
            # Set the TRANSACTION_TAG session variable to set a transaction tag
            # for the current transaction.
            cur.execute("set spanner.transaction_tag='example-tx-tag'")

            # Set the STATEMENT_TAG session variable to set the request tag
            # that should be included with the next SQL statement.
            cur.execute("set spanner.statement_tag='query-marketing-budget'")

            singer_id = 1
            album_id = 1
            cur.execute("select marketing_budget "
                        "from albums "
                        "where singer_id = %s "
                        "  and album_id  = %s",
                        (singer_id, album_id,))
            marketing_budget = cur.fetchone()[0]

            # Reduce the marketing budget by 10% if it is more than 1,000.
            max_marketing_budget = 1000
            reduction = 0.1
            if marketing_budget > max_marketing_budget:
                # Make sure the marketing_budget remains an int.
                marketing_budget -= int(marketing_budget * reduction)
                # Set a statement tag for the update statement.
                cur.execute(
                    "set spanner.statement_tag='reduce-marketing-budget'")
                cur.execute("update albums set marketing_budget = %s "
                            "where singer_id = %s "
                            "  and album_id  = %s",
                            (marketing_budget, singer_id, album_id,))
            else:
                print("Marketing budget already less than or equal to 1,000")
        # Commit the transaction.
        conn.commit()
        print("Reduced marketing budget")

C#

using Npgsql;
using System.Data;

namespace dotnet_snippets;

public static class TagsSample
{
    public static void Tags(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        // Start a transaction with isolation level Serializable.
        // Spanner only supports this isolation level. Trying to use a lower
        // isolation level (including the default isolation level READ COMMITTED),
        // will result in an error.
        var transaction = connection.BeginTransaction(IsolationLevel.Serializable);

        // Create a command that uses the current transaction.
        using var cmd = connection.CreateCommand();
        cmd.Transaction = transaction;

        // Set the TRANSACTION_TAG session variable to set a transaction tag
        // for the current transaction.
        cmd.CommandText = "set spanner.transaction_tag='example-tx-tag'";
        cmd.ExecuteNonQuery();

        // Set the STATEMENT_TAG session variable to set the request tag
        // that should be included with the next SQL statement.
        cmd.CommandText = "set spanner.statement_tag='query-marketing-budget'";
        cmd.ExecuteNonQuery();

        // Get the marketing_budget of Album (1,1).
        cmd.CommandText = "select marketing_budget from albums where singer_id=$1 and album_id=$2";
        cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
        cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
        var marketingBudget = (long?)cmd.ExecuteScalar();

        // Reduce the marketing budget by 10% if it is more than 1,000.
        if (marketingBudget > 1000L)
        {
            marketingBudget -= (long) (marketingBudget * 0.1);

            // Set the statement tag to use for the update statement.
            cmd.Parameters.Clear();
            cmd.CommandText = "set spanner.statement_tag='reduce-marketing-budget'";
            cmd.ExecuteNonQuery();

            cmd.CommandText = "update albums set marketing_budget=$1 where singer_id=$2 AND album_id=$3";
            cmd.Parameters.Add(new NpgsqlParameter { Value = marketingBudget });
            cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
            cmd.Parameters.Add(new NpgsqlParameter { Value = 1L });
            cmd.ExecuteNonQuery();
        }

        // Commit the current transaction.
        transaction.Commit();
        Console.WriteLine("Reduced marketing budget");
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./tags.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar tags example-db

Go

go run sample_runner.go tags example-db

Node.js

npm start tags example-db

Python

python tags.py example-db

C#

dotnet run tags example-db

Retrieve data using read-only transactions

Suppose you want to execute more than one read at the same timestamp. Read-only transactions observe a consistent prefix of the transaction commit history, so your application always gets consistent data. Set the connection to read-only or use the SET TRANSACTION READ ONLY SQL statement to execute a read-only transaction.
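One way to make sure that every read in a group uses the same read-only transaction is to wrap the group in a context manager. The following is a minimal sketch, not part of the sample repository: `read_only_transaction` here is a hypothetical helper, and `conn` is assumed to be a DB-API style connection with autocommit disabled (for example, a psycopg connection to PGAdapter).

```python
from contextlib import contextmanager


@contextmanager
def read_only_transaction(conn):
    """Yield a cursor whose statements share one read-only transaction.

    Assumption: `conn` is a DB-API connection with autocommit disabled.
    """
    cur = conn.cursor()
    try:
        # This must be the first statement of the transaction.
        cur.execute("set transaction read only")
        yield cur
        # Committing only marks the end of the read-only transaction.
        conn.commit()
    except Exception:
        # Rolling back also ends the transaction; nothing was written.
        conn.rollback()
        raise
    finally:
        cur.close()
```

With such a helper, `with read_only_transaction(conn) as cur:` would enclose both queries, and the transaction would be ended even if one of them raised an error.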

The following shows how to run a query and perform a read in the same read-only transaction.

psql

#!/bin/bash

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

psql << SQL
  -- Begin a transaction.
  begin;
  -- Change the current transaction to a read-only transaction.
  -- This statement can only be executed at the start of a transaction.
  set transaction read only;

  -- The following two queries use the same read-only transaction.
  select singer_id, album_id, album_title
  from albums
  order by singer_id, album_id;

  select singer_id, album_id, album_title
  from albums
  order by album_title;

  -- Read-only transactions must also be committed or rolled back to mark
  -- the end of the transaction. There is no semantic difference between
  -- rolling back or committing a read-only transaction.
  commit;  
SQL

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;

class ReadOnlyTransaction {
  static void readOnlyTransaction(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      // Set AutoCommit=false to enable transactions.
      connection.setAutoCommit(false);
      // This SQL statement instructs PGAdapter to use
      // a read-only transaction.
      connection.createStatement().execute("set transaction read only");

      try (ResultSet resultSet =
          connection
              .createStatement()
              .executeQuery(
                  "SELECT singer_id, album_id, album_title "
                      + "FROM albums "
                      + "ORDER BY singer_id, album_id")) {
        while (resultSet.next()) {
          System.out.printf(
              "%d %d %s\n",
              resultSet.getLong("singer_id"),
              resultSet.getLong("album_id"),
              resultSet.getString("album_title"));
        }
      }
      try (ResultSet resultSet =
          connection
              .createStatement()
              .executeQuery(
                  "SELECT singer_id, album_id, album_title "
                      + "FROM albums "
                      + "ORDER BY album_title")) {
        while (resultSet.next()) {
          System.out.printf(
              "%d %d %s\n",
              resultSet.getLong("singer_id"),
              resultSet.getLong("album_id"),
              resultSet.getString("album_title"));
        }
      }
      // End the read-only transaction by calling commit().
      connection.commit();
    }
  }
}

Go

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func ReadOnlyTransaction(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	// Start a read-only transaction by supplying additional transaction options.
	tx, err := conn.BeginTx(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly})
	if err != nil {
		return err
	}

	albumsOrderedById, err := tx.Query(ctx, "SELECT singer_id, album_id, album_title FROM albums ORDER BY singer_id, album_id")
	if err != nil {
		return err
	}
	defer albumsOrderedById.Close()
	for albumsOrderedById.Next() {
		var singerId, albumId int64
		var title string
		err = albumsOrderedById.Scan(&singerId, &albumId, &title)
		if err != nil {
			return err
		}
		fmt.Printf("%v %v %v\n", singerId, albumId, title)
	}

	albumsOrderedTitle, err := tx.Query(ctx, "SELECT singer_id, album_id, album_title FROM albums ORDER BY album_title")
	if err != nil {
		return err
	}
	defer albumsOrderedTitle.Close()
	for albumsOrderedTitle.Next() {
		var singerId, albumId int64
		var title string
		err = albumsOrderedTitle.Scan(&singerId, &albumId, &title)
		if err != nil {
			return err
		}
		fmt.Printf("%v %v %v\n", singerId, albumId, title)
	}

	// End the read-only transaction by calling Commit().
	return tx.Commit(ctx)
}

Node.js

import { Client } from 'pg';

async function readOnlyTransaction(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  // Start a transaction.
  await connection.query("begin");
  // This SQL statement instructs the PGAdapter to make it a read-only transaction.
  await connection.query("set transaction read only");

  const albumsOrderById = await connection.query(
      "SELECT singer_id, album_id, album_title "
      + "FROM albums "
      + "ORDER BY singer_id, album_id");
  for (const row of albumsOrderById.rows) {
    console.log(`${row["singer_id"]} ${row["album_id"]} ${row["album_title"]}`);
  }
  const albumsOrderByTitle = await connection.query(
      "SELECT singer_id, album_id, album_title "
      + "FROM albums "
      + "ORDER BY album_title");
  for (const row of albumsOrderByTitle.rows) {
    console.log(`${row["singer_id"]} ${row["album_id"]} ${row["album_title"]}`);
  }
  // End the read-only transaction by executing commit.
  await connection.query("commit");

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def read_only_transaction(host: str, port: int, database: str):
    with (psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn):
        # Set autocommit=False to enable transactions.
        conn.autocommit = False

        with conn.cursor() as cur:
            # Change the current transaction to a read-only transaction.
            # This statement can only be executed at the start of a transaction.
            cur.execute("set transaction read only")

            # The following two queries use the same read-only transaction.
            cur.execute("select singer_id, album_id, album_title "
                        "from albums "
                        "order by singer_id, album_id")
            for album in cur:
                print(album)

            cur.execute("select singer_id, album_id, album_title "
                        "from albums "
                        "order by album_title")
            for album in cur:
                print(album)

        # Read-only transactions must also be committed or rolled back to mark
        # the end of the transaction. There is no semantic difference between
        # rolling back or committing a read-only transaction.
        conn.commit()

C#

using Npgsql;
using System.Data;

namespace dotnet_snippets;

public static class ReadOnlyTransactionSample
{
    public static void ReadOnlyTransaction(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        // Start a read-only transaction.
        // You must specify Serializable as the isolation level, as the npgsql driver
        // will otherwise automatically set the isolation level to read-committed.
        var transaction = connection.BeginTransaction(IsolationLevel.Serializable);
        using var cmd = connection.CreateCommand();
        cmd.Transaction = transaction;
        // This SQL statement instructs PGAdapter to use
        // a read-only transaction.
        cmd.CommandText = "set transaction read only";
        cmd.ExecuteNonQuery();

        cmd.CommandText = "SELECT singer_id, album_id, album_title " +
                          "FROM albums " +
                          "ORDER BY singer_id, album_id";
        using (var reader = cmd.ExecuteReader())
        {
            while (reader.Read())
            {
                Console.WriteLine($"{reader["singer_id"]} {reader["album_id"]} {reader["album_title"]}");
            }
        }
        cmd.CommandText = "SELECT singer_id, album_id, album_title "
                          + "FROM albums "
                          + "ORDER BY album_title";
        using (var reader = cmd.ExecuteReader())
        {
            while (reader.Read())
            {
                Console.WriteLine($"{reader["singer_id"]} {reader["album_id"]} {reader["album_title"]}");
            }
        }
        // End the read-only transaction by calling commit().
        transaction.Commit();
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./read_only_transaction.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar readonlytransaction example-db

Go

go run sample_runner.go readonlytransaction example-db

Node.js

npm start readonlytransaction example-db

Python

python read_only_transaction.py example-db

C#

dotnet run readonlytransaction example-db

You should see output similar to:

    1 1 Total Junk
    1 2 Go, Go, Go
    2 1 Green
    2 2 Forever Hold Your Peace
    2 3 Terrified
    2 2 Forever Hold Your Peace
    1 2 Go, Go, Go
    2 1 Green
    2 3 Terrified
    1 1 Total Junk

Partitioned queries and Data Boost

The PartitionQuery API divides a query into smaller pieces, or partitions, and uses multiple machines to fetch the partitions in parallel. Each partition is identified by a partition token. The PartitionQuery API has higher latency than the standard query API, because it is intended only for bulk operations such as exporting or scanning the whole database.

Data Boost lets you run analytics queries and data exports with near-zero impact on existing workloads on the provisioned Spanner instance. Data Boost supports only partitioned queries.
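Through PGAdapter, a partitioned query with Data Boost comes down to two statements sent in order on the same connection. The sketch below only assembles those statements, using the `set spanner.data_boost_enabled=true` and `run partitioned query` forms shown in this section. The function name `partitioned_query_statements` is hypothetical.

```python
def partitioned_query_statements(sql: str, data_boost: bool = True) -> list[str]:
    """Return the statements to send, in order, for a partitioned query.

    Hypothetical helper; it only assembles the PGAdapter statements
    shown in this section.
    """
    statements = []
    if data_boost:
        # Enables Data Boost for all partitioned queries on the connection.
        statements.append("set spanner.data_boost_enabled=true")
    # 'run partitioned query' partitions the query and runs each partition.
    statements.append("run partitioned query " + sql.strip())
    return statements


for statement in partitioned_query_statements(
        "select singer_id, first_name, last_name from singers"):
    print(statement)
```

Passing `data_boost=False` only omits the `set` statement. It does not turn Data Boost off if an earlier statement already enabled it on the connection.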

psql

#!/bin/bash

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# 'set spanner.data_boost_enabled=true' enables Data Boost for
# all partitioned queries on this connection.

# 'run partitioned query' is a shortcut for partitioning the query
# that follows and executing each of the partitions that is returned
# by Spanner.

psql -c "set spanner.data_boost_enabled=true" \
     -c "run partitioned query
         select singer_id, first_name, last_name
         from singers"

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;

class DataBoost {
  static void dataBoost(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      // This enables Data Boost for all partitioned queries on this connection.
      connection.createStatement().execute("set spanner.data_boost_enabled=true");

      // Run a partitioned query. This query will use Data Boost.
      try (ResultSet resultSet =
          connection
              .createStatement()
              .executeQuery(
                  "run partitioned query "
                      + "select singer_id, first_name, last_name "
                      + "from singers")) {
        while (resultSet.next()) {
          System.out.printf(
              "%d %s %s\n",
              resultSet.getLong("singer_id"),
              resultSet.getString("first_name"),
              resultSet.getString("last_name"));
        }
      }
    }
  }
}

Go

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func DataBoost(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	// This enables Data Boost for all partitioned queries on this connection.
	if _, err := conn.Exec(ctx, "set spanner.data_boost_enabled=true"); err != nil {
		return err
	}

	// Run a partitioned query. This query will use Data Boost.
	rows, err := conn.Query(ctx, "run partitioned query select singer_id, first_name, last_name from singers")
	if err != nil {
		return err
	}
	defer rows.Close()
	for rows.Next() {
		var singerId int64
		var firstName, lastName string
		err = rows.Scan(&singerId, &firstName, &lastName)
		if err != nil {
			return err
		}
		fmt.Printf("%v %v %v\n", singerId, firstName, lastName)
	}

	// Report any error that occurred while iterating over the rows.
	return rows.Err()
}

Node.js

import { Client } from 'pg';

async function dataBoost(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  // This enables Data Boost for all partitioned queries on this connection.
  await connection.query("set spanner.data_boost_enabled=true");

  // Run a partitioned query. This query will use Data Boost.
  const singers = await connection.query(
      "run partitioned query "
      + "select singer_id, first_name, last_name "
      + "from singers");
  for (const row of singers.rows) {
    console.log(`${row["singer_id"]} ${row["first_name"]} ${row["last_name"]}`);
  }

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def data_boost(host: str, port: int, database: str):
    with (psycopg.connect("host={host} port={port} dbname={database} "
                          "sslmode=disable".format(host=host,
                                                   port=port,
                                                   database=database)) as conn):
        # Set autocommit=True so each query uses a separate transaction.
        conn.autocommit = True

        with conn.cursor() as cur:
            # This enables Data Boost for all partitioned queries on this
            # connection.
            cur.execute("set spanner.data_boost_enabled=true")

            # Run a partitioned query. This query will use Data Boost.
            cur.execute("run partitioned query "
                        "select singer_id, first_name, last_name "
                        "from singers")
            for singer in cur:
                print(singer)

C#

using Npgsql;

namespace dotnet_snippets;

public static class DataBoostSample
{
    public static void DataBoost(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        using var cmd = connection.CreateCommand();
        // This enables Data Boost for all partitioned queries on this connection.
        cmd.CommandText = "set spanner.data_boost_enabled=true";
        cmd.ExecuteNonQuery();


        // Run a partitioned query. This query will use Data Boost.
        cmd.CommandText = "run partitioned query "
                          + "select singer_id, first_name, last_name "
                          + "from singers";
        using var reader = cmd.ExecuteReader();
        while (reader.Read())
        {
            Console.WriteLine($"{reader["singer_id"]} {reader["first_name"]} {reader["last_name"]}");
        }
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./data_boost.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar databoost example-db

Go

go run sample_runner.go databoost example-db

Node.js

npm start databoost example-db

Python

python data_boost.py example-db

C#

dotnet run databoost example-db

For more information about running partitioned queries and using Data Boost with PGAdapter, see Data Boost and partitioned query statements.

Partitioned DML

Partitioned Data Manipulation Language (DML) is designed for the following types of bulk updates and deletes:

  • Periodic cleanup and garbage collection.
  • Backfilling new columns with default values.
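The backfill case has a regular shape: set a default wherever the column is still NULL. The following sketch builds such a statement. The function name `backfill_statement` and the identifier check are hypothetical, and `default_sql` is inserted verbatim, so it must be a trusted SQL literal.

```python
import re

# Accept only simple lowercase identifiers such as "albums".
_IDENTIFIER = re.compile(r"[a-z_][a-z0-9_]*")


def backfill_statement(table: str, column: str, default_sql: str) -> str:
    """Build an UPDATE that fills NULL values of a column with a default.

    Hypothetical helper. `default_sql` is inserted verbatim, so it must be
    a trusted SQL literal such as "0".
    """
    for name in (table, column):
        if not _IDENTIFIER.fullmatch(name):
            raise ValueError(f"unsupported identifier: {name!r}")
    return f"update {table} set {column}={default_sql} where {column} is null"


print(backfill_statement("albums", "marketing_budget", "0"))
```

For `albums` and `marketing_budget` this produces the same statement that the samples in this section execute. Because of the `is null` filter, running the statement a second time changes no further rows.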

psql

#!/bin/bash

export PGHOST="${PGHOST:-localhost}"
export PGPORT="${PGPORT:-5432}"
export PGDATABASE="${PGDATABASE:-example-db}"

# Change the DML mode that is used by this connection to Partitioned
# DML. Partitioned DML is designed for bulk updates and deletes.
# See https://cloud.google.com/spanner/docs/dml-partitioned for more
# information.
psql -c "set spanner.autocommit_dml_mode='partitioned_non_atomic'" \
     -c "update albums
         set marketing_budget=0
         where marketing_budget is null"

echo "Updated albums using Partitioned DML"

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

class PartitionedDml {

  static void partitionedDml(String host, int port, String database) throws SQLException {
    String connectionUrl = String.format("jdbc:postgresql://%s:%d/%s", host, port, database);
    try (Connection connection = DriverManager.getConnection(connectionUrl)) {
      // Enable Partitioned DML on this connection.
      connection
          .createStatement()
          .execute("set spanner.autocommit_dml_mode='partitioned_non_atomic'");
      // Back-fill a default value for the MarketingBudget column.
      long lowerBoundUpdateCount =
          connection
              .createStatement()
              .executeUpdate("update albums set marketing_budget=0 where marketing_budget is null");
      System.out.printf("Updated at least %d albums\n", lowerBoundUpdateCount);
    }
  }
}

Go

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func PartitionedDML(host string, port int, database string) error {
	ctx := context.Background()
	connString := fmt.Sprintf(
		"postgres://uid:pwd@%s:%d/%s?sslmode=disable",
		host, port, database)
	conn, err := pgx.Connect(ctx, connString)
	if err != nil {
		return err
	}
	defer conn.Close(ctx)

	// Enable Partitioned DML on this connection.
	if _, err := conn.Exec(ctx, "set spanner.autocommit_dml_mode='partitioned_non_atomic'"); err != nil {
		return err
	}
	// Back-fill a default value for the MarketingBudget column.
	tag, err := conn.Exec(ctx, "update albums set marketing_budget=0 where marketing_budget is null")
	if err != nil {
		return err
	}
	fmt.Printf("Updated at least %v albums\n", tag.RowsAffected())

	return nil
}

Node.js

import { Client } from 'pg';

async function partitionedDml(host: string, port: number, database: string): Promise<void> {
  const connection = new Client({
    host: host,
    port: port,
    database: database,
  });
  await connection.connect();

  // Enable Partitioned DML on this connection.
  await connection.query("set spanner.autocommit_dml_mode='partitioned_non_atomic'");

  // Back-fill a default value for the MarketingBudget column.
  const lowerBoundUpdateCount = await connection.query(
      "update albums " +
      "set marketing_budget=0 " +
      "where marketing_budget is null");
  console.log(`Updated at least ${lowerBoundUpdateCount.rowCount} albums`);

  // Close the connection.
  await connection.end();
}

Python

import psycopg


def execute_partitioned_dml(host: str, port: int, database: str):
    with psycopg.connect("host={host} port={port} dbname={database} "
                         "sslmode=disable".format(host=host,
                                                  port=port,
                                                  database=database)) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            # Change the DML mode that is used by this connection to Partitioned
            # DML. Partitioned DML is designed for bulk updates and deletes.
            # See https://cloud.google.com/spanner/docs/dml-partitioned for more
            # information.
            cur.execute(
                "set spanner.autocommit_dml_mode='partitioned_non_atomic'")

            # The following statement will use Partitioned DML.
            cur.execute("update albums "
                        "set marketing_budget=0 "
                        "where marketing_budget is null")
            print("Updated at least %d albums" % cur.rowcount)

C#

using Npgsql;

namespace dotnet_snippets;

public static class PartitionedDmlSample
{
    public static void PartitionedDml(string host, int port, string database)
    {
        var connectionString = $"Host={host};Port={port};Database={database};SSL Mode=Disable";
        using var connection = new NpgsqlConnection(connectionString);
        connection.Open();

        // Enable Partitioned DML on this connection.
        using var cmd = connection.CreateCommand();
        cmd.CommandText = "set spanner.autocommit_dml_mode='partitioned_non_atomic'";
        cmd.ExecuteNonQuery();

        // Back-fill a default value for the MarketingBudget column.
        cmd.CommandText = "update albums set marketing_budget=0 where marketing_budget is null";
        var lowerBoundUpdateCount = cmd.ExecuteNonQuery();

        Console.WriteLine($"Updated at least {lowerBoundUpdateCount} albums");
    }
}

Run the sample with the following command:

psql

PGDATABASE=example-db ./partitioned_dml.sh

Java

java -jar target/pgadapter-snippets/pgadapter-samples.jar partitioneddml example-db

Go

go run sample_runner.go partitioneddml example-db

Node.js

npm start partitioneddml example-db

Python

python partitioned_dml.py example-db

C#

dotnet run partitioneddml example-db
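
The samples above leave the connection in Partitioned DML mode after the update. If the same connection is reused for ordinary statements, it may be worth switching the mode back afterwards. The following is a minimal sketch of that idea in Python, not part of the sample repository. The helper names `partitioned_dml_mode` and `backfill_marketing_budget` are hypothetical, and the sketch assumes that `transactional` is the value that restores the regular autocommit DML mode. It accepts any cursor object that offers `execute()` and `rowcount`, such as a psycopg cursor on an autocommit connection.

```python
from contextlib import contextmanager


@contextmanager
def partitioned_dml_mode(cur):
    """Temporarily switch the cursor's connection to Partitioned DML.

    Hypothetical helper: `cur` is assumed to be a cursor on an autocommit
    connection to PGAdapter.
    """
    cur.execute("set spanner.autocommit_dml_mode='partitioned_non_atomic'")
    try:
        yield cur
    finally:
        # Assumption: 'transactional' restores the regular autocommit DML mode.
        cur.execute("set spanner.autocommit_dml_mode='transactional'")


def backfill_marketing_budget(cur) -> int:
    """Back-fill marketing_budget and return the lower-bound update count."""
    with partitioned_dml_mode(cur):
        cur.execute("update albums "
                    "set marketing_budget=0 "
                    "where marketing_budget is null")
        # Read the count before the mode is reset by the context manager.
        return cur.rowcount
```

Wrapping the mode change in a context manager means the reset statement is also sent when the update raises an error, so later statements on the connection are not run as Partitioned DML by accident.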

Cleanup

To avoid incurring additional charges to your Google Cloud account for the resources used in this tutorial, drop the database and delete the instance that you created.

Delete the database

If you delete an instance, all databases in it are deleted automatically. This step shows how to delete a database without deleting the instance. You might still incur charges for the instance.

On the command line

gcloud spanner databases delete example-db --instance=test-instance

Using the Google Cloud console

  1. Go to the Spanner Instances page in the Google Cloud console.

    Go to the Instances page

  2. Click the instance.

  3. Click the database that you want to delete.

  4. On the Database details page, click Delete.

  5. Confirm that you want to delete the database and click Delete.

Delete the instance

Deleting an instance automatically drops all databases created in that instance.

On the command line

gcloud spanner instances delete test-instance

Using the Google Cloud console

  1. Go to the Spanner Instances page in the Google Cloud console.

    Go to the Instances page

  2. Click your instance.

  3. Click Delete.

  4. Confirm that you want to delete the instance and click Delete.

What's next