使用 AlloyDB 語言連接器連線

AlloyDB 語言連接器是程式庫,可簡化建立叢集安全連線的程序,請使用下列項目:

  • 自動建立 mTLS 連線
  • 支援以身分與存取權管理 (IAM) 為基礎的授權
  • 自動化 IAM 驗證

如果沒有網路路徑,AlloyDB 語言連接器就無法提供 AlloyDB 叢集的網路路徑。

如要進一步瞭解 AlloyDB 語言連接器,請參閱 AlloyDB 語言連接器總覽

本頁將探討下列 AlloyDB 語言連接器:

  • AlloyDB Java 連接器
  • AlloyDB Go 連接器
  • AlloyDB Python 連接器

事前準備

  1. 啟用 AlloyDB API。

    啟用 API

  2. 建立 AlloyDB 執行個體,並設定預設使用者。

    如要進一步瞭解如何建立執行個體,請參閱建立主要執行個體

    如要進一步瞭解使用者角色,請參閱「預先定義的 AlloyDB IAM 角色」。

  3. 設定連線至 AlloyDB 執行個體所需的下列角色和權限:

    • roles/alloydb.client
    • roles/serviceusage.serviceUsageConsumer

      如要進一步瞭解所需角色和權限,請參閱管理 IAM 驗證

安裝 AlloyDB 語言連接器

Java

AlloyDB Java 連接器是程式庫,可提供 IAM 型授權和加密機制,連線至 AlloyDB 執行個體。如果您使用 Spring Boot,請參閱 Spring Boot AlloyDB 啟動器

安裝

如果是 Maven,您可以將下列項目新增至專案的 pom.xml,安裝 AlloyDB Java 連接器:

<!-- Add the connector with the latest version -->
<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>alloydb-jdbc-connector</artifactId>
  <version>0.4.0</version>
</dependency>

<!-- Add the driver with the latest version -->
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>postgresql</artifactId>
  <version>46.6.0<</version>
</dependency>

<!-- Add HikariCP with the latest version -->
<dependency>
  <groupId>com.zaxxer</groupId>
  <artifactId>HikariCP</artifactId>
  <version>5.1.0</version>
</dependency>

如果是 Gradle,您可以在專案的 gradle.build 中加入下列項目,安裝 AlloyDB Java 連接器:

// Add connector with the latest version
implementation group: 'com.google.cloud.alloydb', name: 'alloydb-jdbc-connector', version: '0.4.0'

// Add driver with the latest version
implementation group: 'org.postgresql', name: 'postgresql', version: '46.6.0'

// Add HikariCP with the latest version
implementation group: 'com.zaxxer', name: 'HikariCP', version: '5.1.0'

Python (pg8000)

AlloyDB Python 連接器是一個程式庫,可與資料庫驅動程式搭配使用,讓具備足夠權限的使用者連線至 AlloyDB 資料庫,不必手動將 IP 加入允許清單。

安裝

您可以使用 pip install 安裝 AlloyDB Python 連接器程式庫。

如果您使用 pg8000,請執行下列指令:

pip install "google-cloud-alloydb-connector[pg8000]" sqlalchemy

Python (asyncpg)

AlloyDB Python 連接器是一個程式庫,可與資料庫驅動程式搭配使用,讓具備足夠權限的使用者連線至 AlloyDB 資料庫,不必手動將 IP 加入允許清單。

安裝

您可以使用 pip install 安裝 AlloyDB Python 連接器程式庫。

如果您使用 asyncpg,請執行下列指令:

如果是 asyncpg,請使用:

pip install "google-cloud-alloydb-connector[asyncpg]" "sqlalchemy[asyncio]"

如要進一步瞭解非同步驅動程式的使用方式,請參閱「非同步驅動程式的使用方式」。

Go (pgx)

AlloyDB Go 連接器是專為 Go 語言設計的 AlloyDB 連接器。

安裝

您可以使用 go get 安裝 AlloyDB Go 連接器。

如果您使用 pgx,請執行下列指令:

go get github.com/jackc/pgx/v5
go get cloud.google.com/go/alloydbconn

Go (database/sql)

AlloyDB Go 連接器是專為 Go 語言設計的 AlloyDB 連接器。

安裝

您可以使用 go get 安裝 AlloyDB Go 連接器。

如果您使用 database/sql,請執行下列指令:

go get cloud.google.com/go/alloydbconn

設定 AlloyDB 語言連接器

Java

如要使用 AlloyDB Java 連接器連線至 AlloyDB 叢集,請按照下列步驟進行設定。

如要在網頁應用程式中使用這個程式碼片段,請參閱 GitHub 上的 README


import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public class AlloyDbJdbcConnectorDataSourceFactory {

  public static final String ALLOYDB_DB = System.getenv("ALLOYDB_DB");
  public static final String ALLOYDB_USER = System.getenv("ALLOYDB_USER");
  public static final String ALLOYDB_PASS = System.getenv("ALLOYDB_PASS");
  public static final String ALLOYDB_INSTANCE_NAME = System.getenv("ALLOYDB_INSTANCE_NAME");

  static HikariDataSource createDataSource() {
    HikariConfig config = new HikariConfig();

    config.setJdbcUrl(String.format("jdbc:postgresql:///%s", ALLOYDB_DB));
    config.setUsername(ALLOYDB_USER); // e.g., "postgres"
    config.setPassword(ALLOYDB_PASS); // e.g., "secret-password"
    config.addDataSourceProperty("socketFactory", "com.google.cloud.alloydb.SocketFactory");
    // e.g., "projects/my-project/locations/us-central1/clusters/my-cluster/instances/my-instance"
    config.addDataSourceProperty("alloydbInstanceName", ALLOYDB_INSTANCE_NAME);

    return new HikariDataSource(config);
  }
}

使用公開 IP

如果您使用公開 IP 連線至 AlloyDB 叢集,請加入下列項目:

config.addDataSourceProperty("alloydbIpType", "PUBLIC");

使用 Private Service Connect

如果您使用 Private Service Connect 連線至 AlloyDB 執行個體,請加入下列項目:

config.addDataSourceProperty("alloydbIpType", "PSC");

自動 IAM 驗證

根據預設,AlloyDB 語言連接器會使用內建驗證機制。您可以使用 AlloyDB Java 連接器的自動 IAM 驗證功能。如要啟用,請加入下列項目:

config.addDataSourceProperty("alloydbEnableIAMAuth", "true");

Python (pg8000)

如要使用 AlloyDB Python 連接器連線至 AlloyDB 叢集,請按照下列步驟設定連接器 (如果您使用 pg8000)。

如要在網頁應用程式中使用這個程式碼片段,請參閱 GitHub 上的 README

import sqlalchemy

from google.cloud.alloydbconnector import Connector


def create_sqlalchemy_engine(
    inst_uri: str,
    user: str,
    password: str,
    db: str,
    refresh_strategy: str = "background",
) -> tuple[sqlalchemy.engine.Engine, Connector]:
    """Creates a connection pool for an AlloyDB instance and returns the pool
    and the connector. Callers are responsible for closing the pool and the
    connector.

    A sample invocation looks like:

        engine, connector = create_sqlalchemy_engine(
            inst_uri,
            user,
            password,
            db,
        )
        with engine.connect() as conn:
            time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone()
            conn.commit()
            curr_time = time[0]
            # do something with query result
            connector.close()

    Args:
        instance_uri (str):
            The instance URI specifies the instance relative to the project,
            region, and cluster. For example:
            "projects/my-project/locations/us-central1/clusters/my-cluster/instances/my-instance"
        user (str):
            The database user name, e.g., postgres
        password (str):
            The database user's password, e.g., secret-password
        db (str):
            The name of the database, e.g., mydb
        refresh_strategy (Optional[str]):
            Refresh strategy for the AlloyDB Connector. Can be one of "lazy"
            or "background". For serverless environments use "lazy" to avoid
            errors resulting from CPU being throttled.
    """
    connector = Connector(refresh_strategy=refresh_strategy)

    # create SQLAlchemy connection pool
    engine = sqlalchemy.create_engine(
        "postgresql+pg8000://",
        creator=lambda: connector.connect(
            inst_uri,
            "pg8000",
            user=user,
            password=password,
            db=db,
        ),
    )
    engine.dialect.description_encoding = None
    return engine, connector

使用公開 IP

如果您使用公開 IP 連線至 AlloyDB 叢集,請將連線函式替換為下列項目:

  def getconn() -> pg8000.dbapi.Connection:
      conn: pg8000.dbapi.Connection = connector.connect(
          inst_uri,
          "pg8000",
          user=user,
          password=password,
          db=db,
          # use ip_type to specify public IP
          ip_type=IPTypes.PUBLIC,
      )
      return conn

使用 Private Service Connect

如果您使用 Private Service Connect 連線至 AlloyDB 執行個體,請加入下列項目:

  def getconn() -> pg8000.dbapi.Connection:
      conn: pg8000.dbapi.Connection = connector.connect(
          inst_uri,
          "pg8000",
          user=user,
          password=password,
          db=db,
          # use ip_type to specify PSC
          ip_type=IPTypes.PSC,
        )
      return conn

自動 IAM 驗證

根據預設,AlloyDB 語言連接器會使用內建驗證機制。您可以使用 AlloyDB Python 連接器進行自動 IAM 驗證。如要啟用,請將連線函式替換為下列內容:

  def getconn() -> pg8000.dbapi.Connection:
      conn: pg8000.dbapi.Connection = connector.connect(
          inst_uri,
          "pg8000",
          user=user,
          password=password,
          db=db,
          # use enable_iam_auth to enable IAM authentication
          enable_iam_auth=True,
      )
      return conn

Python (asyncpg)

如要使用 AlloyDB Python 連接器連線至 AlloyDB 叢集,請按照下列步驟設定連接器 (如果您使用 async)。

如要在網頁應用程式中使用這個程式碼片段,請參閱 GitHub 上的 README

import asyncpg
import sqlalchemy
import sqlalchemy.ext.asyncio

from google.cloud.alloydbconnector import AsyncConnector


async def create_sqlalchemy_engine(
    inst_uri: str,
    user: str,
    password: str,
    db: str,
    refresh_strategy: str = "background",
) -> tuple[sqlalchemy.ext.asyncio.engine.AsyncEngine, AsyncConnector]:
    """Creates a connection pool for an AlloyDB instance and returns the pool
    and the connector. Callers are responsible for closing the pool and the
    connector.

    A sample invocation looks like:

        engine, connector = await create_sqlalchemy_engine(
            inst_uri,
            user,
            password,
            db,
        )
        async with engine.connect() as conn:
            time = await conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone()
            curr_time = time[0]
            # do something with query result
            await connector.close()

    Args:
        instance_uri (str):
            The instance URI specifies the instance relative to the project,
            region, and cluster. For example:
            "projects/my-project/locations/us-central1/clusters/my-cluster/instances/my-instance"
        user (str):
            The database user name, e.g., postgres
        password (str):
            The database user's password, e.g., secret-password
        db (str):
            The name of the database, e.g., mydb
        refresh_strategy (Optional[str]):
            Refresh strategy for the AlloyDB Connector. Can be one of "lazy"
            or "background". For serverless environments use "lazy" to avoid
            errors resulting from CPU being throttled.
    """
    connector = AsyncConnector(refresh_strategy=refresh_strategy)

    # create SQLAlchemy connection pool
    engine = sqlalchemy.ext.asyncio.create_async_engine(
        "postgresql+asyncpg://",
        async_creator=lambda: connector.connect(
            inst_uri,
            "asyncpg",
            user=user,
            password=password,
            db=db,
        ),
        execution_options={"isolation_level": "AUTOCOMMIT"},
    )
    return engine, connector

使用公開 IP

如果您使用公開 IP 連線至 AlloyDB 叢集,請將連線函式替換為下列函式:

  async def getconn() -> asyncpg.Connection:
    conn: asyncpg.Connection = await connector.connect(
        inst_uri,
        "asyncpg",
        user=user,
        password=password,
        db=db,
        # use ip_type to specify public IP
        ip_type=IPTypes.PUBLIC,
    )
    return conn

使用 Private Service Connect

如果您使用 Private Service Connect 連線至 AlloyDB 執行個體,請加入下列項目:

    async def getconn() -> asyncpg.Connection:
      conn: asyncpg.Connection = await connector.connect(
          inst_uri,
          "asyncpg",
          user=user,
          password=password,
          db=db,
          # use ip_type to specify PSC
          ip_type=IPTypes.PSC,
      )
      return conn

自動 IAM 驗證

根據預設,AlloyDB 語言連接器會使用內建驗證機制。您可以使用 AlloyDB Python 連接器,自動進行 IAM 驗證。如要啟用,請將連線函式替換為下列內容:

  async def getconn() -> asyncpg.Connection:
    conn: asyncpg.Connection = await connector.connect(
        inst_uri,
        "asyncpg",
        user=user,
        password=password,
        db=db,
        # use enable_iam_auth to enable IAM authentication
        enable_iam_auth=True,
    )
    return conn

Go (pgx)

如要使用 AlloyDB Go 連接器連線至 AlloyDB 叢集,請按照下列步驟設定連接器 (如果您使用 pgx)。

如要在網頁應用程式中使用這個程式碼片段,請參閱 GitHub 上的 README

import (
	"context"
	"fmt"
	"net"

	"cloud.google.com/go/alloydbconn"
	"github.com/jackc/pgx/v5/pgxpool"
)

// connectPgx establishes a connection to your database using pgxpool and the
// AlloyDB Go Connector (aka alloydbconn.Dialer)
//
// The function takes an instance URI, a username, a password, and a database
// name. Usage looks like this:
//
//	pool, cleanup, err := connectPgx(
//	  context.Background(),
//	  "projects/myproject/locations/us-central1/clusters/mycluster/instances/myinstance",
//	  "postgres",
//	  "secretpassword",
//	  "mydb",
//	)
//
// In addition to a *pgxpool.Pool type, the function returns a cleanup function
// that should be called when you're done with the database connection.
func connectPgx(
	ctx context.Context, instURI, user, pass, dbname string,
	opts ...alloydbconn.Option,
) (*pgxpool.Pool, func() error, error) {
	// First initialize the dialer. alloydbconn.NewDialer accepts additional
	// options to configure credentials, timeouts, etc.
	//
	// For details, see:
	// https://pkg.go.dev/cloud.google.com/go/alloydbconn#Option
	d, err := alloydbconn.NewDialer(ctx, opts...)
	if err != nil {
		noop := func() error { return nil }
		return nil, noop, fmt.Errorf("failed to init Dialer: %v", err)
	}
	// The cleanup function will stop the dialer's background refresh
	// goroutines. Call it when you're done with your database connection to
	// avoid a goroutine leak.
	cleanup := func() error { return d.Close() }

	dsn := fmt.Sprintf(
		// sslmode is disabled, because the Dialer will handle the SSL
		// connection instead.
		"user=%s password=%s dbname=%s sslmode=disable",
		user, pass, dbname,
	)

	// Prefer pgxpool for applications.
	// For more information, see:
	// https://github.com/jackc/pgx/wiki/Getting-started-with-pgx#using-a-connection-pool
	config, err := pgxpool.ParseConfig(dsn)
	if err != nil {
		return nil, cleanup, fmt.Errorf("failed to parse pgx config: %v", err)
	}

	// Tell pgx to use alloydbconn.Dialer to connect to the instance.
	config.ConnConfig.DialFunc = func(ctx context.Context, _ string, _ string) (net.Conn, error) {
		return d.Dial(ctx, instURI)
	}

	// Establish the connection.
	pool, connErr := pgxpool.NewWithConfig(ctx, config)
	if connErr != nil {
		return nil, cleanup, fmt.Errorf("failed to connect: %s", connErr)
	}

	return pool, cleanup, nil
}

使用公開 IP

如果您使用公開 IP 連線至 AlloyDB 叢集,請將 d.Dial 函式替換為下列項目:

d.Dial(ctx, instURI, alloydbconn.WithPublicIP())

使用 Private Service Connect

如果您使用 Private Service Connect 連線至 AlloyDB 執行個體,請加入下列項目:

  d.Dial(ctx, instURI, alloydbconn.WithPSC())

自動 IAM 驗證

根據預設,AlloyDB 語言連接器會使用內建驗證機制。您可以使用 AlloyDB Go 連接器進行自動 IAM 驗證。如要啟用,請將 alloydbconn.NewDialer 函式替換為以下內容:

d, err := alloydbconn.NewDialer(ctx, alloydbconn.WithIAMAuthN())

停用內建遙測功能

WithOptOutOfBuiltInTelemetry 選項會停用內部指標匯出功能。 根據預設,撥號器會向alloydb.googleapis.com系統指標前置字元回報內部作業。這些指標可協助 AlloyDB 提升效能,並找出用戶端連線問題。如果應用程式在禁止匯出輸出指標的環境中運作,這個選項就非常實用。如要停用這項遙測功能,請在初始化撥號器時提供下列選項:

  d.Dial(ctx, instURI, alloydbconn.WithOptOutOfBuiltInTelemetry())

Go (database/sql)

如要使用 AlloyDB Go 連接器連線至 AlloyDB 叢集,請按照下列步驟設定連接器 (如果您使用 database/sql)。

如要在網頁應用程式中使用這個程式碼片段,請參閱 GitHub 上的 README

import (
	"database/sql"
	"fmt"

	"cloud.google.com/go/alloydbconn"
	"cloud.google.com/go/alloydbconn/driver/pgxv5"
)

// connectDatabaseSQL establishes a connection to your database using the Go
// standard library's database/sql package and using the AlloyDB Go Connector
// (aka alloydbconn.Dialer)
//
// The function takes an instance URI, a username, a password, and a database
// name. Usage looks like this:
//
//	db, cleanup, err := connectDatabaseSQL(
//	  "projects/myproject/locations/us-central1/clusters/mycluster/instances/myinstance",
//	  "postgres",
//	  "secretpassword",
//	  "mydb",
//	)
//
// In addition to a *db.SQL type, the function returns a cleanup function that
// should be called when you're done with the database connection.
func connectDatabaseSQL(
	instURI, user, pass, dbname string,
	opts ...alloydbconn.Option,
) (*sql.DB, func() error, error) {
	// First, register the AlloyDB driver. Note, the driver's name is arbitrary
	// and must only match what you use below in sql.Open. Also,
	// pgxv5.RegisterDriver accepts options to configure credentials, timeouts,
	// etc.
	//
	// For details, see:
	// https://pkg.go.dev/cloud.google.com/go/alloydbconn#Option
	//
	// The cleanup function will stop the dialer's background refresh
	// goroutines. Call it when you're done with your database connection to
	// avoid a goroutine leak.
	cleanup, err := pgxv5.RegisterDriver("alloydb", opts...)
	if err != nil {
		return nil, cleanup, err
	}

	db, err := sql.Open(
		"alloydb",
		fmt.Sprintf(
			// sslmode is disabled, because the Dialer will handle the SSL
			// connection instead.
			"host=%s user=%s password=%s dbname=%s sslmode=disable",
			instURI, user, pass, dbname,
		),
	)
	return db, cleanup, err
}

使用公開 IP

如果您使用公開 IP 連線至 AlloyDB 叢集,請將 RegisterDriver 函式替換為下列項目:

cleanup, err := pgxv5.RegisterDriver(
  "alloydb",
  alloydbconn.WithDefaultDialOptions(alloydbconn.WithPublicIP())
)

使用 Private Service Connect

如果您使用 Private Service Connect 連線至 AlloyDB 執行個體,請加入下列項目:

  cleanup, err := pgxv5.RegisterDriver(
    "alloydb",
    alloydbconn.WithDefaultDialOptions(alloydbconn.WithPSC())
)

自動 IAM 驗證

根據預設,AlloyDB 語言連接器會使用內建驗證機制。您可以使用 AlloyDB Go 連接器進行自動 IAM 驗證。如要啟用,請將 RegisterDriver 函式替換為下列內容:

cleanup, err := pgxv5.RegisterDriver(
  "alloydb",
  alloydbconn.WithIAMAuthN()
)

停用內建遙測功能

WithOptOutOfBuiltInTelemetry 選項會停用內部指標匯出功能。 根據預設,撥號器會向alloydb.googleapis.com系統指標前置字元回報內部作業。這些指標可協助 AlloyDB 提升效能,並找出用戶端連線問題。如果應用程式在禁止匯出輸出指標的環境中運作,這個選項就非常實用。如要停用這項遙測功能,請在初始化撥號器時提供下列選項:

cleanup, err := pgxv5.RegisterDriver(
  "alloydb",
  alloydbconn.WithOptOutOfBuiltInTelemetry(),
)