From bc43df2641c4dc9f3a6854927378d501e6556dc5 Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Sun, 14 Sep 2025 19:33:09 +0800 Subject: [PATCH] optimize: mapreduce panic stacktrace (#5168) --- core/mr/mapreduce.go | 13 +++++++--- core/mr/mapreduce_test.go | 51 +++++++++++++++++++++++++++------------ 2 files changed, 46 insertions(+), 18 deletions(-) diff --git a/core/mr/mapreduce.go b/core/mr/mapreduce.go index 30352441e..503c8b68c 100644 --- a/core/mr/mapreduce.go +++ b/core/mr/mapreduce.go @@ -3,6 +3,9 @@ package mr import ( "context" "errors" + "fmt" + "runtime/debug" + "strings" "sync" "sync/atomic" @@ -183,12 +186,16 @@ func buildOptions(opts ...Option) *mapReduceOptions { return options } +func buildPanicInfo(r any, stack []byte) string { + return fmt.Sprintf("%+v\n\n%s", r, strings.TrimSpace(string(stack))) +} + func buildSource[T any](generate GenerateFunc[T], panicChan *onceChan) chan T { source := make(chan T) go func() { defer func() { if r := recover(); r != nil { - panicChan.write(r) + panicChan.write(buildPanicInfo(r, debug.Stack())) } close(source) }() @@ -235,7 +242,7 @@ func executeMappers[T, U any](mCtx mapperContext[T, U]) { defer func() { if r := recover(); r != nil { atomic.AddInt32(&failed, 1) - mCtx.panicChan.write(r) + mCtx.panicChan.write(buildPanicInfo(r, debug.Stack())) } wg.Done() <-pool @@ -289,7 +296,7 @@ func mapReduceWithPanicChan[T, U, V any](source <-chan T, panicChan *onceChan, m defer func() { drain(collector) if r := recover(); r != nil { - panicChan.write(r) + panicChan.write(buildPanicInfo(r, debug.Stack())) } finish() }() diff --git a/core/mr/mapreduce_test.go b/core/mr/mapreduce_test.go index c0cc34cec..fd0ac2b46 100644 --- a/core/mr/mapreduce_test.go +++ b/core/mr/mapreduce_test.go @@ -3,6 +3,7 @@ package mr import ( "context" "errors" + "fmt" "io" "log" "runtime" @@ -148,11 +149,28 @@ func TestForEach(t *testing.T) { assert.Equal(t, tasks/2, int(count)) }) +} - t.Run("all", func(t *testing.T) { - defer goleak.VerifyNone(t) +func TestPanics(t *testing.T) { + defer goleak.VerifyNone(t) + + const tasks = 1000 + verify := func(t *testing.T, r any) { + panicStr := fmt.Sprintf("%v", r) + assert.Contains(t, panicStr, "foo") + assert.Contains(t, panicStr, "goroutine") + assert.Contains(t, panicStr, "runtime/debug.Stack") + panic(r) + } + + t.Run("ForEach run panics", func(t *testing.T) { + assert.Panics(t, func() { + defer func() { + if r := recover(); r != nil { + verify(t, r) + } + }() - assert.PanicsWithValue(t, "foo", func() { ForEach(func(source chan<- int) { for i := 0; i < tasks; i++ { source <- i @@ -162,28 +180,31 @@ func TestForEach(t *testing.T) { }) }) }) -} -func TestGeneratePanic(t *testing.T) { - defer goleak.VerifyNone(t) + t.Run("ForEach generate panics", func(t *testing.T) { + assert.Panics(t, func() { + defer func() { + if r := recover(); r != nil { + verify(t, r) + } + }() - t.Run("all", func(t *testing.T) { - assert.PanicsWithValue(t, "foo", func() { ForEach(func(source chan<- int) { panic("foo") }, func(item int) { }) }) }) -} -func TestMapperPanic(t *testing.T) { - defer goleak.VerifyNone(t) - - const tasks = 1000 var run int32 - t.Run("all", func(t *testing.T) { - assert.PanicsWithValue(t, "foo", func() { + t.Run("Mapper panics", func(t *testing.T) { + assert.Panics(t, func() { + defer func() { + if r := recover(); r != nil { + verify(t, r) + } + }() + _, _ = MapReduce(func(source chan<- int) { for i := 0; i < tasks; i++ { source <- i