optimize: mapreduce panic stacktrace (#5168)

This commit is contained in:
Kevin Wan
2025-09-14 19:33:09 +08:00
committed by GitHub
parent 351b8cb37b
commit bc43df2641
2 changed files with 46 additions and 18 deletions

View File

@@ -3,6 +3,9 @@ package mr
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"runtime/debug"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
@@ -183,12 +186,16 @@ func buildOptions(opts ...Option) *mapReduceOptions {
return options return options
} }
func buildPanicInfo(r any, stack []byte) string {
return fmt.Sprintf("%+v\n\n%s", r, strings.TrimSpace(string(stack)))
}
func buildSource[T any](generate GenerateFunc[T], panicChan *onceChan) chan T { func buildSource[T any](generate GenerateFunc[T], panicChan *onceChan) chan T {
source := make(chan T) source := make(chan T)
go func() { go func() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
panicChan.write(r) panicChan.write(buildPanicInfo(r, debug.Stack()))
} }
close(source) close(source)
}() }()
@@ -235,7 +242,7 @@ func executeMappers[T, U any](mCtx mapperContext[T, U]) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
atomic.AddInt32(&failed, 1) atomic.AddInt32(&failed, 1)
mCtx.panicChan.write(r) mCtx.panicChan.write(buildPanicInfo(r, debug.Stack()))
} }
wg.Done() wg.Done()
<-pool <-pool
@@ -289,7 +296,7 @@ func mapReduceWithPanicChan[T, U, V any](source <-chan T, panicChan *onceChan, m
defer func() { defer func() {
drain(collector) drain(collector)
if r := recover(); r != nil { if r := recover(); r != nil {
panicChan.write(r) panicChan.write(buildPanicInfo(r, debug.Stack()))
} }
finish() finish()
}() }()

View File

@@ -3,6 +3,7 @@ package mr
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"io" "io"
"log" "log"
"runtime" "runtime"
@@ -148,11 +149,28 @@ func TestForEach(t *testing.T) {
assert.Equal(t, tasks/2, int(count)) assert.Equal(t, tasks/2, int(count))
}) })
}
t.Run("all", func(t *testing.T) { func TestPanics(t *testing.T) {
defer goleak.VerifyNone(t) defer goleak.VerifyNone(t)
const tasks = 1000
verify := func(t *testing.T, r any) {
panicStr := fmt.Sprintf("%v", r)
assert.Contains(t, panicStr, "foo")
assert.Contains(t, panicStr, "goroutine")
assert.Contains(t, panicStr, "runtime/debug.Stack")
panic(r)
}
t.Run("ForEach run panics", func(t *testing.T) {
assert.Panics(t, func() {
defer func() {
if r := recover(); r != nil {
verify(t, r)
}
}()
assert.PanicsWithValue(t, "foo", func() {
ForEach(func(source chan<- int) { ForEach(func(source chan<- int) {
for i := 0; i < tasks; i++ { for i := 0; i < tasks; i++ {
source <- i source <- i
@@ -162,28 +180,31 @@ func TestForEach(t *testing.T) {
}) })
}) })
}) })
}
func TestGeneratePanic(t *testing.T) { t.Run("ForEach generate panics", func(t *testing.T) {
defer goleak.VerifyNone(t) assert.Panics(t, func() {
defer func() {
if r := recover(); r != nil {
verify(t, r)
}
}()
t.Run("all", func(t *testing.T) {
assert.PanicsWithValue(t, "foo", func() {
ForEach(func(source chan<- int) { ForEach(func(source chan<- int) {
panic("foo") panic("foo")
}, func(item int) { }, func(item int) {
}) })
}) })
}) })
}
func TestMapperPanic(t *testing.T) {
defer goleak.VerifyNone(t)
const tasks = 1000
var run int32 var run int32
t.Run("all", func(t *testing.T) { t.Run("Mapper panics", func(t *testing.T) {
assert.PanicsWithValue(t, "foo", func() { assert.Panics(t, func() {
defer func() {
if r := recover(); r != nil {
verify(t, r)
}
}()
_, _ = MapReduce(func(source chan<- int) { _, _ = MapReduce(func(source chan<- int) {
for i := 0; i < tasks; i++ { for i := 0; i < tasks; i++ {
source <- i source <- i