本頁提供 Memorystore for Valkey 執行個體的程式碼範例,包括已啟用叢集模式和已停用叢集模式。
已啟用叢集模式的程式碼範例
Memorystore for Valkey 與所有 Memorystore for Redis Cluster 用戶端程式碼範例相容:
- Memorystore for Redis Cluster 用戶端程式庫程式碼範例
- Memorystore for Redis Cluster 用戶端程式庫連線程式碼範例
關於 Valkey GLIDE
Valkey General Language Independent Driver for the Enterprise (GLIDE) 是開放原始碼的用戶端程式庫,支援所有 Valkey 指令。
您可以使用 Valkey GLIDE,將應用程式連線至 Memorystore for Valkey 執行個體。Valkey GLIDE 的設計宗旨是提供高可用性,並確保可靠性及最佳化效能。
為確保應用程式開發和作業的一致性,Valkey GLIDE 是使用以 Rust 編寫的核心驅動程式架構,以及語言專屬的擴充功能實作而成。這項設計可確保各語言版本的功能一致,並降低複雜度。
Valkey GLIDE 支援 Valkey 7.2 和 8.0 版。這項功能支援下列語言:
其他 Valkey OSS 用戶端程式庫
除了 Valkey GLIDE,Memorystore for Valkey 也與下列 Valkey OSS 用戶端程式庫相容:
系統和服務
您可以使用 Spring Boot、PostgreSQL 和 Memorystore for Valkey,建立下列系統和服務:
- 工作階段管理系統:工作階段管理是現代網路應用程式的重要環節,可確保使用者互動在多項要求中保持一致且安全。應用程式可透過快取層有效管理使用者工作階段,同時降低資料庫負載並確保可擴充性。
- 可擴充的排行榜系統:排行榜是應用程式中顯示排名資料的實用方式。使用快取層可減少資料庫負載,同時提供即時排行榜排名。
- 高效能快取服務:現代應用程式必須大規模提供快速且操作順暢的使用者體驗。建構這項快取服務後,您就能減少延遲和資料庫負載。
已停用叢集模式的程式碼範例
Memorystore for Valkey Cluster Mode Disabled 與啟用叢集模式的程式碼範例中列出的所有 Redis 和 Valkey OSS 用戶端程式庫相容。
在 Memorystore for Valkey 中使用「已停用叢集模式」執行個體時,請完成下列動作:
- 請使用 Redis 或 Valkey 用戶端物件,而非程式庫提供的
RedisCluster
或ValkeyCluster
用戶端物件。 - 使用主要端點 IP 位址建立寫入器用戶端。
使用讀取器端點 IP 位址建立讀取器用戶端。
redis-py
建議使用 redis-py 5.1 以上版本。
import redis primaryEndpoint = PRIMARY_ENDPOINT_IP readerEndpoint = READER_ENDPOINT_IP primary_client = redis.Redis(host=primaryEndpoint, port=6379, db=0, decode_responses=True) reader_client = redis.Redis(host=readerEndpoint, port=6379, db=0, decode_responses=True) primary_client.set("key","value") print(reader_client.get("key"))
go-redis
建議使用 go-redis 9.11.0 以上版本。
package main import ( "context" "fmt" "github.com/redis/go-redis/v9" ) func main() { primary_endpoint := PRIMARY_ENDPOINT_IP reader_endpoint := READER_ENDPOINT_IP primary_client := redis.NewClient(&redis.Options{ Addr: primary_endpoint, Password: "", // no password set DB: 0, // use default DB }) reader_client := redis.NewClient(&redis.Options{ Addr: reader_endpoint, Password: "", // no password set DB: 0, // use default DB }) ctx := context.Background() err := primary_client.Set(ctx, "foo", "bar", 0).Err() if err != nil { panic(err) } val, err := reader_client.Get(ctx, "foo").Result() if err != nil { panic(err) } fmt.Println("foo", val) }
絕地武士
建議使用 Jedis 4.4.0 以上版本。
package org.example; import java.io.*; import java.time.LocalDateTime; import java.lang.Thread; import java.util.HashMap; import java.util.Map; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.JedisPoolConfig; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; public class Main { public static void main( String[] args ) { primaryEndpoint = PRIMARY_ENDPOINT_IP JedisPool pool = new JedisPool(primaryEndpoint, 6379); try (Jedis jedis = pool.getResource()) { jedis.set("foo", "bar"); System.out.println(jedis.get("foo")); // prints bar Maphash = new HashMap<>();; hash.put("name", "John"); hash.put("surname", "Smith"); hash.put("company", "Redis"); hash.put("age", "29"); jedis.hset("user-session:123", hash); System.out.println(jedis.hgetAll("user-session:123")); // Prints: {name=John, surname=Smith, company=Redis, age=29} } catch (Exception e) { System.out.println("Error setting or getting key: " + e.getMessage()); } } }
Node.js
建議使用 Node.js 24.4.1 以上版本。
import { createClient } from 'redis'; import * as fs from 'fs'; const primaryEndpoint = PRIMARY_ENDPOINT_IP const primary_endpoint_url ='redis://primaryEndpoint:6379' const client = createClient({ url: primary_endpoint_url }); await client.connect(); await client.set(key, value); const retval = await client.get(key); console.log(retval)
同時使用 IAM 驗證和傳輸中加密的程式碼範例
本節提供範例,說明如何使用各種用戶端程式庫,透過 IAM 驗證和傳輸中加密,驗證及連線至 Memorystore for Valkey 執行個體。
redis-py
建議使用 redis-py 5.1 以上版本。
from google.cloud import iam_credentials_v1 from redis.backoff import ConstantBackoff from redis.retry import Retry from redis.exceptions import ( ConnectionError, AuthenticationWrongNumberOfArgsError, AuthenticationError ) from redis.utils import (str_if_bytes) import redis service_account="projects/-/serviceAccounts/<TO-DO-1: your service account that used to authenticate to Valkey>"" host=<TO-DO-2: your Redis Cluster discovery endpoint ip> ssl_ca_certs=<TO-DO-3, your trusted server ca file name> def generate_access_token(): # Create a client client = iam_credentials_v1.IAMCredentialsClient() # Initialize request argument(s) request = iam_credentials_v1.GenerateAccessTokenRequest( name=service_account, scope=['https://www.googleapis.com/auth/cloud-platform'], ) # Make the request response = client.generate_access_token(request=request) print(str(response.access_token)) # Handle the response return str(response.access_token) class ValkeyTokenProvider(redis.CredentialProvider): # Generated IAM tokens are valid for 15 minutes def get_credentials(self): token= generate_access_token() return "default",token creds_provider = ValkeyTokenProvider() client = redis.Redis(host=host, port=6379, credential_provider=creds_provider, ssl=True, ssl_ca_certs=caFilePath) client.set('foo',"bar") print(client.get('foo'))
Go
建議使用 Go 1.24.5 以上版本。
package main import ( "context" "crypto/tls" "crypto/x509" "flag" "fmt" "io/ioutil" "log" "sync" "time" credentials "google.golang.org/genproto/googleapis/iam/credentials/v1" "github.com/golang/protobuf/ptypes" "github.com/redis/go-redis/v9" "google.golang.org/api/option" gtransport "google.golang.org/api/transport/grpc" ) var ( svcAccount = flag.String("a", "projects/-/serviceAccounts/example-service-account@example-project.iam.gserviceaccount.com", "service account email") lifetime = flag.Duration("d", time.Hour, "lifetime of token") refreshDuration = flag.Duration("r", 5*time.Minute, "token refresh duration") checkTokenExpiryInterval = flag.Duration("e", 10*time.Second, "check token expiry interval") lastRefreshInstant = time.Time{} errLastSeen = error(nil) token = "" mu = sync.RWMutex{} err = error(nil) ) func retrieveToken() (string, error) { ctx := context.Background() conn, err := gtransport.Dial(ctx, option.WithEndpoint("iamcredentials.googleapis.com:443"), option.WithScopes("https://www.googleapis.com/auth/cloud-platform")) if err != nil { log.Printf("Failed to dial API, error: %v", err) return token, err } client := credentials.NewIAMCredentialsClient(conn) req := credentials.GenerateAccessTokenRequest{ Name: *svcAccount, Scope: []string{"https://www.googleapis.com/auth/cloud-platform"}, Lifetime: ptypes.DurationProto(*lifetime), } rsp, err := client.GenerateAccessToken(ctx, &req) if err != nil { log.Printf("Failed to call GenerateAccessToken with request: %v, error: %v", req, err) return token, err } return rsp.AccessToken, nil } func refreshTokenLoop() { if *refreshDuration > *lifetime { log.Fatal("Refresh should not happen after token is already expired.") } for { mu.RLock() lastRefreshTime := lastRefreshInstant mu.RUnlock() if time.Now().After(lastRefreshTime.Add(*refreshDuration)) { var err error retrievedToken, err := retrieveToken() mu.Lock() token = retrievedToken if err != nil { errLastSeen = err } else { lastRefreshInstant = time.Now() } mu.Unlock() } time.Sleep(*checkTokenExpiryInterval) } } func retrieveTokenFunc() (string, string) { mu.RLock() defer mu.RUnlock() if time.Now().After(lastRefreshInstant.Add(*refreshDuration)) { log.Printf("Token is expired. last refresh instant: %v, refresh duration: %v, error that was last seen: %v", lastRefreshInstant, *refreshDuration, errLastSeen) return "", "" } username := "default" password := token return username, password } func main() { caFilePath := CA_FILE_PATH clusterDicEpAddr := PRIMARY_ENDPOINT_IP_ADDRESS_AND_PORT caCert, err := ioutil.ReadFile(caFilePath) if err != nil { log.Fatal(err) } caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) token, err = retrieveToken() if err != nil { log.Fatal("Cannot retrieve IAM token to authenticate to the cluster, error: %v", err) } token, err = retrieveToken() fmt.Printf("token : %v", token) if err != nil { log.Fatal("Cannot retrieve IAM token to authenticate to the cluster, error: %v", err) } lastRefreshInstant = time.Now() go refreshTokenLoop() client := redis.NewClient(&redis.Options{ Addr: clusterDicEpAddr, CredentialsProvider: retrieveTokenFunc, TLSConfig: &tls.Config{ RootCAs: caCertPool, }, }) ctx := context.Background() err = client.Set(ctx, "foo", "bar", 0).Err() if err != nil { log.Fatal(err) } val, err := client.Get(ctx, "foo").Result() if err != nil { log.Fatal(err) } fmt.Printf("\nGot the value for key: key, which is %s \n", val) }