使用 AlloyDB 语言连接器进行连接

AlloyDB 语言连接器是一些库,可简化与集群建立安全连接的过程,具体方法如下:

  • 自动 mTLS 连接
  • 基于 Identity and Access Management (IAM) 的授权支持
  • 自动 IAM 身份验证

如果尚无网络路径,AlloyDB 语言连接器无法提供 AlloyDB 集群的网络路径。

如需详细了解 AlloyDB 语言连接器,请参阅 AlloyDB 语言连接器概览

本页介绍了以下 AlloyDB 语言连接器:

  • AlloyDB Java 连接器
  • AlloyDB Go Connector
  • AlloyDB Python 连接器

准备工作

  1. 启用 AlloyDB Admin API。

    启用该 API

  2. 创建 AlloyDB 实例并配置默认用户。

    如需详细了解如何创建实例,请参阅创建主实例

    如需详细了解用户角色,请参阅 AlloyDB 预定义 IAM 角色

  3. 配置连接到 AlloyDB 实例所需的以下角色和权限:

    • roles/alloydb.client
    • roles/serviceusage.serviceUsageConsumer

      如需详细了解所需的角色和权限,请参阅管理 IAM 身份验证

安装 AlloyDB 语言连接器

Java

AlloyDB Java 连接器是在连接到 AlloyDB 实例时提供基于 IAM 的授权和加密的库。

安装

对于 Maven,您可以通过向项目的 pom.xml 添加以下内容来安装 AlloyDB Java 连接器:

<!-- Add the connector with the latest version -->
<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>alloydb-jdbc-connector</artifactId>
  <version>0.4.0</version>
</dependency>

<!-- Add the driver with the latest version -->
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>postgresql</artifactId>
  <version>46.6.0<</version>
</dependency>

<!-- Add HikariCP with the latest version -->
<dependency>
  <groupId>com.zaxxer</groupId>
  <artifactId>HikariCP</artifactId>
  <version>5.1.0</version>
</dependency>

对于 Gradle,您可以在项目的 gradle.build 中添加以下内容,以安装 AlloyDB Java 连接器:

// Add connector with the latest version
implementation group: 'com.google.cloud.alloydb', name: 'alloydb-jdbc-connector', version: '0.4.0'

// Add driver with the latest version
implementation group: 'org.postgresql', name: 'postgresql', version: '46.6.0'

// Add HikariCP with the latest version
implementation group: 'com.zaxxer', name: 'HikariCP', version: '5.1.0'

Python (pg8000)

AlloyDB Python 连接器是一个库,可与数据库驱动程序结合使用,以允许具有足够权限的用户连接到 AlloyDB 数据库,而无需手动将 IP 列入许可名单。

安装

您可以使用 pip install 安装 AlloyDB Python 连接器库。

如果您使用的是 pg8000,请运行以下命令:

pip install "google-cloud-alloydb-connector[pg8000]" sqlalchemy

Python (asyncpg)

AlloyDB Python 连接器是一个库,可与数据库驱动程序结合使用,以允许具有足够权限的用户连接到 AlloyDB 数据库,而无需手动将 IP 列入许可名单。

安装

您可以使用 pip install 安装 AlloyDB Python 连接器库。

如果您使用的是 asyncpg,请运行以下命令:

对于 asyncpg,请使用:

pip install "google-cloud-alloydb-connector[asyncpg]" "sqlalchemy[asyncio]"

如需详细了解异步驱动程序用法,请参阅异步驱动程序用法

Go (pgx)

AlloyDB Go 连接器是专为与 Go 语言搭配使用而设计的 AlloyDB 连接器。

安装

您可以使用 go get 安装 AlloyDB Go Connector。

如果您使用的是 pgx,请运行以下命令:

go get github.com/jackc/pgx/v5
go get cloud.google.com/go/alloydbconn

Go(数据库/SQL)

AlloyDB Go 连接器是专为与 Go 语言搭配使用而设计的 AlloyDB 连接器。

安装

您可以使用 go get 安装 AlloyDB Go Connector。

如果您使用的是 database/sql,请运行以下命令:

go get cloud.google.com/go/alloydbconn

配置 AlloyDB 语言连接器

Java

如需使用 AlloyDB Java 连接器连接到 AlloyDB 集群,请按照以下步骤进行配置。

如需在 Web 应用环境中使用此代码段,请查看 GitHub 上的 README


import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public class AlloyDbJdbcConnectorDataSourceFactory {

  public static final String ALLOYDB_DB = System.getenv("ALLOYDB_DB");
  public static final String ALLOYDB_USER = System.getenv("ALLOYDB_USER");
  public static final String ALLOYDB_PASS = System.getenv("ALLOYDB_PASS");
  public static final String ALLOYDB_INSTANCE_NAME = System.getenv("ALLOYDB_INSTANCE_NAME");

  static HikariDataSource createDataSource() {
    HikariConfig config = new HikariConfig();

    config.setJdbcUrl(String.format("jdbc:postgresql:///%s", ALLOYDB_DB));
    config.setUsername(ALLOYDB_USER); // e.g., "postgres"
    config.setPassword(ALLOYDB_PASS); // e.g., "secret-password"
    config.addDataSourceProperty("socketFactory", "com.google.cloud.alloydb.SocketFactory");
    // e.g., "projects/my-project/locations/us-central1/clusters/my-cluster/instances/my-instance"
    config.addDataSourceProperty("alloydbInstanceName", ALLOYDB_INSTANCE_NAME);

    return new HikariDataSource(config);
  }
}

使用公共 IP

如果您使用公共 IP 连接到 AlloyDB 集群,请添加以下内容:

config.addDataSourceProperty("alloydbIpType", "PUBLIC");

使用 Private Service Connect

如果您使用 Private Service Connect 连接到 AlloyDB 实例,请添加以下内容:

config.addDataSourceProperty("alloydbIpType", "PSC");

自动 IAM 身份验证

默认情况下,AlloyDB 语言连接器使用内置身份验证。您可以将自动 IAM 身份验证与 AlloyDB Java 连接器搭配使用。如需启用,请添加以下内容:

config.addDataSourceProperty("alloydbEnableIAMAuth", "true");

Python (pg8000)

如需使用 AlloyDB Python 连接器连接到 AlloyDB 集群,请按照以下步骤配置连接器(如果您使用的是 pg8000)。

如需在 Web 应用环境中使用此代码段,请查看 GitHub 上的 README

import pg8000
import sqlalchemy

from google.cloud.alloydb.connector import Connector


def create_sqlalchemy_engine(
    inst_uri: str,
    user: str,
    password: str,
    db: str,
    refresh_strategy: str = "background",
) -> tuple[sqlalchemy.engine.Engine, Connector]:
    """Creates a connection pool for an AlloyDB instance and returns the pool
    and the connector. Callers are responsible for closing the pool and the
    connector.

    A sample invocation looks like:

        engine, connector = create_sqlalchemy_engine(
            inst_uri,
            user,
            password,
            db,
        )
        with engine.connect() as conn:
            time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone()
            conn.commit()
            curr_time = time[0]
            # do something with query result
            connector.close()

    Args:
        instance_uri (str):
            The instance URI specifies the instance relative to the project,
            region, and cluster. For example:
            "projects/my-project/locations/us-central1/clusters/my-cluster/instances/my-instance"
        user (str):
            The database user name, e.g., postgres
        password (str):
            The database user's password, e.g., secret-password
        db (str):
            The name of the database, e.g., mydb
        refresh_strategy (Optional[str]):
            Refresh strategy for the AlloyDB Connector. Can be one of "lazy"
            or "background". For serverless environments use "lazy" to avoid
            errors resulting from CPU being throttled.
    """
    connector = Connector(refresh_strategy=refresh_strategy)

    def getconn() -> pg8000.dbapi.Connection:
        conn: pg8000.dbapi.Connection = connector.connect(
            inst_uri,
            "pg8000",
            user=user,
            password=password,
            db=db,
        )
        return conn

    # create SQLAlchemy connection pool
    engine = sqlalchemy.create_engine(
        "postgresql+pg8000://",
        creator=getconn,
    )
    engine.dialect.description_encoding = None
    return engine, connector

使用公共 IP

如果您使用公共 IP 连接到 AlloyDB 集群,请将连接函数替换为以下内容:

  def getconn() -> pg8000.dbapi.Connection:
      conn: pg8000.dbapi.Connection = connector.connect(
          inst_uri,
          "pg8000",
          user=user,
          password=password,
          db=db,
          # use ip_type to specify public IP
          ip_type=IPTypes.PUBLIC,
      )
      return conn

使用 Private Service Connect

如果您使用 Private Service Connect 连接到 AlloyDB 实例,请添加以下内容:

  def getconn() -> pg8000.dbapi.Connection:
      conn: pg8000.dbapi.Connection = connector.connect(
          inst_uri,
          "pg8000",
          user=user,
          password=password,
          db=db,
          # use ip_type to specify PSC
          ip_type=IPTypes.PSC,
        )
      return conn

自动 IAM 身份验证

默认情况下,AlloyDB 语言连接器使用内置身份验证。您可以将 自动 IAM 身份验证与 AlloyDB Python 连接器搭配使用。如需启用,请将连接函数替换为以下代码:

  def getconn() -> pg8000.dbapi.Connection:
      conn: pg8000.dbapi.Connection = connector.connect(
          inst_uri,
          "pg8000",
          user=user,
          password=password,
          db=db,
          # use enable_iam_auth to enable IAM authentication
          enable_iam_auth=True,
      )
      return conn

Python (asyncpg)

如需使用 AlloyDB Python 连接器连接到 AlloyDB 集群,请按照以下步骤配置连接器(如果您使用的是 async)。

如需在 Web 应用环境中使用此代码段,请查看 GitHub 上的 README

import asyncpg
import sqlalchemy
import sqlalchemy.ext.asyncio

from google.cloud.alloydb.connector import AsyncConnector


async def create_sqlalchemy_engine(
    inst_uri: str,
    user: str,
    password: str,
    db: str,
    refresh_strategy: str = "background",
) -> tuple[sqlalchemy.ext.asyncio.engine.AsyncEngine, AsyncConnector]:
    """Creates a connection pool for an AlloyDB instance and returns the pool
    and the connector. Callers are responsible for closing the pool and the
    connector.

    A sample invocation looks like:

        engine, connector = await create_sqlalchemy_engine(
            inst_uri,
            user,
            password,
            db,
        )
        async with engine.connect() as conn:
            time = await conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone()
            curr_time = time[0]
            # do something with query result
            await connector.close()

    Args:
        instance_uri (str):
            The instance URI specifies the instance relative to the project,
            region, and cluster. For example:
            "projects/my-project/locations/us-central1/clusters/my-cluster/instances/my-instance"
        user (str):
            The database user name, e.g., postgres
        password (str):
            The database user's password, e.g., secret-password
        db (str):
            The name of the database, e.g., mydb
        refresh_strategy (Optional[str]):
            Refresh strategy for the AlloyDB Connector. Can be one of "lazy"
            or "background". For serverless environments use "lazy" to avoid
            errors resulting from CPU being throttled.
    """
    connector = AsyncConnector(refresh_strategy=refresh_strategy)

    async def getconn() -> asyncpg.Connection:
        conn: asyncpg.Connection = await connector.connect(
            inst_uri,
            "asyncpg",
            user=user,
            password=password,
            db=db,
        )
        return conn

    # create SQLAlchemy connection pool
    engine = sqlalchemy.ext.asyncio.create_async_engine(
        "postgresql+asyncpg://",
        async_creator=getconn,
        execution_options={"isolation_level": "AUTOCOMMIT"},
    )
    return engine, connector

使用公共 IP

如果您使用公共 IP 连接到 AlloyDB 集群,请将连接函数替换为以下内容:

  async def getconn() -> asyncpg.Connection:
    conn: asyncpg.Connection = await connector.connect(
        inst_uri,
        "asyncpg",
        user=user,
        password=password,
        db=db,
        # use ip_type to specify public IP
        ip_type=IPTypes.PUBLIC,
    )
    return conn

使用 Private Service Connect

如果您使用 Private Service Connect 连接到 AlloyDB 实例,请添加以下内容:

    async def getconn() -> asyncpg.Connection:
      conn: asyncpg.Connection = await connector.connect(
          inst_uri,
          "asyncpg",
          user=user,
          password=password,
          db=db,
          # use ip_type to specify PSC
          ip_type=IPTypes.PSC,
      )
      return conn

自动 IAM 身份验证

默认情况下,AlloyDB 语言连接器使用内置身份验证。您可以将 自动 IAM 身份验证与 AlloyDB Python 连接器搭配使用。如需启用,请将连接函数替换为以下代码:

  async def getconn() -> asyncpg.Connection:
    conn: asyncpg.Connection = await connector.connect(
        inst_uri,
        "asyncpg",
        user=user,
        password=password,
        db=db,
        # use enable_iam_auth to enable IAM authentication
        enable_iam_auth=True,
    )
    return conn

Go (pgx)

如需使用 AlloyDB Go Connector 连接到 AlloyDB 集群,请按照以下步骤配置连接器(如果您使用的是 pgx)。

如需在 Web 应用环境中使用此代码段,请查看 GitHub 上的 README

import (
	"context"
	"fmt"
	"net"

	"cloud.google.com/go/alloydbconn"
	"github.com/jackc/pgx/v5/pgxpool"
)

// connectPgx establishes a connection to your database using pgxpool and the
// AlloyDB Go Connector (aka alloydbconn.Dialer)
//
// The function takes an instance URI, a username, a password, and a database
// name. Usage looks like this:
//
//	pool, cleanup, err := connectPgx(
//	  context.Background(),
//	  "projects/myproject/locations/us-central1/clusters/mycluster/instances/myinstance",
//	  "postgres",
//	  "secretpassword",
//	  "mydb",
//	)
//
// In addition to a *pgxpool.Pool type, the function returns a cleanup function
// that should be called when you're done with the database connection.
func connectPgx(
	ctx context.Context, instURI, user, pass, dbname string,
	opts ...alloydbconn.Option,
) (*pgxpool.Pool, func() error, error) {
	// First initialize the dialer. alloydbconn.NewDialer accepts additional
	// options to configure credentials, timeouts, etc.
	//
	// For details, see:
	// https://pkg.go.dev/cloud.google.com/go/alloydbconn#Option
	d, err := alloydbconn.NewDialer(ctx, opts...)
	if err != nil {
		noop := func() error { return nil }
		return nil, noop, fmt.Errorf("failed to init Dialer: %v", err)
	}
	// The cleanup function will stop the dialer's background refresh
	// goroutines. Call it when you're done with your database connection to
	// avoid a goroutine leak.
	cleanup := func() error { return d.Close() }

	dsn := fmt.Sprintf(
		// sslmode is disabled, because the Dialer will handle the SSL
		// connection instead.
		"user=%s password=%s dbname=%s sslmode=disable",
		user, pass, dbname,
	)

	// Prefer pgxpool for applications.
	// For more information, see:
	// https://github.com/jackc/pgx/wiki/Getting-started-with-pgx#using-a-connection-pool
	config, err := pgxpool.ParseConfig(dsn)
	if err != nil {
		return nil, cleanup, fmt.Errorf("failed to parse pgx config: %v", err)
	}

	// Tell pgx to use alloydbconn.Dialer to connect to the instance.
	config.ConnConfig.DialFunc = func(ctx context.Context, _ string, _ string) (net.Conn, error) {
		return d.Dial(ctx, instURI)
	}

	// Establish the connection.
	pool, connErr := pgxpool.NewWithConfig(ctx, config)
	if connErr != nil {
		return nil, cleanup, fmt.Errorf("failed to connect: %s", connErr)
	}

	return pool, cleanup, nil
}

使用公共 IP

如果您使用公共 IP 连接到 AlloyDB 集群,请将 d.Dial 函数替换为以下内容:

d.Dial(ctx, instURI, alloydbconn.WithPublicIP())

使用 Private Service Connect

如果您使用 Private Service Connect 连接到 AlloyDB 实例,请添加以下内容:

  d.Dial(ctx, instURI, alloydbconn.WithPSC())

自动 IAM 身份验证

默认情况下,AlloyDB 语言连接器使用内置身份验证。您可以将自动 IAM 身份验证与 AlloyDB Go 连接器搭配使用。如需启用,请将 alloydbconn.NewDialer 函数替换为以下代码:

d, err := alloydbconn.NewDialer(ctx, alloydbconn.WithIAMAuthN())

Go(数据库/SQL)

如需使用 AlloyDB Go 连接器连接到 AlloyDB 集群,请按照以下步骤配置连接器(如果您使用的是 database/sql)。

如需在 Web 应用环境中使用此代码段,请查看 GitHub 上的 README

import (
	"database/sql"
	"fmt"

	"cloud.google.com/go/alloydbconn/driver/pgxv5"
)

// connectDatabaseSQL establishes a connection to your database using the Go
// standard library's database/sql package and using the AlloyDB Go Connector
// (aka alloydbconn.Dialer)
//
// The function takes an instance URI, a username, a password, and a database
// name. Usage looks like this:
//
//	db, cleanup, err := connectDatabaseSQL(
//	  "projects/myproject/locations/us-central1/clusters/mycluster/instances/myinstance",
//	  "postgres",
//	  "secretpassword",
//	  "mydb",
//	)
//
// In addition to a *db.SQL type, the function returns a cleanup function that
// should be called when you're done with the database connection.
func connectDatabaseSQL(
	instURI, user, pass, dbname string,
) (*sql.DB, func() error, error) {
	// First, register the AlloyDB driver. Note, the driver's name is arbitrary
	// and must only match what you use below in sql.Open. Also,
	// pgxv5.RegisterDriver accepts options to configure credentials, timeouts,
	// etc.
	//
	// For details, see:
	// https://pkg.go.dev/cloud.google.com/go/alloydbconn#Option
	//
	// The cleanup function will stop the dialer's background refresh
	// goroutines. Call it when you're done with your database connection to
	// avoid a goroutine leak.
	cleanup, err := pgxv5.RegisterDriver("alloydb")
	if err != nil {
		return nil, cleanup, err
	}

	db, err := sql.Open(
		"alloydb",
		fmt.Sprintf(
			// sslmode is disabled, because the Dialer will handle the SSL
			// connection instead.
			"host=%s user=%s password=%s dbname=%s sslmode=disable",
			instURI, user, pass, dbname,
		),
	)
	return db, cleanup, err
}

使用公共 IP

如果您使用公共 IP 连接到 AlloyDB 集群,请将 RegisterDriver 函数替换为以下内容:

cleanup, err := pgxv5.RegisterDriver(
  "alloydb",
  alloydbconn.WithDefaultDialOptions(alloydbconn.WithPublicIP())
)

使用 Private Service Connect

如果您使用 Private Service Connect 连接到 AlloyDB 实例,请添加以下内容:

  cleanup, err := pgxv5.RegisterDriver(
    "alloydb",
    alloydbconn.WithDefaultDialOptions(alloydbconn.WithPSC())
)

自动 IAM 身份验证

默认情况下,AlloyDB 语言连接器使用内置身份验证。您可以将自动 IAM 身份验证与 AlloyDB Go 连接器搭配使用。如需启用,请将 RegisterDriver 函数替换为以下代码:

cleanup, err := pgxv4.RegisterDriver(
  "alloydb",
  alloydbconn.WithIAMAuthN()
)