From 569c00ad09eddcc9e5fd8fe5e95d98dfedc7373f Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Sun, 3 Aug 2025 14:58:09 +0800 Subject: [PATCH] chore: refactor redis stream (#5048) Signed-off-by: Kevin Wan --- core/stores/redis/redis.go | 262 ++++++++++++++++++-------------- core/stores/redis/redis_test.go | 55 ++++++- 2 files changed, 197 insertions(+), 120 deletions(-) diff --git a/core/stores/redis/redis.go b/core/stores/redis/redis.go index beca0e93b..ddc073ed0 100644 --- a/core/stores/redis/redis.go +++ b/core/stores/redis/redis.go @@ -65,7 +65,6 @@ type ( // RedisNode interface represents a redis node. RedisNode interface { red.Cmdable - red.BitMapCmdable } // GeoLocation is used with GeoAdd to add geospatial location. @@ -1285,10 +1284,12 @@ func (s *Redis) RpushCtx(ctx context.Context, key string, values ...any) (int, e return int(v), nil } +// RPopLPush atomically removes the last element from source list and prepends it to destination list. func (s *Redis) RPopLPush(source string, destination string) (string, error) { return s.RPopLPushCtx(context.Background(), source, destination) } +// RPopLPushCtx is the context-aware version of RPopLPush. func (s *Redis) RPopLPushCtx(ctx context.Context, source string, destination string) (string, error) { conn, err := getRedis(s) if err != nil { @@ -1695,14 +1696,17 @@ func (s *Redis) TtlCtx(ctx context.Context, key string) (int, error) { return int(duration), nil } +// TxPipeline returns a Redis transaction pipeline for executing multiple commands atomically. func (s *Redis) TxPipeline() (pipe Pipeliner, err error) { conn, err := getRedis(s) if err != nil { return nil, err } + return conn.TxPipeline(), nil } +// Unlink is similar to Del but removes keys asynchronously in a separate thread. func (s *Redis) Unlink(keys ...string) (int64, error) { return s.UnlinkCtx(context.Background(), keys...) } @@ -1712,9 +1716,154 @@ func (s *Redis) UnlinkCtx(ctx context.Context, keys ...string) (int64, error) { if err != nil { return 0, err } + return conn.Unlink(ctx, keys...).Result() } +// XAck acknowledges one or more messages in a Redis stream consumer group. +// It marks the specified messages as successfully processed. +func (s *Redis) XAck(stream string, group string, ids ...string) (int64, error) { + return s.XAckCtx(context.Background(), stream, group, ids...) +} + +// XAckCtx is the context-aware version of XAck. +func (s *Redis) XAckCtx(ctx context.Context, stream string, group string, ids ...string) (int64, error) { + conn, err := getRedis(s) + if err != nil { + return 0, err + } + + return conn.XAck(ctx, stream, group, ids...).Result() +} + +// XAdd adds a new entry to a Redis stream with the specified ID and field-value pairs. +// If noMkStream is true, the command will fail if the stream doesn't exist. +func (s *Redis) XAdd(stream string, noMkStream bool, id string, values any) (string, error) { + return s.XAddCtx(context.Background(), stream, noMkStream, id, values) +} + +// XAddCtx is the context-aware version of XAdd. +func (s *Redis) XAddCtx(ctx context.Context, stream string, noMkStream bool, id string, values any) ( + string, error) { + conn, err := getRedis(s) + if err != nil { + return "", err + } + + return conn.XAdd(ctx, &red.XAddArgs{ + Stream: stream, + ID: id, + Values: values, + NoMkStream: noMkStream, + }).Result() +} + +// XGroupCreateMkStream creates a consumer group for a Redis stream. +// If the stream doesn't exist, it will be created automatically. +func (s *Redis) XGroupCreateMkStream(stream string, group string, start string) (string, error) { + return s.XGroupCreateMkStreamCtx(context.Background(), stream, group, start) +} + +// XGroupCreateMkStreamCtx is the context-aware version of XGroupCreateMkStream. +func (s *Redis) XGroupCreateMkStreamCtx(ctx context.Context, stream string, group string, + start string) (string, error) { + conn, err := getRedis(s) + if err != nil { + return "", err + } + + return conn.XGroupCreateMkStream(ctx, stream, group, start).Result() +} + +// XGroupCreate creates a consumer group for a Redis stream. +// The stream must already exist, otherwise the command will fail. +func (s *Redis) XGroupCreate(stream string, group string, start string) (string, error) { + return s.XGroupCreateCtx(context.Background(), stream, group, start) +} + +// XGroupCreateCtx is the context-aware version of XGroupCreate. +func (s *Redis) XGroupCreateCtx(ctx context.Context, stream string, group string, start string) ( + string, error) { + conn, err := getRedis(s) + if err != nil { + return "", err + } + + return conn.XGroupCreate(ctx, stream, group, start).Result() +} + +// XInfoConsumers returns information about consumers in a Redis stream consumer group. +func (s *Redis) XInfoConsumers(stream string, group string) ([]red.XInfoConsumer, error) { + return s.XInfoConsumersCtx(context.Background(), stream, group) +} + +// XInfoConsumersCtx is the context-aware version of XInfoConsumers. +func (s *Redis) XInfoConsumersCtx(ctx context.Context, stream string, group string) ( + []red.XInfoConsumer, error) { + conn, err := getRedis(s) + if err != nil { + return nil, err + } + + return conn.XInfoConsumers(ctx, stream, group).Result() +} + +// XInfoGroups returns information about consumer groups for a Redis stream. +func (s *Redis) XInfoGroups(stream string) ([]red.XInfoGroup, error) { + return s.XInfoGroupsCtx(context.Background(), stream) +} + +// XInfoGroupsCtx is the context-aware version of XInfoGroups. +func (s *Redis) XInfoGroupsCtx(ctx context.Context, stream string) ([]red.XInfoGroup, error) { + conn, err := getRedis(s) + if err != nil { + return nil, err + } + + return conn.XInfoGroups(ctx, stream).Result() +} + +// XInfoStream returns general information about a Redis stream. +func (s *Redis) XInfoStream(stream string) (*red.XInfoStream, error) { + return s.XInfoStreamCtx(context.Background(), stream) +} + +// XInfoStreamCtx is the context-aware version of XInfoStream. +func (s *Redis) XInfoStreamCtx(ctx context.Context, stream string) (*red.XInfoStream, error) { + conn, err := getRedis(s) + if err != nil { + return nil, err + } + + return conn.XInfoStream(ctx, stream).Result() +} + +// XReadGroup reads messages from Redis streams as part of a consumer group. +// It allows for distributed processing of stream messages with automatic message delivery semantics. +// Doesn't benefit from pooling redis connections of blocking queries. +func (s *Redis) XReadGroup(node RedisNode, group string, consumerId string, count int64, + block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) { + return s.XReadGroupCtx(context.Background(), node, group, consumerId, count, block, noAck, streams...) +} + +// XReadGroupCtx is the context-aware version of XReadGroup. +// Doesn't benefit from pooling redis connections of blocking queries. +func (s *Redis) XReadGroupCtx(ctx context.Context, node RedisNode, group string, consumerId string, + count int64, block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) { + if node == nil { + return nil, ErrNilNode + } + + return node.XReadGroup(ctx, &red.XReadGroupArgs{ + Group: group, + Consumer: consumerId, + Count: count, + Block: block, + NoAck: noAck, + Streams: streams, + }).Result() +} + // Zadd is the implementation of redis zadd command. func (s *Redis) Zadd(key string, score int64, value string) (bool, error) { return s.ZaddCtx(context.Background(), key, score, value) @@ -2402,117 +2551,6 @@ func (s *Redis) ZunionstoreCtx(ctx context.Context, dest string, store *ZStore) return conn.ZUnionStore(ctx, dest, store).Result() } -func (s *Redis) XGroupCreateMkStream(stream string, group string, start string) (string, error) { - return s.XGroupCreateMkStreamCtx(context.Background(), stream, group, start) -} - -func (s *Redis) XGroupCreateMkStreamCtx(ctx context.Context, stream string, group string, start string) (string, error) { - conn, err := getRedis(s) - if err != nil { - return "", err - } - return conn.XGroupCreateMkStream(ctx, stream, group, start).Result() -} - -func (s *Redis) XGroupCreate(stream string, group string, start string) (string, error) { - return s.XGroupCreateCtx(context.Background(), stream, group, start) -} - -func (s *Redis) XGroupCreateCtx(ctx context.Context, stream string, group string, start string) (string, error) { - conn, err := getRedis(s) - if err != nil { - return "", err - } - return conn.XGroupCreate(ctx, stream, group, start).Result() -} - -func (s *Redis) XInfoConsumers(stream string, group string) ([]red.XInfoConsumer, error) { - return s.XInfoConsumersCtx(context.Background(), stream, group) -} - -func (s *Redis) XInfoConsumersCtx(ctx context.Context, stream string, group string) ([]red.XInfoConsumer, error) { - conn, err := getRedis(s) - if err != nil { - return nil, err - } - return conn.XInfoConsumers(ctx, stream, group).Result() -} - -func (s *Redis) XInfoGroups(stream string) ([]red.XInfoGroup, error) { - return s.XInfoGroupsCtx(context.Background(), stream) -} - -func (s *Redis) XInfoGroupsCtx(ctx context.Context, stream string) ([]red.XInfoGroup, error) { - conn, err := getRedis(s) - if err != nil { - return nil, err - } - return conn.XInfoGroups(ctx, stream).Result() -} - -func (s *Redis) XInfoStream(stream string) (*red.XInfoStream, error) { - return s.XInfoStreamCtx(context.Background(), stream) -} - -func (s *Redis) XInfoStreamCtx(ctx context.Context, stream string) (*red.XInfoStream, error) { - conn, err := getRedis(s) - if err != nil { - return nil, err - } - return conn.XInfoStream(ctx, stream).Result() -} - -func (s *Redis) XAdd(stream string, noMkStream bool, id string, values interface{}) (string, error) { - return s.XAddCtx(context.Background(), stream, noMkStream, id, values) -} - -func (s *Redis) XAddCtx(ctx context.Context, stream string, noMkStream bool, id string, values interface{}) (string, error) { - conn, err := getRedis(s) - if err != nil { - return "", err - } - return conn.XAdd(ctx, &red.XAddArgs{ - Stream: stream, - ID: id, - Values: values, - NoMkStream: noMkStream, - }).Result() -} - -func (s *Redis) XAck(stream string, group string, ids ...string) (int64, error) { - return s.XAckCtx(context.Background(), stream, group, ids...) -} - -func (s *Redis) XAckCtx(ctx context.Context, stream string, group string, ids ...string) (int64, error) { - conn, err := getRedis(s) - if err != nil { - return 0, err - } - return conn.XAck(ctx, stream, group, ids...).Result() -} - -/** - * streams: list of streams and ids, e.g. stream1 stream2 id1 id2 - */ -func (s *Redis) XReadGroup(group string, consumerId string, count int64, block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) { - return s.XReadGroupCtx(context.Background(), group, consumerId, count, block, noAck, streams...) -} - -func (s *Redis) XReadGroupCtx(ctx context.Context, group string, consumerId string, count int64, block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) { - conn, err := getRedis(s) - if err != nil { - return nil, err - } - return conn.XReadGroup(ctx, &red.XReadGroupArgs{ - Group: group, - Consumer: consumerId, - Count: count, - Block: block, - NoAck: noAck, - Streams: streams, - }).Result() -} - func (s *Redis) checkConnection(pingTimeout time.Duration) error { conn, err := getRedis(s) if err != nil { diff --git a/core/stores/redis/redis_test.go b/core/stores/redis/redis_test.go index 93674fa7a..be0ad3d9a 100644 --- a/core/stores/redis/redis_test.go +++ b/core/stores/redis/redis_test.go @@ -916,6 +916,11 @@ func TestRedis_Ping(t *testing.T) { ok := client.Ping() assert.True(t, ok) }) + + runOnRedisWithError(t, func(client *Redis) { + ok := client.Ping() + assert.False(t, ok) + }) }) } @@ -2029,6 +2034,16 @@ func TestRedis_WithUserPass(t *testing.T) { err := newRedis(client.Addr, WithUser("any"), WithPass("any")).Ping() assert.NotNil(t, err) }) + + runOnRedisWithAccount(t, "foo", "bar", func(client *Redis) { + err := client.Set("key1", "value1") + assert.Nil(t, err) + _, err = newRedis(client.Addr, badType()).Keys("*") + assert.NotNil(t, err) + keys, err := client.Keys("*") + assert.Nil(t, err) + assert.ElementsMatch(t, []string{"key1"}, keys) + }) } func TestRedis_checkConnection(t *testing.T) { @@ -2057,6 +2072,19 @@ func runOnRedis(t *testing.T, fn func(client *Redis)) { })) } +func runOnRedisWithAccount(t *testing.T, user, pass string, fn func(client *Redis)) { + logx.Disable() + + s := miniredis.RunT(t) + s.RequireUserAuth(user, pass) + fn(MustNewRedis(RedisConf{ + Host: s.Addr(), + Type: NodeType, + User: user, + Pass: pass, + })) +} + func runOnRedisWithError(t *testing.T, fn func(client *Redis)) { logx.Disable() @@ -2203,10 +2231,6 @@ func TestRedisXInfo(t *testing.T) { assert.NotNil(t, err) _, err = newRedis(client.Addr, badType()).XInfoGroups("Source") assert.NotNil(t, err) - _, err = newRedis(client.Addr, badType()).XReadGroup("aa", "consumer", 1, 2000, false, "ss", ">") - assert.NotNil(t, err) - _, err = newRedis(client.Addr, badType()).XInfoConsumers("aa", "bb") - assert.NotNil(t, err) redisCli := newRedis(client.Addr) @@ -2228,7 +2252,10 @@ func TestRedisXInfo(t *testing.T) { assert.Equal(t, int64(1), infoGroups[0].Lag) assert.Equal(t, group, infoGroups[0].Name) - streamRes, err := redisCli.XReadGroup(group, "consumer", 1, 2000, false, stream, ">") + node, err := getRedis(redisCli) + assert.NoError(t, err) + redisCli.XAdd(stream, true, "*", []string{"key1", "value1", "key2", "value2"}) + streamRes, err := redisCli.XReadGroup(node, group, "consumer", 1, 2000, false, stream, ">") assert.Nil(t, err) assert.Equal(t, 1, len(streamRes)) assert.Equal(t, "value1", streamRes[0].Messages[0].Values["key1"]) @@ -2236,6 +2263,9 @@ func TestRedisXInfo(t *testing.T) { infoConsumers, err := redisCli.XInfoConsumers(stream, group) assert.Nil(t, err) assert.Equal(t, 1, len(infoConsumers)) + + _, err = newRedis(client.Addr, badType()).XInfoConsumers(stream, group) + assert.NotNil(t, err) }) } @@ -2257,12 +2287,19 @@ func TestRedisXReadGroup(t *testing.T) { _, err = redisCli.XAdd(stream, true, "*", []string{"key1", "value1", "key2", "value2"}) assert.Nil(t, err) - streamRes, err := redisCli.XReadGroup(group, "consumer", 1, 2000, false, stream, ">") + node, err := getRedis(redisCli) + assert.NoError(t, err) + redisCli.XAdd(stream, true, "*", []string{"key1", "value1", "key2", "value2"}) + _, err = redisCli.XReadGroup(nil, group, "consumer", 1, 2000, false, stream, ">") + assert.Error(t, err) + streamRes, err := redisCli.XReadGroup(node, group, "consumer", 1, 2000, false, stream, ">") assert.Nil(t, err) assert.Equal(t, 1, len(streamRes)) assert.Equal(t, "value1", streamRes[0].Messages[0].Values["key1"]) - streamRes1, err := redisCli.XReadGroup(group, "consumer", 1, 2000, false, stream, "0") + _, err = redisCli.XReadGroup(nil, group, "consumer", 1, 2000, false, stream, "0") + assert.Error(t, err) + streamRes1, err := redisCli.XReadGroup(node, group, "consumer", 1, 2000, false, stream, "0") assert.Nil(t, err) assert.Equal(t, 1, len(streamRes1)) assert.Equal(t, "value1", streamRes1[0].Messages[0].Values["key1"]) @@ -2270,7 +2307,9 @@ func TestRedisXReadGroup(t *testing.T) { _, err = redisCli.XAck(stream, group, streamRes[0].Messages[0].ID) assert.Nil(t, err) - streamRes2, err := redisCli.XReadGroup(group, "consumer", 1, 2000, false, stream, "0") + _, err = redisCli.XReadGroup(nil, group, "consumer", 1, 2000, false, stream, "0") + assert.Error(t, err) + streamRes2, err := redisCli.XReadGroup(node, group, "consumer", 1, 2000, false, stream, "0") assert.Nil(t, err) assert.Greater(t, len(streamRes2), 0, "streamRes2 is empty") assert.Equal(t, 0, len(streamRes2[0].Messages))