diff --git a/core/stores/redis/redis.go b/core/stores/redis/redis.go index 76691c0e5..d15c8ba9b 100644 --- a/core/stores/redis/redis.go +++ b/core/stores/redis/redis.go @@ -259,12 +259,34 @@ func (s *Redis) BitPosCtx(ctx context.Context, key string, bit, start, end int64 } // Blpop uses passed in redis connection to execute blocking queries. +// +// For blocking operations, you must create a dedicated RedisNode using CreateBlockingNode to avoid +// exhausting the connection pool. Blocking commands hold connections for extended periods and should +// not share the regular connection pool. +// +// Example usage: +// +// node, err := redis.CreateBlockingNode(rds) +// if err != nil { +// // handle error +// } +// defer node.Close() +// +// value, err := rds.Blpop(node, "mylist") +// if err != nil { +// // handle error +// } +// // Doesn't benefit from pooling redis connections of blocking queries func (s *Redis) Blpop(node RedisNode, key string) (string, error) { return s.BlpopCtx(context.Background(), node, key) } // BlpopCtx uses passed in redis connection to execute blocking queries. +// +// For blocking operations, you must create a dedicated RedisNode using CreateBlockingNode. +// See Blpop for usage examples. +// // Doesn't benefit from pooling redis connections of blocking queries func (s *Redis) BlpopCtx(ctx context.Context, node RedisNode, key string) (string, error) { return s.BlpopWithTimeoutCtx(ctx, node, blockingQueryTimeout, key) @@ -272,12 +294,18 @@ func (s *Redis) BlpopCtx(ctx context.Context, node RedisNode, key string) (strin // BlpopEx uses passed in redis connection to execute blpop command. // The difference against Blpop is that this method returns a bool to indicate success. +// +// For blocking operations, you must create a dedicated RedisNode using CreateBlockingNode. +// See Blpop for usage examples. func (s *Redis) BlpopEx(node RedisNode, key string) (string, bool, error) { return s.BlpopExCtx(context.Background(), node, key) } // BlpopExCtx uses passed in redis connection to execute blpop command. // The difference against Blpop is that this method returns a bool to indicate success. +// +// For blocking operations, you must create a dedicated RedisNode using CreateBlockingNode. +// See Blpop for usage examples. func (s *Redis) BlpopExCtx(ctx context.Context, node RedisNode, key string) (string, bool, error) { if node == nil { return "", false, ErrNilNode @@ -297,12 +325,18 @@ func (s *Redis) BlpopExCtx(ctx context.Context, node RedisNode, key string) (str // BlpopWithTimeout uses passed in redis connection to execute blpop command. // Control blocking query timeout +// +// For blocking operations, you must create a dedicated RedisNode using CreateBlockingNode. +// See Blpop for usage examples. func (s *Redis) BlpopWithTimeout(node RedisNode, timeout time.Duration, key string) (string, error) { return s.BlpopWithTimeoutCtx(context.Background(), node, timeout, key) } // BlpopWithTimeoutCtx uses passed in redis connection to execute blpop command. // Control blocking query timeout +// +// For blocking operations, you must create a dedicated RedisNode using CreateBlockingNode. +// See Blpop for usage examples. func (s *Redis) BlpopWithTimeoutCtx(ctx context.Context, node RedisNode, timeout time.Duration, key string) (string, error) { if node == nil { @@ -1840,6 +1874,29 @@ func (s *Redis) XInfoStreamCtx(ctx context.Context, stream string) (*red.XInfoSt // XReadGroup reads messages from Redis streams as part of a consumer group. // It allows for distributed processing of stream messages with automatic message delivery semantics. +// +// For blocking operations, you must create a dedicated RedisNode using CreateBlockingNode to avoid +// exhausting the connection pool. Blocking commands hold connections for extended periods and should +// not share the regular connection pool. +// +// Example usage: +// +// node, err := redis.CreateBlockingNode(rds) +// if err != nil { +// // handle error +// } +// defer node.Close() +// +// streams, err := rds.XReadGroup( +// node, // RedisNode created with CreateBlockingNode +// "mygroup", // consumer group name +// "consumer1", // consumer ID +// 10, // max number of messages to read +// 5*time.Second, // block duration +// false, // noAck flag +// "mystream", // stream name +// ) +// // Doesn't benefit from pooling redis connections of blocking queries. func (s *Redis) XReadGroup(node RedisNode, group string, consumerId string, count int64, block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) { @@ -1847,6 +1904,10 @@ func (s *Redis) XReadGroup(node RedisNode, group string, consumerId string, coun } // XReadGroupCtx is the context-aware version of XReadGroup. +// +// For blocking operations, you must create a dedicated RedisNode using CreateBlockingNode to avoid +// exhausting the connection pool. See XReadGroup for usage examples. +// // Doesn't benefit from pooling redis connections of blocking queries. func (s *Redis) XReadGroupCtx(ctx context.Context, node RedisNode, group string, consumerId string, count int64, block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) { diff --git a/core/stores/redis/redisblockingnode.go b/core/stores/redis/redisblockingnode.go index 1152869b7..b590234ac 100644 --- a/core/stores/redis/redisblockingnode.go +++ b/core/stores/redis/redisblockingnode.go @@ -13,7 +13,37 @@ type ClosableNode interface { Close() } -// CreateBlockingNode returns a ClosableNode. +// CreateBlockingNode creates a dedicated RedisNode for blocking operations. +// +// Blocking Redis commands (like BLPOP, BRPOP, XREADGROUP with block parameter) hold connections +// for extended periods while waiting for data. Using them with the regular Redis connection pool +// can exhaust all available connections, causing other operations to fail or timeout. +// +// CreateBlockingNode creates a separate Redis client with a minimal connection pool (size 1) that +// is dedicated to blocking operations. This ensures blocking commands don't interfere with regular +// Redis operations. +// +// Example usage: +// +// rds := redis.MustNewRedis(redis.RedisConf{ +// Host: "localhost:6379", +// Type: redis.NodeType, +// }) +// +// // Create a dedicated node for blocking operations +// node, err := redis.CreateBlockingNode(rds) +// if err != nil { +// // handle error +// } +// defer node.Close() // Important: close the node when done +// +// // Use the node for blocking operations +// value, err := rds.Blpop(node, "mylist") +// if err != nil { +// // handle error +// } +// +// The returned ClosableNode must be closed when no longer needed to release resources. func CreateBlockingNode(r *Redis) (ClosableNode, error) { timeout := readWriteTimeout + blockingQueryTimeout