feat: redis support consumer groups (#4912)

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
jk2K
2025-08-02 21:36:08 +08:00
committed by GitHub
parent b69db5e09d
commit 9da76fbf04
2 changed files with 212 additions and 0 deletions

View File

@@ -2402,6 +2402,117 @@ func (s *Redis) ZunionstoreCtx(ctx context.Context, dest string, store *ZStore)
return conn.ZUnionStore(ctx, dest, store).Result()
}
func (s *Redis) XGroupCreateMkStream(stream string, group string, start string) (string, error) {
return s.XGroupCreateMkStreamCtx(context.Background(), stream, group, start)
}
func (s *Redis) XGroupCreateMkStreamCtx(ctx context.Context, stream string, group string, start string) (string, error) {
conn, err := getRedis(s)
if err != nil {
return "", err
}
return conn.XGroupCreateMkStream(ctx, stream, group, start).Result()
}
func (s *Redis) XGroupCreate(stream string, group string, start string) (string, error) {
return s.XGroupCreateCtx(context.Background(), stream, group, start)
}
func (s *Redis) XGroupCreateCtx(ctx context.Context, stream string, group string, start string) (string, error) {
conn, err := getRedis(s)
if err != nil {
return "", err
}
return conn.XGroupCreate(ctx, stream, group, start).Result()
}
func (s *Redis) XInfoConsumers(stream string, group string) ([]red.XInfoConsumer, error) {
return s.XInfoConsumersCtx(context.Background(), stream, group)
}
func (s *Redis) XInfoConsumersCtx(ctx context.Context, stream string, group string) ([]red.XInfoConsumer, error) {
conn, err := getRedis(s)
if err != nil {
return nil, err
}
return conn.XInfoConsumers(ctx, stream, group).Result()
}
func (s *Redis) XInfoGroups(stream string) ([]red.XInfoGroup, error) {
return s.XInfoGroupsCtx(context.Background(), stream)
}
func (s *Redis) XInfoGroupsCtx(ctx context.Context, stream string) ([]red.XInfoGroup, error) {
conn, err := getRedis(s)
if err != nil {
return nil, err
}
return conn.XInfoGroups(ctx, stream).Result()
}
func (s *Redis) XInfoStream(stream string) (*red.XInfoStream, error) {
return s.XInfoStreamCtx(context.Background(), stream)
}
func (s *Redis) XInfoStreamCtx(ctx context.Context, stream string) (*red.XInfoStream, error) {
conn, err := getRedis(s)
if err != nil {
return nil, err
}
return conn.XInfoStream(ctx, stream).Result()
}
func (s *Redis) XAdd(stream string, noMkStream bool, id string, values interface{}) (string, error) {
return s.XAddCtx(context.Background(), stream, noMkStream, id, values)
}
func (s *Redis) XAddCtx(ctx context.Context, stream string, noMkStream bool, id string, values interface{}) (string, error) {
conn, err := getRedis(s)
if err != nil {
return "", err
}
return conn.XAdd(ctx, &red.XAddArgs{
Stream: stream,
ID: id,
Values: values,
NoMkStream: noMkStream,
}).Result()
}
func (s *Redis) XAck(stream string, group string, ids ...string) (int64, error) {
return s.XAckCtx(context.Background(), stream, group, ids...)
}
func (s *Redis) XAckCtx(ctx context.Context, stream string, group string, ids ...string) (int64, error) {
conn, err := getRedis(s)
if err != nil {
return 0, err
}
return conn.XAck(ctx, stream, group, ids...).Result()
}
/**
* streams: list of streams and ids, e.g. stream1 stream2 id1 id2
*/
func (s *Redis) XReadGroup(group string, consumerId string, count int64, block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) {
return s.XReadGroupCtx(context.Background(), group, consumerId, count, block, noAck, streams...)
}
func (s *Redis) XReadGroupCtx(ctx context.Context, group string, consumerId string, count int64, block time.Duration, noAck bool, streams ...string) ([]red.XStream, error) {
conn, err := getRedis(s)
if err != nil {
return nil, err
}
return conn.XReadGroup(ctx, &red.XReadGroupArgs{
Group: group,
Consumer: consumerId,
Count: count,
Block: block,
NoAck: noAck,
Streams: streams,
}).Result()
}
func (s *Redis) checkConnection(pingTimeout time.Duration) error {
conn, err := getRedis(s)
if err != nil {

View File

@@ -2175,3 +2175,104 @@ func TestRedisTxPipeline(t *testing.T) {
assert.Equal(t, hashValue, value)
})
}
func TestRedisXGroupCreate(t *testing.T) {
runOnRedis(t, func(client *Redis) {
_, err := newRedis(client.Addr, badType()).XGroupCreate("Source", "Destination", "0")
assert.NotNil(t, err)
redisCli := newRedis(client.Addr)
_, err = redisCli.XGroupCreate("aa", "bb", "0")
assert.NotNil(t, err)
_, err = newRedis(client.Addr, badType()).XGroupCreateMkStream("Source", "Destination", "0")
assert.NotNil(t, err)
_, err = redisCli.XGroupCreateMkStream("aa", "bb", "0")
assert.Nil(t, err)
_, err = redisCli.XGroupCreate("aa", "cc", "0")
assert.Nil(t, err)
})
}
func TestRedisXInfo(t *testing.T) {
runOnRedis(t, func(client *Redis) {
_, err := newRedis(client.Addr, badType()).XInfoStream("Source")
assert.NotNil(t, err)
_, err = newRedis(client.Addr, badType()).XInfoGroups("Source")
assert.NotNil(t, err)
_, err = newRedis(client.Addr, badType()).XReadGroup("aa", "consumer", 1, 2000, false, "ss", ">")
assert.NotNil(t, err)
_, err = newRedis(client.Addr, badType()).XInfoConsumers("aa", "bb")
assert.NotNil(t, err)
redisCli := newRedis(client.Addr)
stream := "aa"
group := "bb"
_, err = redisCli.XGroupCreateMkStream(stream, group, "$")
assert.Nil(t, err)
_, err = redisCli.XAdd(stream, true, "*", []string{"key1", "value1", "key2", "value2"})
assert.Nil(t, err)
infoStream, err := redisCli.XInfoStream(stream)
assert.Nil(t, err)
assert.Equal(t, int64(1), infoStream.Length)
infoGroups, err := redisCli.XInfoGroups(stream)
assert.Nil(t, err)
assert.Equal(t, int64(1), infoGroups[0].Lag)
assert.Equal(t, group, infoGroups[0].Name)
streamRes, err := redisCli.XReadGroup(group, "consumer", 1, 2000, false, stream, ">")
assert.Nil(t, err)
assert.Equal(t, 1, len(streamRes))
assert.Equal(t, "value1", streamRes[0].Messages[0].Values["key1"])
infoConsumers, err := redisCli.XInfoConsumers(stream, group)
assert.Nil(t, err)
assert.Equal(t, 1, len(infoConsumers))
})
}
func TestRedisXReadGroup(t *testing.T) {
runOnRedis(t, func(client *Redis) {
_, err := newRedis(client.Addr, badType()).XAdd("bb", true, "*", []string{"key1", "value1", "key2", "value2"})
assert.NotNil(t, err)
_, err = newRedis(client.Addr, badType()).XAck("bb", "aa", "123")
assert.NotNil(t, err)
redisCli := newRedis(client.Addr)
stream := "aa"
group := "bb"
_, err = redisCli.XGroupCreateMkStream(stream, group, "$")
assert.Nil(t, err)
_, err = redisCli.XAdd(stream, true, "*", []string{"key1", "value1", "key2", "value2"})
assert.Nil(t, err)
streamRes, err := redisCli.XReadGroup(group, "consumer", 1, 2000, false, stream, ">")
assert.Nil(t, err)
assert.Equal(t, 1, len(streamRes))
assert.Equal(t, "value1", streamRes[0].Messages[0].Values["key1"])
streamRes1, err := redisCli.XReadGroup(group, "consumer", 1, 2000, false, stream, "0")
assert.Nil(t, err)
assert.Equal(t, 1, len(streamRes1))
assert.Equal(t, "value1", streamRes1[0].Messages[0].Values["key1"])
_, err = redisCli.XAck(stream, group, streamRes[0].Messages[0].ID)
assert.Nil(t, err)
streamRes2, err := redisCli.XReadGroup(group, "consumer", 1, 2000, false, stream, "0")
assert.Nil(t, err)
assert.Greater(t, len(streamRes2), 0, "streamRes2 is empty")
assert.Equal(t, 0, len(streamRes2[0].Messages))
})
}