diff --git a/core/stores/redis/redis.go b/core/stores/redis/redis.go index 3d20528a9..4f1611651 100644 --- a/core/stores/redis/redis.go +++ b/core/stores/redis/redis.go @@ -1868,6 +1868,21 @@ func (s *Redis) XGroupCreateCtx(ctx context.Context, stream string, group string return conn.XGroupCreate(ctx, stream, group, start).Result() } +// XGroupSetID sets the last delivered ID for a Redis stream consumer group. +func (s *Redis) XGroupSetID(stream, group, start string) (string, error) { + return s.XGroupSetIDCtx(context.Background(), stream, group, start) +} + +// XGroupSetIDCtx is the context-aware version of XGroupSetID. +func (s *Redis) XGroupSetIDCtx(ctx context.Context, stream, group, start string) (string, error) { + conn, err := getRedis(s) + if err != nil { + return "", err + } + + return conn.XGroupSetID(ctx, stream, group, start).Result() +} + // XInfoConsumers returns information about consumers in a Redis stream consumer group. func (s *Redis) XInfoConsumers(stream string, group string) ([]red.XInfoConsumer, error) { return s.XInfoConsumersCtx(context.Background(), stream, group) diff --git a/core/stores/redis/redis_test.go b/core/stores/redis/redis_test.go index 5a37f32d3..ee00df59e 100644 --- a/core/stores/redis/redis_test.go +++ b/core/stores/redis/redis_test.go @@ -2294,6 +2294,31 @@ func TestRedisXGroupCreate(t *testing.T) { }) } +func TestRedisXGroupSetID(t *testing.T) { + runOnRedis(t, func(client *Redis) { + _, err := newRedis(client.Addr, badType()).XGroupSetID("Source", "Destination", "0") + assert.NotNil(t, err) + + redisCli := newRedis(client.Addr) + stream := "aa" + group := "bb" + + _, err = redisCli.XGroupCreateMkStream(stream, group, "0") + assert.Nil(t, err) + + res, err := redisCli.XGroupSetID(stream, group, "0") + assert.Empty(t, res) + assert.ErrorContains(t, err, "not supported") + + _, err = newRedis(client.Addr, badType()).XGroupSetIDCtx(context.Background(), stream, group, "0") + assert.NotNil(t, err) + + res, err = redisCli.XGroupSetIDCtx(context.Background(), stream, group, "0") + assert.Empty(t, res) + assert.ErrorContains(t, err, "not supported") + }) +} + func TestRedisXInfo(t *testing.T) { runOnRedis(t, func(client *Redis) { _, err := newRedis(client.Addr, badType()).XInfoStream("Source")