用戶端程式庫程式碼範例

本頁提供 Memorystore for Valkey 執行個體的程式碼範例,包括已啟用叢集模式和已停用叢集模式

已啟用叢集模式的程式碼範例

Memorystore for Valkey 與所有 Memorystore for Redis Cluster 用戶端程式碼範例相容:

關於 Valkey GLIDE

Valkey General Language Independent Driver for the Enterprise (GLIDE) 是開放原始碼的用戶端程式庫,支援所有 Valkey 指令。

您可以使用 Valkey GLIDE,將應用程式連線至 Memorystore for Valkey 執行個體。Valkey GLIDE 的設計宗旨是提供高可用性,並確保可靠性及最佳化效能。

為確保應用程式開發和作業的一致性,Valkey GLIDE 是使用以 Rust 編寫的核心驅動程式架構,以及語言專屬的擴充功能實作而成。這項設計可確保各語言版本的功能一致,並降低複雜度。

Valkey GLIDE 支援 Valkey 7.2 和 8.0 版。這項功能支援下列語言:

其他 Valkey OSS 用戶端程式庫

除了 Valkey GLIDE,Memorystore for Valkey 也與下列 Valkey OSS 用戶端程式庫相容:

系統和服務

您可以使用 Spring BootPostgreSQL 和 Memorystore for Valkey,建立下列系統和服務:

  • 工作階段管理系統:工作階段管理是現代網路應用程式的重要環節,可確保使用者互動在多項要求中保持一致且安全。應用程式可透過快取層有效管理使用者工作階段,同時降低資料庫負載並確保可擴充性。
  • 可擴充的排行榜系統:排行榜是應用程式中顯示排名資料的實用方式。使用快取層可減少資料庫負載,同時提供即時排行榜排名。
  • 高效能快取服務:現代應用程式必須大規模提供快速且操作順暢的使用者體驗。建構這項快取服務後,您就能減少延遲和資料庫負載。

已停用叢集模式的程式碼範例

Memorystore for Valkey Cluster Mode Disabled 與啟用叢集模式的程式碼範例中列出的所有 Redis 和 Valkey OSS 用戶端程式庫相容。

在 Memorystore for Valkey 中使用「已停用叢集模式」執行個體時,請完成下列動作:

  • 請使用 Redis 或 Valkey 用戶端物件,而非程式庫提供的 RedisClusterValkeyCluster 用戶端物件。
  • 使用主要端點 IP 位址建立寫入器用戶端。
  • 使用讀取器端點 IP 位址建立讀取器用戶端。

redis-py

建議使用 redis-py 5.1 以上版本。

import redis
primaryEndpoint = PRIMARY_ENDPOINT_IP
readerEndpoint = READER_ENDPOINT_IP

primary_client = redis.Redis(host=primaryEndpoint, port=6379, db=0, decode_responses=True)
reader_client = redis.Redis(host=readerEndpoint, port=6379, db=0, decode_responses=True)

primary_client.set("key","value")
print(reader_client.get("key"))

go-redis

建議使用 go-redis 9.11.0 以上版本。

package main

import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
)

func main() {
primary_endpoint := PRIMARY_ENDPOINT_IP
reader_endpoint := READER_ENDPOINT_IP

primary_client := redis.NewClient(&redis.Options{
  Addr:     primary_endpoint,
  Password: "", // no password set
  DB:       0,  // use default DB
})

reader_client := redis.NewClient(&redis.Options{
  Addr:     reader_endpoint,
  Password: "", // no password set
  DB:       0,  // use default DB
})

ctx := context.Background()

err := primary_client.Set(ctx, "foo", "bar", 0).Err()
if err != nil {
  panic(err)
}

val, err := reader_client.Get(ctx, "foo").Result()
if err != nil {
  panic(err)
}
fmt.Println("foo", val)
}

絕地武士

建議使用 Jedis 4.4.0 以上版本。


package org.example;

import java.io.*;
import java.time.LocalDateTime;
import java.lang.Thread;
import java.util.HashMap;
import java.util.Map;

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class Main
{
public static void main( String[] args )
{
  primaryEndpoint = PRIMARY_ENDPOINT_IP

  JedisPool pool = new JedisPool(primaryEndpoint, 6379);

  try (Jedis jedis = pool.getResource()) {
    jedis.set("foo", "bar");
    System.out.println(jedis.get("foo")); // prints bar

    Map hash = new HashMap<>();;
    hash.put("name", "John");
    hash.put("surname", "Smith");
    hash.put("company", "Redis");
    hash.put("age", "29");
    jedis.hset("user-session:123", hash);
    System.out.println(jedis.hgetAll("user-session:123"));
    // Prints: {name=John, surname=Smith, company=Redis, age=29}
  } catch (Exception e) {
    System.out.println("Error setting or getting  key: " + e.getMessage());
  }
}
}

Node.js

建議使用 Node.js 24.4.1 以上版本。

import { createClient } from 'redis';
import * as fs from 'fs';

const primaryEndpoint = PRIMARY_ENDPOINT_IP

const primary_endpoint_url ='redis://primaryEndpoint:6379'

const client = createClient({
url: primary_endpoint_url
});

await client.connect();
await client.set(key, value);
const retval = await client.get(key);
console.log(retval)

同時使用 IAM 驗證和傳輸中加密的程式碼範例

本節提供範例,說明如何使用各種用戶端程式庫,透過 IAM 驗證傳輸中加密,驗證及連線至 Memorystore for Valkey 執行個體。

redis-py

建議使用 redis-py 5.1 以上版本。

from google.cloud import iam_credentials_v1
from redis.backoff import ConstantBackoff
from redis.retry import Retry
from redis.exceptions import (
ConnectionError,
AuthenticationWrongNumberOfArgsError,
AuthenticationError
)
from redis.utils import (str_if_bytes)

import redis

service_account="projects/-/serviceAccounts/<TO-DO-1: your service account that used to authenticate to Valkey>""

host=<TO-DO-2: your Redis Cluster discovery endpoint ip>
ssl_ca_certs=<TO-DO-3, your trusted server ca file name>

def generate_access_token():
  # Create a client
  client = iam_credentials_v1.IAMCredentialsClient()

  # Initialize request argument(s)
  request = iam_credentials_v1.GenerateAccessTokenRequest(
      name=service_account,
      scope=['https://www.googleapis.com/auth/cloud-platform'],
  )

  # Make the request
  response = client.generate_access_token(request=request)
  print(str(response.access_token))

  # Handle the response
  return str(response.access_token)

class ValkeyTokenProvider(redis.CredentialProvider):

    # Generated IAM tokens are valid for 15 minutes
    def get_credentials(self):
        token= generate_access_token()
        return "default",token

creds_provider = ValkeyTokenProvider()
client = redis.Redis(host=host, port=6379, credential_provider=creds_provider, ssl=True, ssl_ca_certs=caFilePath)
client.set('foo',"bar")
print(client.get('foo'))

Go

建議使用 Go 1.24.5 以上版本。

package main

import (
  "context"
  "crypto/tls"
  "crypto/x509"
  "flag"
  "fmt"
  "io/ioutil"
  "log"
  "sync"
  "time"

  credentials "google.golang.org/genproto/googleapis/iam/credentials/v1"

  "github.com/golang/protobuf/ptypes"
  "github.com/redis/go-redis/v9"
  "google.golang.org/api/option"
  gtransport "google.golang.org/api/transport/grpc"
)

var (
  svcAccount               = flag.String("a", "projects/-/serviceAccounts/example-service-account@example-project.iam.gserviceaccount.com", "service account email")
  lifetime                 = flag.Duration("d", time.Hour, "lifetime of token")
  refreshDuration          = flag.Duration("r", 5*time.Minute, "token refresh duration")
  checkTokenExpiryInterval = flag.Duration("e", 10*time.Second, "check token expiry interval")
  lastRefreshInstant       = time.Time{}
  errLastSeen              = error(nil)
  token                    = ""
  mu                       = sync.RWMutex{}
  err                      = error(nil)
)

func retrieveToken() (string, error) {
  ctx := context.Background()
  conn, err := gtransport.Dial(ctx,
    option.WithEndpoint("iamcredentials.googleapis.com:443"),
    option.WithScopes("https://www.googleapis.com/auth/cloud-platform"))

  if err != nil {
    log.Printf("Failed to dial API, error: %v", err)
    return token, err
  }
  client := credentials.NewIAMCredentialsClient(conn)
  req := credentials.GenerateAccessTokenRequest{
    Name:     *svcAccount,
    Scope:    []string{"https://www.googleapis.com/auth/cloud-platform"},
    Lifetime: ptypes.DurationProto(*lifetime),
  }
  rsp, err := client.GenerateAccessToken(ctx, &req)
  if err != nil {
    log.Printf("Failed to call GenerateAccessToken with request: %v, error: %v", req, err)
    return token, err
  }
  return rsp.AccessToken, nil
}

func refreshTokenLoop() {
  if *refreshDuration > *lifetime {
    log.Fatal("Refresh should not happen after token is already expired.")
  }
  for {
    mu.RLock()
    lastRefreshTime := lastRefreshInstant
    mu.RUnlock()
    if time.Now().After(lastRefreshTime.Add(*refreshDuration)) {
      var err error
      retrievedToken, err := retrieveToken()
      mu.Lock()
      token = retrievedToken
      if err != nil {
        errLastSeen = err
      } else {
        lastRefreshInstant = time.Now()
      }
      mu.Unlock()
    }
    time.Sleep(*checkTokenExpiryInterval)
  }
}

func retrieveTokenFunc() (string, string) {
  mu.RLock()
  defer mu.RUnlock()
  if time.Now().After(lastRefreshInstant.Add(*refreshDuration)) {
    log.Printf("Token is expired. last refresh instant: %v, refresh duration: %v, error that was last seen: %v", lastRefreshInstant, *refreshDuration, errLastSeen)
    return "", ""
  }
  username := "default"
  password := token
  return username, password
}

func main() {
  caFilePath := CA_FILE_PATH
  clusterDicEpAddr := PRIMARY_ENDPOINT_IP_ADDRESS_AND_PORT
  caCert, err := ioutil.ReadFile(caFilePath)
  if err != nil {
    log.Fatal(err)
  }
  caCertPool := x509.NewCertPool()
  caCertPool.AppendCertsFromPEM(caCert)
  token, err = retrieveToken()
  if err != nil {
    log.Fatal("Cannot retrieve IAM token to authenticate to the cluster, error: %v", err)
  }
  token, err = retrieveToken()
  fmt.Printf("token : %v", token)
  if err != nil {
    log.Fatal("Cannot retrieve IAM token to authenticate to the cluster, error: %v", err)
  }
  lastRefreshInstant = time.Now()
  go refreshTokenLoop()

  client := redis.NewClient(&redis.Options{
    Addr:                clusterDicEpAddr,
    CredentialsProvider: retrieveTokenFunc,
    TLSConfig: &tls.Config{
      RootCAs: caCertPool,
    },
  })

  ctx := context.Background()
  err = client.Set(ctx, "foo", "bar", 0).Err()
  if err != nil {
    log.Fatal(err)
  }
  val, err := client.Get(ctx, "foo").Result()
  if err != nil {
    log.Fatal(err)
  }
  fmt.Printf("\nGot the value for key: key, which is %s \n", val)
}