使用 Cassandra 适配器连接到 Spanner

本页面介绍了 Cassandra 适配器,并说明了如何使用 Spanner,以及如何通过 Cassandra 适配器来连接 Spanner。

Cassandra 适配器设计为在应用所在的机器上运行。该适配器会在 localhost 上公开支持 Cassandra 查询语言 (CQL) 传输协议的端点。它会将 CQL 传输协议转换为 gRPC,即 Spanner 传输协议。在本地运行此代理后,Cassandra 客户端可以连接到 Spanner 数据库。

您可以通过以下方式启动 Cassandra 适配器:

  • 与 Go 应用一起启动
  • 与 Java 应用一起启动
  • 作为独立进程
  • 在Docker容器中

准备工作

在启动 Cassandra 适配器之前,请确保您已在 Cassandra 适配器将要运行的机器上通过用户账号或服务账号进行了身份验证。如果您使用的是服务账号,则必须知道 JSON 密钥文件(凭证文件)的位置。您可以设置 GOOGLE_APPLICATION_CREDENTIALS 环境变量来指定凭证路径。

有关详情,请参阅:

将 Cassandra 适配器连接到您的应用

Cassandra 适配器需要以下信息:

  • 项目名称
  • Spanner 实例名称
  • 要连接到的数据库

如果您使用的是 Docker,则需要 JSON 格式的凭证文件(密钥文件)的路径。

Java 进程中

  1. 如果您使用的是服务账号进行身份验证,请确保 GOOGLE_APPLICATION_CREDENTIALS 环境变量设置为凭证文件的路径。

  2. 对于 Java 应用,您可以将 google-cloud-spanner-cassandra 作为依赖项添加到项目中,从而将 Cassandra 适配器直接关联到应用。

对于 Maven,请在 <dependencies> 部分下添加以下新依赖项:

<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-spanner-cassandra</artifactId>
    <version>0.4.0</version>
</dependency>

对于 Gradle,请添加以下内容:

dependencies {
    implementation 'com.google.cloud:google-cloud-spanner-cassandra:0.4.0'
}

  1. 修改 CqlSession 创建代码。请使用 SpannerCqlSessionBuilder 并提供 Spanner 数据库 URI,而不是使用 CqlSessionBuilder
    
    import com.datastax.oss.driver.api.core.CqlSession;
    import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
    import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
    import com.datastax.oss.driver.api.core.cql.ResultSet;
    import com.datastax.oss.driver.api.core.cql.Row;
    import com.google.cloud.spanner.adapter.SpannerCqlSession;
    import java.net.InetSocketAddress;
    import java.time.Duration;
    import java.util.Random;
    
    // This sample assumes your spanner database <my_db> contains a table <users>
    // with the following schema:
    
    // CREATE TABLE users (
    //  id        INT64          OPTIONS (cassandra_type = 'int'),
    //  active    BOOL           OPTIONS (cassandra_type = 'boolean'),
    //  username  STRING(MAX)    OPTIONS (cassandra_type = 'text'),
    // ) PRIMARY KEY (id);
    
    class QuickStartSample {
    
      public static void main(String[] args) {
    
        // TODO(developer): Replace these variables before running the sample.
        final String projectId = "my-gcp-project";
        final String instanceId = "my-spanner-instance";
        final String databaseId = "my_db";
    
        final String databaseUri =
            String.format("projects/%s/instances/%s/databases/%s", projectId, instanceId, databaseId);
    
        try (CqlSession session =
            SpannerCqlSession.builder() // `SpannerCqlSession` instead of `CqlSession`
                .setDatabaseUri(databaseUri) // Set spanner database URI.
                .addContactPoint(new InetSocketAddress("localhost", 9042))
                .withLocalDatacenter("datacenter1")
                .withKeyspace(databaseId) // Keyspace name should be the same as spanner database name
                .withConfigLoader(
                    DriverConfigLoader.programmaticBuilder()
                        .withString(DefaultDriverOption.PROTOCOL_VERSION, "V4")
                        .withDuration(
                            DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, Duration.ofSeconds(5))
                        .build())
                .build()) {
    
          final int randomUserId = new Random().nextInt(Integer.MAX_VALUE);
    
          System.out.printf("Inserting user with ID: %d%n", randomUserId);
    
          // INSERT data
          session.execute(
              "INSERT INTO users (id, active, username) VALUES (?, ?, ?)",
              randomUserId,
              true,
              "John Doe");
    
          System.out.printf("Successfully inserted user: %d%n", randomUserId);
          System.out.printf("Querying user: %d%n", randomUserId);
    
          // SELECT data
          ResultSet rs =
              session.execute("SELECT id, active, username FROM users WHERE id = ?", randomUserId);
    
          // Get the first row from the result set
          Row row = rs.one();
    
          System.out.printf(
              "%d %b %s%n", row.getInt("id"), row.getBoolean("active"), row.getString("username"));
    
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }
    

Go 进程中

对于 Go 应用,您只需对集群初始化文件进行一行更改,即可集成 Spanner Cassandra Go 客户端。然后,您可以将 Cassandra 适配器直接关联到应用。

  1. 在 Go 应用中从 Spanner Cassandra Go 客户端导入适配器的 spanner 软件包。
import spanner "github.com/googleapis/go-spanner-cassandra/cassandra/gocql"
  1. 将集群创建代码修改为使用 spanner.NewCluster(而非 gocql.NewCluster),并提供 Spanner 数据库 URI:
    import (
    	"fmt"
    	"io"
    	"math"
    	"math/rand/v2"
    	"time"
    
    	spanner "github.com/googleapis/go-spanner-cassandra/cassandra/gocql"
    )
    
    // This sample assumes your spanner database <your_db> contains a table <users>
    // with the following schema:
    //
    // CREATE TABLE users (
    //	id   	 	INT64          OPTIONS (cassandra_type = 'int'),
    //	active    	BOOL           OPTIONS (cassandra_type = 'boolean'),
    //	username  	STRING(MAX)    OPTIONS (cassandra_type = 'text'),
    // ) PRIMARY KEY (id);
    
    func quickStart(databaseURI string, w io.Writer) error {
    	opts := &spanner.Options{
    		DatabaseUri: databaseURI,
    	}
    	cluster := spanner.NewCluster(opts)
    	if cluster == nil {
    		return fmt.Errorf("failed to create cluster")
    	}
    	defer spanner.CloseCluster(cluster)
    
    	// You can still configure your cluster as usual after connecting to your
    	// spanner database
    	cluster.Timeout = 5 * time.Second
    	cluster.Keyspace = "your_db_name"
    
    	session, err := cluster.CreateSession()
    
    	if err != nil {
    		return err
    	}
    
    	randomUserId := rand.IntN(math.MaxInt32)
    	if err = session.Query("INSERT INTO users (id, active, username) VALUES (?, ?, ?)",
    			       randomUserId, true, "John Doe").
    		Exec(); err != nil {
    		return err
    	}
    
    	var id int
    	var active bool
    	var username string
    	if err = session.Query("SELECT id, active, username FROM users WHERE id = ?",
    			       randomUserId).
    		Scan(&id, &active, &username); err != nil {
    		return err
    	}
    	fmt.Fprintf(w, "%d %v %s\n", id, active, username)
    	return nil
    }

连接到 Spanner 数据库后,您可以照常配置集群。

独立

  1. 克隆代码库:
git clone https://github.com/googleapis/go-spanner-cassandra.git
cd go-spanner-cassandra
  1. 使用必需的 -db 标志运行 cassandra_launcher.go
go run cassandra_launcher.go \
-db "projects/my_project/instances/my_instance/databases/my_database"
  1. -db 替换为您的 Spanner 数据库 URI。

Docker

使用以下命令启动 Cassandra 适配器。

export GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json
docker run -d -p 9042:9042 \
-e GOOGLE_APPLICATION_CREDENTIALS \
-v ${GOOGLE_APPLICATION_CREDENTIALS}:${GOOGLE_APPLICATION_CREDENTIALS}:ro \
gcr.io/cloud-spanner-adapter/cassandra-adapter \
-db DATABASE_URI

以下列表包含 Spanner Cassandra 适配器最常用的启动选项:

  • -db <DatabaseUri>

Spanner 数据库 URI(必需)。此选项指定客户端连接到的 Spanner 数据库。例如 projects/YOUR_PROJECT/instances/YOUR_INSTANCE/databases/YOUR_DATABASE

  • -tcp <TCPEndpoint>

客户端代理监听器地址。此选项定义客户端监听传入 Cassandra 客户端连接的 TCP 端点。默认:localhost:9042

  • -grpc-channels <NumGrpcChannels>

连接到 Spanner 时要使用的 gRPC 通道数。默认值:4

例如,以下命令使用应用凭证在端口 9042 上启动 Cassandra 适配器,并将适配器连接到 projects/my_project/instances/my_instance/databases/my_database 数据库:

export GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json
docker run -d -p 9042:9042 \
-e GOOGLE_APPLICATION_CREDENTIALS \
-v ${GOOGLE_APPLICATION_CREDENTIALS}:${GOOGLE_APPLICATION_CREDENTIALS}:ro \
gcr.io/cloud-spanner-adapter/cassandra-adapter \
-db projects/my_project/instances/my_instance/databases/my_database

建议

以下建议有助于您改善 Cassandra 适配器的使用体验。这些建议主要针对 Java,尤其是 Java 版本 4 Cassandra 客户端驱动程序

增加请求超时时间

与默认值 2 秒相比,请求超时时间为 5 秒或更长时间可提供更好的 Cassandra 适配器体验。

# Sample application.conf: increases request timeout to five seconds
datastax-java-driver {
  basic {
    request {
      timeout = 5 seconds
    }
  }
}

调整连接池

默认的连接数上限和每个连接或主机的并发请求数上限配置适用于开发环境、测试环境,以及低数据量生产环境或预演环境。不过,我们建议您增加这些值,因为 Cassandra 适配器会伪装成单个节点,而非 Cassandra 集群中的节点池。

增加这些值可在客户端和 Cassandra 接口之间实现更多并发连接。这样可以防止在高负载下耗尽连接池。

# Sample application.conf: increases maximum number of requests that can be
# executed concurrently on a connection
advanced.connection {
  max-requests-per-connection = 32000
  pool {
    local.size = 10
  }
}

调整 gRPC 通道

Spanner 客户端使用 gRPC 通道进行通信。一个 gRPC 通道大致相当于一条 TCP 连接。一个 gRPC 通道最多可处理 100 个并发请求。这意味着,应用需要的 gRPC 通道数至少应等于应用将执行的并发请求数除以 100 的值。

停用令牌感知路由

使用令牌感知型负载均衡的驱动程序在使用 Cassandra 适配器时可能会输出警告,也可能无法正常运行。由于 Cassandra 适配器伪装成单节点,因此 Cassandra 适配器并不总是能很好地与令牌感知型驱动程序搭配使用,因为令牌感知驱动程序要求集群中至少有复制因子数量的节点。有些驱动程序可能会输出警告(可忽略),并回退到类似轮循均衡政策的功能,而有些驱动程序可能会失败并显示错误。对于失败并显示错误的驱动程序,您必须停用令牌感知或配置轮循负载均衡政策。

# Sample application.conf: disables token-aware routing
metadata {
  token-map {
    enabled = false
  }
}

将协议版本固定为 V4

Cassandra 适配器与任何符合CQL Binary v4 传输协议的开源 Apache Cassandra 客户端驱动程序兼容。请务必将 PROTOCOL_VERSION 固定为 V4,否则您可能会看到连接错误。

# Sample application.conf: overrides protocol version to V4
datastax-java-driver {
  advanced.protocol.version = V4
}

后续步骤