chore: refactor redis stream (#5048)

Signed-off-by: Kevin Wan <wanjunfeng@gmail.com>
This commit is contained in:
Kevin Wan
2025-08-03 14:58:09 +08:00
committed by GitHub
parent 9da76fbf04
commit 569c00ad09
2 changed files with 197 additions and 120 deletions

View File

@@ -65,7 +65,6 @@ type (
// RedisNode interface represents a redis node.
RedisNode interface {
red.Cmdable
red.BitMapCmdable
}
// GeoLocation is used with GeoAdd to add geospatial location.
@@ -1285,10 +1284,12 @@ func (s *Redis) RpushCtx(ctx context.Context, key string, values ...any) (int, e
return int(v), nil
}
// RPopLPush atomically removes the last element from source list and prepends it to destination list.
func (s *Redis) RPopLPush(source string, destination string) (string, error) {
return s.RPopLPushCtx(context.Background(), source, destination)
}
// RPopLPushCtx is the context-aware version of RPopLPush.
func (s *Redis) RPopLPushCtx(ctx context.Context, source string, destination string) (string, error) {
conn, err := getRedis(s)
if err != nil {
@@ -1695,14 +1696,17 @@ func (s *Redis) TtlCtx(ctx context.Context, key string) (int, error) {
return int(duration), nil
}
// TxPipeline returns a Redis transaction pipeline for executing multiple commands atomically.
func (s *Redis) TxPipeline() (pipe Pipeliner, err error) {
conn, err := getRedis(s)
if err != nil {
return nil, err
}
return conn.TxPipeline(), nil
}
// Unlink is similar to Del but removes keys asynchronously in a separate thread.
func (s *Redis) Unlink(keys ...string) (int64, error) {
return s.UnlinkCtx(context.Background(), keys...)
}
@@ -1712,9 +1716,154 @@ func (s *Redis) UnlinkCtx(ctx context.Context, keys ...string) (int64, error) {
if err != nil {
return 0, err
}
return conn.Unlink(ctx, keys...).Result()
}
// XAck acknowledges one or more messages in a Redis stream consumer group.
// It marks the specified messages as successfully processed.
func (s *Redis) XAck(stream string, group string, ids ...string) (int64, error) {
return s.XAckCtx(context.Background(), stream, group, ids...)
}
// XAckCtx is the context-aware version of XAck.
func (s *Redis) XAckCtx(ctx context.Context, stream string, group string, ids ...string) (int64, error) {
conn, err := getRedis(s)
if err != nil {
return 0, err
}
return conn.XAck(ctx, stream, group, ids...).Result()
}
// XAdd adds a new entry to a Redis stream with the specified ID and field-value pairs.
// If noMkStream is true, the command will fail if the stream doesn't exist.
func (s *Redis) XAdd(stream string, noMkStream bool, id string, values any) (string, error) {
return s.XAddCtx(context.Background(), stream, noMkStream, id, values)
}
// XAddCtx is the context-aware version of XAdd.
func (s *Redis) XAddCtx(ctx context.Context, stream string, noMkStream bool, id string, values any) (
string, error) {
conn, err := getRedis(s)
if err != nil {
return "", err
}
return conn.XAdd(ctx, &red.XAddArgs{
Stream: stream,
ID: id,
Values: values,
NoMkStream: noMkStream,
}).Result()
}
// XGroupCreateMkStream creates a consumer group for a Redis stream.
// If the stream doesn't exist, it will be created automatically.
func (s *Redis) XGroupCreateMkStream(stream string, group string, start string) (string, error) {
return s.XGroupCreateMkStreamCtx(context.Background(), stream, group, start)
}
// XGroupCreateMkStreamCtx is the context-aware version of XGroupCreateMkStream.
func (s *Redis) XGroupCreateMkStreamCtx(ctx context.Context, stream string, group string,
start string) (string, error) {
conn, err := getRedis(s)
if err != nil {
return "", err
}
return conn.XGroupCreateMkStream(ctx, stream, group, start).Result()
}
// XGroupCreate creates a consumer group for a Redis stream.
// The stream must already exist, otherwise the command will fail.
func (s *Redis) XGroupCreate(stream string, group string, start string) (string, error) {
return s.XGroupCreateCtx(context.Background(), stream, group, start)
}
// XGroupCreateCtx is the context-aware version of XGroupCreate.
func (s *Redis) XGroupCreateCtx(ctx context.Context, stream string, group string, start string) (
string, error) {
conn, err := getRedis(s)
if err != nil {
return "", err
}
return conn.XGroupCreate(ctx, stream, group, start).Result()
}
// XInfoConsumers returns information about consumers in a Redis stream consumer group.
func (s *Redis) XInfoConsumers(stream string, group string) ([]red.XInfoConsumer, error) {
return s.XInfoConsumersCtx(context.Background(), stream, group)
}
// XInfoConsumersCtx is the context-aware version of XInfoConsumers.
func (s *Redis) XInfoConsumersCtx(ctx context.Context, stream string, group string) (
[]red.XInfoConsumer, error) {
conn, err := getRedis(s)
if err != nil {
return nil, err
}
return conn.XInfoConsumers(ctx, stream, group).Result()
}
// XInfoGroups returns information about consumer groups for a Redis stream.
func (s *Redis) XInfoGroups(stream string) ([]red.XInfoGroup, error) {
return s.XInfoGroupsCtx(context.Background(), stream)
}
// XInfoGroupsCtx is the context-aware version of XInfoGroups.
func (s *Redis) XInfoGroupsCtx(ctx context.Context, stream string) ([]red.XInfoGroup, error) {
conn, err := getRedis(s)
if err != nil {
return nil, err
}
return conn.XInfoGroups(ctx, stream).Result()
}
// XInfoStream returns general information about a Redis stream.
func (s *Redis) XInfoStream(stream string) (*red.XInfoStream, error) {
return s.XInfoStreamCtx(context.Background(), stream)
}
// XInfoStreamCtx is the context-aware version of XInfoStream.
func (s *Redis) XInfoStreamCtx(ctx context.Context, stream string) (*red.XInfoStream, error) {
conn, err := getRedis(s)
if err != nil {
return nil, err
}
return conn.XInfoStream(ctx, stream).Result()
}
// XReadGroup reads messages from Redis streams as part of a consumer group.
// It allows for distributed processing of stream messages with automatic message delivery semantics.
// Doesn't benefit from pooling redis connections of blocking queries.
func (s *Redis) XReadGroup(node RedisNode, group string, consumerId string, count int64,
block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) {
return s.XReadGroupCtx(context.Background(), node, group, consumerId, count, block, noAck, streams...)
}
// XReadGroupCtx is the context-aware version of XReadGroup.
// Doesn't benefit from pooling redis connections of blocking queries.
func (s *Redis) XReadGroupCtx(ctx context.Context, node RedisNode, group string, consumerId string,
count int64, block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) {
if node == nil {
return nil, ErrNilNode
}
return node.XReadGroup(ctx, &red.XReadGroupArgs{
Group: group,
Consumer: consumerId,
Count: count,
Block: block,
NoAck: noAck,
Streams: streams,
}).Result()
}
// Zadd is the implementation of redis zadd command.
func (s *Redis) Zadd(key string, score int64, value string) (bool, error) {
return s.ZaddCtx(context.Background(), key, score, value)
@@ -2402,117 +2551,6 @@ func (s *Redis) ZunionstoreCtx(ctx context.Context, dest string, store *ZStore)
return conn.ZUnionStore(ctx, dest, store).Result()
}
func (s *Redis) XGroupCreateMkStream(stream string, group string, start string) (string, error) {
return s.XGroupCreateMkStreamCtx(context.Background(), stream, group, start)
}
func (s *Redis) XGroupCreateMkStreamCtx(ctx context.Context, stream string, group string, start string) (string, error) {
conn, err := getRedis(s)
if err != nil {
return "", err
}
return conn.XGroupCreateMkStream(ctx, stream, group, start).Result()
}
func (s *Redis) XGroupCreate(stream string, group string, start string) (string, error) {
return s.XGroupCreateCtx(context.Background(), stream, group, start)
}
func (s *Redis) XGroupCreateCtx(ctx context.Context, stream string, group string, start string) (string, error) {
conn, err := getRedis(s)
if err != nil {
return "", err
}
return conn.XGroupCreate(ctx, stream, group, start).Result()
}
func (s *Redis) XInfoConsumers(stream string, group string) ([]red.XInfoConsumer, error) {
return s.XInfoConsumersCtx(context.Background(), stream, group)
}
func (s *Redis) XInfoConsumersCtx(ctx context.Context, stream string, group string) ([]red.XInfoConsumer, error) {
conn, err := getRedis(s)
if err != nil {
return nil, err
}
return conn.XInfoConsumers(ctx, stream, group).Result()
}
func (s *Redis) XInfoGroups(stream string) ([]red.XInfoGroup, error) {
return s.XInfoGroupsCtx(context.Background(), stream)
}
func (s *Redis) XInfoGroupsCtx(ctx context.Context, stream string) ([]red.XInfoGroup, error) {
conn, err := getRedis(s)
if err != nil {
return nil, err
}
return conn.XInfoGroups(ctx, stream).Result()
}
func (s *Redis) XInfoStream(stream string) (*red.XInfoStream, error) {
return s.XInfoStreamCtx(context.Background(), stream)
}
func (s *Redis) XInfoStreamCtx(ctx context.Context, stream string) (*red.XInfoStream, error) {
conn, err := getRedis(s)
if err != nil {
return nil, err
}
return conn.XInfoStream(ctx, stream).Result()
}
func (s *Redis) XAdd(stream string, noMkStream bool, id string, values interface{}) (string, error) {
return s.XAddCtx(context.Background(), stream, noMkStream, id, values)
}
func (s *Redis) XAddCtx(ctx context.Context, stream string, noMkStream bool, id string, values interface{}) (string, error) {
conn, err := getRedis(s)
if err != nil {
return "", err
}
return conn.XAdd(ctx, &red.XAddArgs{
Stream: stream,
ID: id,
Values: values,
NoMkStream: noMkStream,
}).Result()
}
func (s *Redis) XAck(stream string, group string, ids ...string) (int64, error) {
return s.XAckCtx(context.Background(), stream, group, ids...)
}
func (s *Redis) XAckCtx(ctx context.Context, stream string, group string, ids ...string) (int64, error) {
conn, err := getRedis(s)
if err != nil {
return 0, err
}
return conn.XAck(ctx, stream, group, ids...).Result()
}
/**
* streams: list of streams and ids, e.g. stream1 stream2 id1 id2
*/
func (s *Redis) XReadGroup(group string, consumerId string, count int64, block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) {
return s.XReadGroupCtx(context.Background(), group, consumerId, count, block, noAck, streams...)
}
func (s *Redis) XReadGroupCtx(ctx context.Context, group string, consumerId string, count int64, block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) {
conn, err := getRedis(s)
if err != nil {
return nil, err
}
return conn.XReadGroup(ctx, &red.XReadGroupArgs{
Group: group,
Consumer: consumerId,
Count: count,
Block: block,
NoAck: noAck,
Streams: streams,
}).Result()
}
func (s *Redis) checkConnection(pingTimeout time.Duration) error {
conn, err := getRedis(s)
if err != nil {

View File

@@ -916,6 +916,11 @@ func TestRedis_Ping(t *testing.T) {
ok := client.Ping()
assert.True(t, ok)
})
runOnRedisWithError(t, func(client *Redis) {
ok := client.Ping()
assert.False(t, ok)
})
})
}
@@ -2029,6 +2034,16 @@ func TestRedis_WithUserPass(t *testing.T) {
err := newRedis(client.Addr, WithUser("any"), WithPass("any")).Ping()
assert.NotNil(t, err)
})
runOnRedisWithAccount(t, "foo", "bar", func(client *Redis) {
err := client.Set("key1", "value1")
assert.Nil(t, err)
_, err = newRedis(client.Addr, badType()).Keys("*")
assert.NotNil(t, err)
keys, err := client.Keys("*")
assert.Nil(t, err)
assert.ElementsMatch(t, []string{"key1"}, keys)
})
}
func TestRedis_checkConnection(t *testing.T) {
@@ -2057,6 +2072,19 @@ func runOnRedis(t *testing.T, fn func(client *Redis)) {
}))
}
func runOnRedisWithAccount(t *testing.T, user, pass string, fn func(client *Redis)) {
logx.Disable()
s := miniredis.RunT(t)
s.RequireUserAuth(user, pass)
fn(MustNewRedis(RedisConf{
Host: s.Addr(),
Type: NodeType,
User: user,
Pass: pass,
}))
}
func runOnRedisWithError(t *testing.T, fn func(client *Redis)) {
logx.Disable()
@@ -2203,10 +2231,6 @@ func TestRedisXInfo(t *testing.T) {
assert.NotNil(t, err)
_, err = newRedis(client.Addr, badType()).XInfoGroups("Source")
assert.NotNil(t, err)
_, err = newRedis(client.Addr, badType()).XReadGroup("aa", "consumer", 1, 2000, false, "ss", ">")
assert.NotNil(t, err)
_, err = newRedis(client.Addr, badType()).XInfoConsumers("aa", "bb")
assert.NotNil(t, err)
redisCli := newRedis(client.Addr)
@@ -2228,7 +2252,10 @@ func TestRedisXInfo(t *testing.T) {
assert.Equal(t, int64(1), infoGroups[0].Lag)
assert.Equal(t, group, infoGroups[0].Name)
streamRes, err := redisCli.XReadGroup(group, "consumer", 1, 2000, false, stream, ">")
node, err := getRedis(redisCli)
assert.NoError(t, err)
redisCli.XAdd(stream, true, "*", []string{"key1", "value1", "key2", "value2"})
streamRes, err := redisCli.XReadGroup(node, group, "consumer", 1, 2000, false, stream, ">")
assert.Nil(t, err)
assert.Equal(t, 1, len(streamRes))
assert.Equal(t, "value1", streamRes[0].Messages[0].Values["key1"])
@@ -2236,6 +2263,9 @@ func TestRedisXInfo(t *testing.T) {
infoConsumers, err := redisCli.XInfoConsumers(stream, group)
assert.Nil(t, err)
assert.Equal(t, 1, len(infoConsumers))
_, err = newRedis(client.Addr, badType()).XInfoConsumers(stream, group)
assert.NotNil(t, err)
})
}
@@ -2257,12 +2287,19 @@ func TestRedisXReadGroup(t *testing.T) {
_, err = redisCli.XAdd(stream, true, "*", []string{"key1", "value1", "key2", "value2"})
assert.Nil(t, err)
streamRes, err := redisCli.XReadGroup(group, "consumer", 1, 2000, false, stream, ">")
node, err := getRedis(redisCli)
assert.NoError(t, err)
redisCli.XAdd(stream, true, "*", []string{"key1", "value1", "key2", "value2"})
_, err = redisCli.XReadGroup(nil, group, "consumer", 1, 2000, false, stream, ">")
assert.Error(t, err)
streamRes, err := redisCli.XReadGroup(node, group, "consumer", 1, 2000, false, stream, ">")
assert.Nil(t, err)
assert.Equal(t, 1, len(streamRes))
assert.Equal(t, "value1", streamRes[0].Messages[0].Values["key1"])
streamRes1, err := redisCli.XReadGroup(group, "consumer", 1, 2000, false, stream, "0")
_, err = redisCli.XReadGroup(nil, group, "consumer", 1, 2000, false, stream, "0")
assert.Error(t, err)
streamRes1, err := redisCli.XReadGroup(node, group, "consumer", 1, 2000, false, stream, "0")
assert.Nil(t, err)
assert.Equal(t, 1, len(streamRes1))
assert.Equal(t, "value1", streamRes1[0].Messages[0].Values["key1"])
@@ -2270,7 +2307,9 @@ func TestRedisXReadGroup(t *testing.T) {
_, err = redisCli.XAck(stream, group, streamRes[0].Messages[0].ID)
assert.Nil(t, err)
streamRes2, err := redisCli.XReadGroup(group, "consumer", 1, 2000, false, stream, "0")
_, err = redisCli.XReadGroup(nil, group, "consumer", 1, 2000, false, stream, "0")
assert.Error(t, err)
streamRes2, err := redisCli.XReadGroup(node, group, "consumer", 1, 2000, false, stream, "0")
assert.Nil(t, err)
assert.Greater(t, len(streamRes2), 0, "streamRes2 is empty")
assert.Equal(t, 0, len(streamRes2[0].Messages))