Cassandra アダプタを使用して Spanner に接続する

このページでは、Cassandra Adapter について説明し、Cassandra Adapter から Spanner を使用および接続する方法を解説します。

Cassandra Adapter は、アプリケーションと同じマシンで実行するように設計されています。このアダプタにより、Cassandra クエリ言語(CQL)ワイヤ プロトコルをサポートするローカルホストのエンドポイントが公開されます。また、CQL ワイヤ プロトコルが gRPC(Spanner ワイヤ プロトコル)に変換されます。このプロキシをローカルで実行すると、Cassandra クライアントを Spanner データベースに接続できます。

Cassandra Adapter は次の場面で開始できます。

  • Go アプリケーションでの処理中
  • Java アプリケーションでの処理中
  • スタンドアロン プロセスとして
  • Docker コンテナで

始める前に

Cassandra Adapter を起動する前に、Cassandra Adapter を実行するマシンのユーザー アカウントまたはサービス アカウントで認証されていることを確認します。サービス アカウントを使用している場合は、JSON 鍵ファイル(認証情報ファイル)の場所を把握している必要があります。認証情報のパスを指定するように GOOGLE_APPLICATION_CREDENTIALS 環境変数を設定します。

詳しくは以下をご覧ください。

Cassandra Adapter をアプリケーションに接続する

Cassandra Adapter には、次の情報が必要です。

  • プロジェクト名
  • Spanner のインスタンス名
  • 接続先のデータベース

Docker を使用する場合は、JSON 形式の認証情報ファイル(鍵ファイル)のパスが必要です。

Java プロセス内

  1. 認証にサービス アカウントを使用している場合は、GOOGLE_APPLICATION_CREDENTIALS 環境変数が認証情報ファイルのパスに設定されていることを確認します。

  2. Java アプリケーションの場合は、google-cloud-spanner-cassandra をプロジェクトに依存関係として追加することで、Cassandra Adapter をアプリケーションに直接リンクできます。

Maven の場合は、<dependencies> セクションに次の新しい依存関係を追加します。

<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-spanner-cassandra</artifactId>
    <version>0.4.0</version>
</dependency>

Gradle の場合は、以下を追加します。

dependencies {
    implementation 'com.google.cloud:google-cloud-spanner-cassandra:0.4.0'
}

  1. CqlSession 作成コードを変更します。CqlSessionBuilder ではなく SpannerCqlSessionBuilder を使用して、Spanner データベース URI を指定します。
    
    import com.datastax.oss.driver.api.core.CqlSession;
    import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
    import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
    import com.datastax.oss.driver.api.core.cql.ResultSet;
    import com.datastax.oss.driver.api.core.cql.Row;
    import com.google.cloud.spanner.adapter.SpannerCqlSession;
    import java.net.InetSocketAddress;
    import java.time.Duration;
    import java.util.Random;
    
    // This sample assumes your spanner database <my_db> contains a table <users>
    // with the following schema:
    
    // CREATE TABLE users (
    //  id        INT64          OPTIONS (cassandra_type = 'int'),
    //  active    BOOL           OPTIONS (cassandra_type = 'boolean'),
    //  username  STRING(MAX)    OPTIONS (cassandra_type = 'text'),
    // ) PRIMARY KEY (id);
    
    class QuickStartSample {
    
      public static void main(String[] args) {
    
        // TODO(developer): Replace these variables before running the sample.
        final String projectId = "my-gcp-project";
        final String instanceId = "my-spanner-instance";
        final String databaseId = "my_db";
    
        final String databaseUri =
            String.format("projects/%s/instances/%s/databases/%s", projectId, instanceId, databaseId);
    
        try (CqlSession session =
            SpannerCqlSession.builder() // `SpannerCqlSession` instead of `CqlSession`
                .setDatabaseUri(databaseUri) // Set spanner database URI.
                .addContactPoint(new InetSocketAddress("localhost", 9042))
                .withLocalDatacenter("datacenter1")
                .withKeyspace(databaseId) // Keyspace name should be the same as spanner database name
                .withConfigLoader(
                    DriverConfigLoader.programmaticBuilder()
                        .withString(DefaultDriverOption.PROTOCOL_VERSION, "V4")
                        .withDuration(
                            DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, Duration.ofSeconds(5))
                        .build())
                .build()) {
    
          final int randomUserId = new Random().nextInt(Integer.MAX_VALUE);
    
          System.out.printf("Inserting user with ID: %d%n", randomUserId);
    
          // INSERT data
          session.execute(
              "INSERT INTO users (id, active, username) VALUES (?, ?, ?)",
              randomUserId,
              true,
              "John Doe");
    
          System.out.printf("Successfully inserted user: %d%n", randomUserId);
          System.out.printf("Querying user: %d%n", randomUserId);
    
          // SELECT data
          ResultSet rs =
              session.execute("SELECT id, active, username FROM users WHERE id = ?", randomUserId);
    
          // Get the first row from the result set
          Row row = rs.one();
    
          System.out.printf(
              "%d %b %s%n", row.getInt("id"), row.getBoolean("active"), row.getString("username"));
    
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }
    

Go プロセス内

Go アプリケーションの場合、クラスタ初期化ファイルの 1 行を変更して、Spanner Cassandra Go クライアントを統合する必要があります。そうすると、Cassandra Adapter をアプリケーションに直接リンクできます。

  1. Go アプリケーションの Spanner Cassandra Go クライアントから、アダプタの spanner パッケージをインポートします。
import spanner "github.com/googleapis/go-spanner-cassandra/cassandra/gocql"
  1. gocql.NewCluster ではなく spanner.NewCluster を使用するようクラスタ作成コードを変更し、Spanner データベース URI を指定します。
    import (
    	"fmt"
    	"io"
    	"math"
    	"math/rand/v2"
    	"time"
    
    	spanner "github.com/googleapis/go-spanner-cassandra/cassandra/gocql"
    )
    
    // This sample assumes your spanner database <your_db> contains a table <users>
    // with the following schema:
    //
    // CREATE TABLE users (
    //	id   	 	INT64          OPTIONS (cassandra_type = 'int'),
    //	active    	BOOL           OPTIONS (cassandra_type = 'boolean'),
    //	username  	STRING(MAX)    OPTIONS (cassandra_type = 'text'),
    // ) PRIMARY KEY (id);
    
    func quickStart(databaseURI string, w io.Writer) error {
    	opts := &spanner.Options{
    		DatabaseUri: databaseURI,
    	}
    	cluster := spanner.NewCluster(opts)
    	if cluster == nil {
    		return fmt.Errorf("failed to create cluster")
    	}
    	defer spanner.CloseCluster(cluster)
    
    	// You can still configure your cluster as usual after connecting to your
    	// spanner database
    	cluster.Timeout = 5 * time.Second
    	cluster.Keyspace = "your_db_name"
    
    	session, err := cluster.CreateSession()
    
    	if err != nil {
    		return err
    	}
    
    	randomUserId := rand.IntN(math.MaxInt32)
    	if err = session.Query("INSERT INTO users (id, active, username) VALUES (?, ?, ?)",
    			       randomUserId, true, "John Doe").
    		Exec(); err != nil {
    		return err
    	}
    
    	var id int
    	var active bool
    	var username string
    	if err = session.Query("SELECT id, active, username FROM users WHERE id = ?",
    			       randomUserId).
    		Scan(&id, &active, &username); err != nil {
    		return err
    	}
    	fmt.Fprintf(w, "%d %v %s\n", id, active, username)
    	return nil
    }

Spanner データベースに接続したら、通常どおりクラスタを構成できます。

スタンドアロン

  1. リポジトリのクローンを作成します。
git clone https://github.com/googleapis/go-spanner-cassandra.git
cd go-spanner-cassandra
  1. 必要な -db フラグを指定して cassandra_launcher.go を実行します。
go run cassandra_launcher.go \
-db "projects/my_project/instances/my_instance/databases/my_database"
  1. -db は、Spanner データベースの URI に置き換えます。

Docker

次のコマンドを使用して、Cassandra Adapter を起動します。

export GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json
docker run -d -p 9042:9042 \
-e GOOGLE_APPLICATION_CREDENTIALS \
-v ${GOOGLE_APPLICATION_CREDENTIALS}:${GOOGLE_APPLICATION_CREDENTIALS}:ro \
gcr.io/cloud-spanner-adapter/cassandra-adapter \
-db DATABASE_URI

Spanner Cassandra Adapter で最もよく使用される起動オプションを以下に列挙します。

  • -db <DatabaseUri>

Spanner データベースの URI(必須)。クライアントが接続する Spanner データベースを指定します。例: projects/YOUR_PROJECT/instances/YOUR_INSTANCE/databases/YOUR_DATABASE

  • -tcp <TCPEndpoint>

クライアント プロキシのリスナー アドレス。これにより、クライアントが受信側の Cassandra クライアント接続をリッスンする TCP エンドポイントが定義されます。デフォルトは localhost:9042 です。

  • -grpc-channels <NumGrpcChannels>

Spanner への接続時に使用する gRPC チャネルの数を指定します。デフォルト: 4

たとえば、次のコマンドは、アプリケーション認証情報を使用してポート 9042 で Cassandra Adapter を起動し、アダプタを projects/my_project/instances/my_instance/databases/my_database データベースに接続します。

export GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json
docker run -d -p 9042:9042 \
-e GOOGLE_APPLICATION_CREDENTIALS \
-v ${GOOGLE_APPLICATION_CREDENTIALS}:${GOOGLE_APPLICATION_CREDENTIALS}:ro \
gcr.io/cloud-spanner-adapter/cassandra-adapter \
-db projects/my_project/instances/my_instance/databases/my_database

推奨事項

次の推奨事項は、Cassandra Adapter の動作を向上させるのに役立ちます。これらの推奨事項は、Java、特に Java バージョン 4 向けの Cassandra クライアント ドライバを対象としています。

リクエスト タイムアウトを長くする

リクエストのタイムアウトを 5 秒以上にすると、デフォルト値の 2 秒よりも Cassandra Adapter のパフォーマンスが向上します。

# Sample application.conf: increases request timeout to five seconds
datastax-java-driver {
  basic {
    request {
      timeout = 5 seconds
    }
  }
}

接続プールを調整する

最大接続数と接続またはホストあたりの最大同時リクエストのデフォルト構成は、開発、テスト、小規模な本番環境またはステージング環境に適しています。ただし、Cassandra クラスタ内のノードプールとは異なり、Cassandra Adapter は単一ノードとしてマスカレードするため、これらの値を増やすことをおすすめします。

これらの値を増やすと、クライアントと Cassandra インターフェース間の同時接続数を増やすことが可能です。これにより、負荷が高いときに接続プールが枯渇することを防止できます。

# Sample application.conf: increases maximum number of requests that can be
# executed concurrently on a connection
advanced.connection {
  max-requests-per-connection = 32000
  pool {
    local.size = 10
  }
}

gRPC チャネルを調整する

gRPC チャネルは、Spanner クライアントが通信に使用します。1 つの gRPC チャネルは、TCP 接続とほぼ同等です。1 つの gRPC チャネルで同時に処理できるリクエストは最大 100 件です。つまり、アプリケーションが実行する同時リクエスト数を 100 で割った数以上の gRPC チャネルが必要になります。

トークン認識ルーティングを無効にする

トークン対応ロード バランシングを使用するドライバでは、Cassandra Adapter を使用するときに警告が表示されないことや、機能しないことがあります。Cassandra Adapter は単一ノードとしてマスカレードするため、クラスタ内にレプリケーション係数以上のノードがあることを想定するトークン対応ドライバでは、Cassandra Adapter が適切に機能しない場合があります。ドライバによっては、警告(無視できます)を表示してラウンド ロビン バランシング ポリシーにフォールバックすることも、エラーで失敗することもあります。エラーで失敗するドライバの場合は、トークン対応を無効にするか、ラウンドロビン ロード バランシング ポリシーを構成する必要があります。

# Sample application.conf: disables token-aware routing
metadata {
  token-map {
    enabled = false
  }
}

プロトコル バージョンを V4 に固定する

Cassandra Adapter は、CQL Binary v4 ワイヤ プロトコルに準拠したオープンソースの Apache Cassandra クライアント ドライバと互換性があります。PROTOCOL_VERSIONV4 に固定してください。固定しないと、接続エラーが表示されることがあります。

# Sample application.conf: overrides protocol version to V4
datastax-java-driver {
  advanced.protocol.version = V4
}

次のステップ