mirror of
https://github.com/zeromicro/go-zero.git
synced 2026-05-14 10:20:00 +08:00
docs: Add comprehensive documentation for blocking Redis operations (XReadGroup, Blpop) (#5221)
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: kevwan <1918356+kevwan@users.noreply.github.com> Co-authored-by: Kevin Wan <wanjunfeng@gmail.com>
This commit is contained in:
@@ -259,12 +259,34 @@ func (s *Redis) BitPosCtx(ctx context.Context, key string, bit, start, end int64
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Blpop uses passed in redis connection to execute blocking queries.
|
// Blpop uses passed in redis connection to execute blocking queries.
|
||||||
|
//
|
||||||
|
// For blocking operations, you must create a dedicated RedisNode using CreateBlockingNode to avoid
|
||||||
|
// exhausting the connection pool. Blocking commands hold connections for extended periods and should
|
||||||
|
// not share the regular connection pool.
|
||||||
|
//
|
||||||
|
// Example usage:
|
||||||
|
//
|
||||||
|
// node, err := redis.CreateBlockingNode(rds)
|
||||||
|
// if err != nil {
|
||||||
|
// // handle error
|
||||||
|
// }
|
||||||
|
// defer node.Close()
|
||||||
|
//
|
||||||
|
// value, err := rds.Blpop(node, "mylist")
|
||||||
|
// if err != nil {
|
||||||
|
// // handle error
|
||||||
|
// }
|
||||||
|
//
|
||||||
// Doesn't benefit from pooling redis connections of blocking queries
|
// Doesn't benefit from pooling redis connections of blocking queries
|
||||||
func (s *Redis) Blpop(node RedisNode, key string) (string, error) {
|
func (s *Redis) Blpop(node RedisNode, key string) (string, error) {
|
||||||
return s.BlpopCtx(context.Background(), node, key)
|
return s.BlpopCtx(context.Background(), node, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// BlpopCtx uses passed in redis connection to execute blocking queries.
|
// BlpopCtx uses passed in redis connection to execute blocking queries.
|
||||||
|
//
|
||||||
|
// For blocking operations, you must create a dedicated RedisNode using CreateBlockingNode.
|
||||||
|
// See Blpop for usage examples.
|
||||||
|
//
|
||||||
// Doesn't benefit from pooling redis connections of blocking queries
|
// Doesn't benefit from pooling redis connections of blocking queries
|
||||||
func (s *Redis) BlpopCtx(ctx context.Context, node RedisNode, key string) (string, error) {
|
func (s *Redis) BlpopCtx(ctx context.Context, node RedisNode, key string) (string, error) {
|
||||||
return s.BlpopWithTimeoutCtx(ctx, node, blockingQueryTimeout, key)
|
return s.BlpopWithTimeoutCtx(ctx, node, blockingQueryTimeout, key)
|
||||||
@@ -272,12 +294,18 @@ func (s *Redis) BlpopCtx(ctx context.Context, node RedisNode, key string) (strin
|
|||||||
|
|
||||||
// BlpopEx uses passed in redis connection to execute blpop command.
|
// BlpopEx uses passed in redis connection to execute blpop command.
|
||||||
// The difference against Blpop is that this method returns a bool to indicate success.
|
// The difference against Blpop is that this method returns a bool to indicate success.
|
||||||
|
//
|
||||||
|
// For blocking operations, you must create a dedicated RedisNode using CreateBlockingNode.
|
||||||
|
// See Blpop for usage examples.
|
||||||
func (s *Redis) BlpopEx(node RedisNode, key string) (string, bool, error) {
|
func (s *Redis) BlpopEx(node RedisNode, key string) (string, bool, error) {
|
||||||
return s.BlpopExCtx(context.Background(), node, key)
|
return s.BlpopExCtx(context.Background(), node, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// BlpopExCtx uses passed in redis connection to execute blpop command.
|
// BlpopExCtx uses passed in redis connection to execute blpop command.
|
||||||
// The difference against Blpop is that this method returns a bool to indicate success.
|
// The difference against Blpop is that this method returns a bool to indicate success.
|
||||||
|
//
|
||||||
|
// For blocking operations, you must create a dedicated RedisNode using CreateBlockingNode.
|
||||||
|
// See Blpop for usage examples.
|
||||||
func (s *Redis) BlpopExCtx(ctx context.Context, node RedisNode, key string) (string, bool, error) {
|
func (s *Redis) BlpopExCtx(ctx context.Context, node RedisNode, key string) (string, bool, error) {
|
||||||
if node == nil {
|
if node == nil {
|
||||||
return "", false, ErrNilNode
|
return "", false, ErrNilNode
|
||||||
@@ -297,12 +325,18 @@ func (s *Redis) BlpopExCtx(ctx context.Context, node RedisNode, key string) (str
|
|||||||
|
|
||||||
// BlpopWithTimeout uses passed in redis connection to execute blpop command.
|
// BlpopWithTimeout uses passed in redis connection to execute blpop command.
|
||||||
// Control blocking query timeout
|
// Control blocking query timeout
|
||||||
|
//
|
||||||
|
// For blocking operations, you must create a dedicated RedisNode using CreateBlockingNode.
|
||||||
|
// See Blpop for usage examples.
|
||||||
func (s *Redis) BlpopWithTimeout(node RedisNode, timeout time.Duration, key string) (string, error) {
|
func (s *Redis) BlpopWithTimeout(node RedisNode, timeout time.Duration, key string) (string, error) {
|
||||||
return s.BlpopWithTimeoutCtx(context.Background(), node, timeout, key)
|
return s.BlpopWithTimeoutCtx(context.Background(), node, timeout, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// BlpopWithTimeoutCtx uses passed in redis connection to execute blpop command.
|
// BlpopWithTimeoutCtx uses passed in redis connection to execute blpop command.
|
||||||
// Control blocking query timeout
|
// Control blocking query timeout
|
||||||
|
//
|
||||||
|
// For blocking operations, you must create a dedicated RedisNode using CreateBlockingNode.
|
||||||
|
// See Blpop for usage examples.
|
||||||
func (s *Redis) BlpopWithTimeoutCtx(ctx context.Context, node RedisNode, timeout time.Duration,
|
func (s *Redis) BlpopWithTimeoutCtx(ctx context.Context, node RedisNode, timeout time.Duration,
|
||||||
key string) (string, error) {
|
key string) (string, error) {
|
||||||
if node == nil {
|
if node == nil {
|
||||||
@@ -1840,6 +1874,29 @@ func (s *Redis) XInfoStreamCtx(ctx context.Context, stream string) (*red.XInfoSt
|
|||||||
|
|
||||||
// XReadGroup reads messages from Redis streams as part of a consumer group.
|
// XReadGroup reads messages from Redis streams as part of a consumer group.
|
||||||
// It allows for distributed processing of stream messages with automatic message delivery semantics.
|
// It allows for distributed processing of stream messages with automatic message delivery semantics.
|
||||||
|
//
|
||||||
|
// For blocking operations, you must create a dedicated RedisNode using CreateBlockingNode to avoid
|
||||||
|
// exhausting the connection pool. Blocking commands hold connections for extended periods and should
|
||||||
|
// not share the regular connection pool.
|
||||||
|
//
|
||||||
|
// Example usage:
|
||||||
|
//
|
||||||
|
// node, err := redis.CreateBlockingNode(rds)
|
||||||
|
// if err != nil {
|
||||||
|
// // handle error
|
||||||
|
// }
|
||||||
|
// defer node.Close()
|
||||||
|
//
|
||||||
|
// streams, err := rds.XReadGroup(
|
||||||
|
// node, // RedisNode created with CreateBlockingNode
|
||||||
|
// "mygroup", // consumer group name
|
||||||
|
// "consumer1", // consumer ID
|
||||||
|
// 10, // max number of messages to read
|
||||||
|
// 5*time.Second, // block duration
|
||||||
|
// false, // noAck flag
|
||||||
|
// "mystream", // stream name
|
||||||
|
// )
|
||||||
|
//
|
||||||
// Doesn't benefit from pooling redis connections of blocking queries.
|
// Doesn't benefit from pooling redis connections of blocking queries.
|
||||||
func (s *Redis) XReadGroup(node RedisNode, group string, consumerId string, count int64,
|
func (s *Redis) XReadGroup(node RedisNode, group string, consumerId string, count int64,
|
||||||
block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) {
|
block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) {
|
||||||
@@ -1847,6 +1904,10 @@ func (s *Redis) XReadGroup(node RedisNode, group string, consumerId string, coun
|
|||||||
}
|
}
|
||||||
|
|
||||||
// XReadGroupCtx is the context-aware version of XReadGroup.
|
// XReadGroupCtx is the context-aware version of XReadGroup.
|
||||||
|
//
|
||||||
|
// For blocking operations, you must create a dedicated RedisNode using CreateBlockingNode to avoid
|
||||||
|
// exhausting the connection pool. See XReadGroup for usage examples.
|
||||||
|
//
|
||||||
// Doesn't benefit from pooling redis connections of blocking queries.
|
// Doesn't benefit from pooling redis connections of blocking queries.
|
||||||
func (s *Redis) XReadGroupCtx(ctx context.Context, node RedisNode, group string, consumerId string,
|
func (s *Redis) XReadGroupCtx(ctx context.Context, node RedisNode, group string, consumerId string,
|
||||||
count int64, block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) {
|
count int64, block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) {
|
||||||
|
|||||||
@@ -13,7 +13,37 @@ type ClosableNode interface {
|
|||||||
Close()
|
Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateBlockingNode returns a ClosableNode.
|
// CreateBlockingNode creates a dedicated RedisNode for blocking operations.
|
||||||
|
//
|
||||||
|
// Blocking Redis commands (like BLPOP, BRPOP, XREADGROUP with block parameter) hold connections
|
||||||
|
// for extended periods while waiting for data. Using them with the regular Redis connection pool
|
||||||
|
// can exhaust all available connections, causing other operations to fail or timeout.
|
||||||
|
//
|
||||||
|
// CreateBlockingNode creates a separate Redis client with a minimal connection pool (size 1) that
|
||||||
|
// is dedicated to blocking operations. This ensures blocking commands don't interfere with regular
|
||||||
|
// Redis operations.
|
||||||
|
//
|
||||||
|
// Example usage:
|
||||||
|
//
|
||||||
|
// rds := redis.MustNewRedis(redis.RedisConf{
|
||||||
|
// Host: "localhost:6379",
|
||||||
|
// Type: redis.NodeType,
|
||||||
|
// })
|
||||||
|
//
|
||||||
|
// // Create a dedicated node for blocking operations
|
||||||
|
// node, err := redis.CreateBlockingNode(rds)
|
||||||
|
// if err != nil {
|
||||||
|
// // handle error
|
||||||
|
// }
|
||||||
|
// defer node.Close() // Important: close the node when done
|
||||||
|
//
|
||||||
|
// // Use the node for blocking operations
|
||||||
|
// value, err := rds.Blpop(node, "mylist")
|
||||||
|
// if err != nil {
|
||||||
|
// // handle error
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// The returned ClosableNode must be closed when no longer needed to release resources.
|
||||||
func CreateBlockingNode(r *Redis) (ClosableNode, error) {
|
func CreateBlockingNode(r *Redis) (ClosableNode, error) {
|
||||||
timeout := readWriteTimeout + blockingQueryTimeout
|
timeout := readWriteTimeout + blockingQueryTimeout
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user