From 9da76fbf04c6d9c86824d07ccc9f17737014e5d7 Mon Sep 17 00:00:00 2001 From: jk2K <4025839+jk2K@users.noreply.github.com> Date: Sat, 2 Aug 2025 21:36:08 +0800 Subject: [PATCH] feat: redis support consumer groups (#4912) Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- core/stores/redis/redis.go | 111 ++++++++++++++++++++++++++++++++ core/stores/redis/redis_test.go | 101 +++++++++++++++++++++++++++++ 2 files changed, 212 insertions(+) diff --git a/core/stores/redis/redis.go b/core/stores/redis/redis.go index 66fcef4e0..beca0e93b 100644 --- a/core/stores/redis/redis.go +++ b/core/stores/redis/redis.go @@ -2402,6 +2402,117 @@ func (s *Redis) ZunionstoreCtx(ctx context.Context, dest string, store *ZStore) return conn.ZUnionStore(ctx, dest, store).Result() } +func (s *Redis) XGroupCreateMkStream(stream string, group string, start string) (string, error) { + return s.XGroupCreateMkStreamCtx(context.Background(), stream, group, start) +} + +func (s *Redis) XGroupCreateMkStreamCtx(ctx context.Context, stream string, group string, start string) (string, error) { + conn, err := getRedis(s) + if err != nil { + return "", err + } + return conn.XGroupCreateMkStream(ctx, stream, group, start).Result() +} + +func (s *Redis) XGroupCreate(stream string, group string, start string) (string, error) { + return s.XGroupCreateCtx(context.Background(), stream, group, start) +} + +func (s *Redis) XGroupCreateCtx(ctx context.Context, stream string, group string, start string) (string, error) { + conn, err := getRedis(s) + if err != nil { + return "", err + } + return conn.XGroupCreate(ctx, stream, group, start).Result() +} + +func (s *Redis) XInfoConsumers(stream string, group string) ([]red.XInfoConsumer, error) { + return s.XInfoConsumersCtx(context.Background(), stream, group) +} + +func (s *Redis) XInfoConsumersCtx(ctx context.Context, stream string, group string) ([]red.XInfoConsumer, error) { + conn, err := getRedis(s) + if err != nil { + return nil, err + } + return conn.XInfoConsumers(ctx, stream, group).Result() +} + +func (s *Redis) XInfoGroups(stream string) ([]red.XInfoGroup, error) { + return s.XInfoGroupsCtx(context.Background(), stream) +} + +func (s *Redis) XInfoGroupsCtx(ctx context.Context, stream string) ([]red.XInfoGroup, error) { + conn, err := getRedis(s) + if err != nil { + return nil, err + } + return conn.XInfoGroups(ctx, stream).Result() +} + +func (s *Redis) XInfoStream(stream string) (*red.XInfoStream, error) { + return s.XInfoStreamCtx(context.Background(), stream) +} + +func (s *Redis) XInfoStreamCtx(ctx context.Context, stream string) (*red.XInfoStream, error) { + conn, err := getRedis(s) + if err != nil { + return nil, err + } + return conn.XInfoStream(ctx, stream).Result() +} + +func (s *Redis) XAdd(stream string, noMkStream bool, id string, values interface{}) (string, error) { + return s.XAddCtx(context.Background(), stream, noMkStream, id, values) +} + +func (s *Redis) XAddCtx(ctx context.Context, stream string, noMkStream bool, id string, values interface{}) (string, error) { + conn, err := getRedis(s) + if err != nil { + return "", err + } + return conn.XAdd(ctx, &red.XAddArgs{ + Stream: stream, + ID: id, + Values: values, + NoMkStream: noMkStream, + }).Result() +} + +func (s *Redis) XAck(stream string, group string, ids ...string) (int64, error) { + return s.XAckCtx(context.Background(), stream, group, ids...) +} + +func (s *Redis) XAckCtx(ctx context.Context, stream string, group string, ids ...string) (int64, error) { + conn, err := getRedis(s) + if err != nil { + return 0, err + } + return conn.XAck(ctx, stream, group, ids...).Result() +} + +/** + * streams: list of streams and ids, e.g. stream1 stream2 id1 id2 + */ +func (s *Redis) XReadGroup(group string, consumerId string, count int64, block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) { + return s.XReadGroupCtx(context.Background(), group, consumerId, count, block, noAck, streams...) +} + +func (s *Redis) XReadGroupCtx(ctx context.Context, group string, consumerId string, count int64, block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) { + conn, err := getRedis(s) + if err != nil { + return nil, err + } + return conn.XReadGroup(ctx, &red.XReadGroupArgs{ + Group: group, + Consumer: consumerId, + Count: count, + Block: block, + NoAck: noAck, + Streams: streams, + }).Result() +} + func (s *Redis) checkConnection(pingTimeout time.Duration) error { conn, err := getRedis(s) if err != nil { diff --git a/core/stores/redis/redis_test.go b/core/stores/redis/redis_test.go index b34792ee8..93674fa7a 100644 --- a/core/stores/redis/redis_test.go +++ b/core/stores/redis/redis_test.go @@ -2175,3 +2175,104 @@ func TestRedisTxPipeline(t *testing.T) { assert.Equal(t, hashValue, value) }) } + +func TestRedisXGroupCreate(t *testing.T) { + runOnRedis(t, func(client *Redis) { + _, err := newRedis(client.Addr, badType()).XGroupCreate("Source", "Destination", "0") + assert.NotNil(t, err) + + redisCli := newRedis(client.Addr) + + _, err = redisCli.XGroupCreate("aa", "bb", "0") + assert.NotNil(t, err) + + _, err = newRedis(client.Addr, badType()).XGroupCreateMkStream("Source", "Destination", "0") + assert.NotNil(t, err) + + _, err = redisCli.XGroupCreateMkStream("aa", "bb", "0") + assert.Nil(t, err) + + _, err = redisCli.XGroupCreate("aa", "cc", "0") + assert.Nil(t, err) + }) +} + +func TestRedisXInfo(t *testing.T) { + runOnRedis(t, func(client *Redis) { + _, err := newRedis(client.Addr, badType()).XInfoStream("Source") + assert.NotNil(t, err) + _, err = newRedis(client.Addr, badType()).XInfoGroups("Source") + assert.NotNil(t, err) + _, err = newRedis(client.Addr, badType()).XReadGroup("aa", "consumer", 1, 2000, false, "ss", ">") + assert.NotNil(t, err) + _, err = newRedis(client.Addr, badType()).XInfoConsumers("aa", "bb") + assert.NotNil(t, err) + + redisCli := newRedis(client.Addr) + + stream := "aa" + group := "bb" + + _, err = redisCli.XGroupCreateMkStream(stream, group, "$") + assert.Nil(t, err) + + _, err = redisCli.XAdd(stream, true, "*", []string{"key1", "value1", "key2", "value2"}) + assert.Nil(t, err) + + infoStream, err := redisCli.XInfoStream(stream) + assert.Nil(t, err) + assert.Equal(t, int64(1), infoStream.Length) + + infoGroups, err := redisCli.XInfoGroups(stream) + assert.Nil(t, err) + assert.Equal(t, int64(1), infoGroups[0].Lag) + assert.Equal(t, group, infoGroups[0].Name) + + streamRes, err := redisCli.XReadGroup(group, "consumer", 1, 2000, false, stream, ">") + assert.Nil(t, err) + assert.Equal(t, 1, len(streamRes)) + assert.Equal(t, "value1", streamRes[0].Messages[0].Values["key1"]) + + infoConsumers, err := redisCli.XInfoConsumers(stream, group) + assert.Nil(t, err) + assert.Equal(t, 1, len(infoConsumers)) + }) +} + +func TestRedisXReadGroup(t *testing.T) { + runOnRedis(t, func(client *Redis) { + _, err := newRedis(client.Addr, badType()).XAdd("bb", true, "*", []string{"key1", "value1", "key2", "value2"}) + assert.NotNil(t, err) + _, err = newRedis(client.Addr, badType()).XAck("bb", "aa", "123") + assert.NotNil(t, err) + + redisCli := newRedis(client.Addr) + + stream := "aa" + group := "bb" + + _, err = redisCli.XGroupCreateMkStream(stream, group, "$") + assert.Nil(t, err) + + _, err = redisCli.XAdd(stream, true, "*", []string{"key1", "value1", "key2", "value2"}) + assert.Nil(t, err) + + streamRes, err := redisCli.XReadGroup(group, "consumer", 1, 2000, false, stream, ">") + assert.Nil(t, err) + assert.Equal(t, 1, len(streamRes)) + assert.Equal(t, "value1", streamRes[0].Messages[0].Values["key1"]) + + streamRes1, err := redisCli.XReadGroup(group, "consumer", 1, 2000, false, stream, "0") + assert.Nil(t, err) + assert.Equal(t, 1, len(streamRes1)) + assert.Equal(t, "value1", streamRes1[0].Messages[0].Values["key1"]) + + _, err = redisCli.XAck(stream, group, streamRes[0].Messages[0].ID) + assert.Nil(t, err) + + streamRes2, err := redisCli.XReadGroup(group, "consumer", 1, 2000, false, stream, "0") + assert.Nil(t, err) + assert.Greater(t, len(streamRes2), 0, "streamRes2 is empty") + assert.Equal(t, 0, len(streamRes2[0].Messages)) + }) +}