chore: refactor mapreduce (#4619)

This commit is contained in:
Kevin Wan
2025-02-01 00:12:37 +08:00
committed by GitHub
parent 4e71e95e44
commit c4e1a6a2d8

View File

@@ -142,89 +142,6 @@ func MapReduceChan[T, U, V any](source <-chan T, mapper MapperFunc[T, U], reduce
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
}
// mapReduceWithPanicChan maps all elements from source, and reduce the output elements with given reducer.
func mapReduceWithPanicChan[T, U, V any](source <-chan T, panicChan *onceChan, mapper MapperFunc[T, U],
reducer ReducerFunc[U, V], opts ...Option) (val V, err error) {
options := buildOptions(opts...)
// output is used to write the final result
output := make(chan V)
defer func() {
// reducer can only write once, if more, panic
for range output {
panic("more than one element written in reducer")
}
}()
// collector is used to collect data from mapper, and consume in reducer
collector := make(chan U, options.workers)
// if done is closed, all mappers and reducer should stop processing
done := make(chan struct{})
writer := newGuardedWriter(options.ctx, output, done)
var closeOnce sync.Once
// use atomic type to avoid data race
var retErr errorx.AtomicError
finish := func() {
closeOnce.Do(func() {
close(done)
close(output)
})
}
cancel := once(func(err error) {
if err != nil {
retErr.Set(err)
} else {
retErr.Set(ErrCancelWithNil)
}
drain(source)
finish()
})
go func() {
defer func() {
drain(collector)
if r := recover(); r != nil {
panicChan.write(r)
}
finish()
}()
reducer(collector, writer, cancel)
}()
go executeMappers(mapperContext[T, U]{
ctx: options.ctx,
mapper: func(item T, w Writer[U]) {
mapper(item, w, cancel)
},
source: source,
panicChan: panicChan,
collector: collector,
doneChan: done,
workers: options.workers,
})
select {
case <-options.ctx.Done():
cancel(context.DeadlineExceeded)
err = context.DeadlineExceeded
case v := <-panicChan.channel:
// drain output here, otherwise for loop panic in defer
drain(output)
panic(v)
case v, ok := <-output:
if e := retErr.Load(); e != nil {
err = e
} else if ok {
val = v
} else {
err = ErrReduceNoOutput
}
}
return
}
// MapReduceVoid maps all elements generated from given generate,
// and reduce the output elements with given reducer.
func MapReduceVoid[T, U any](generate GenerateFunc[T], mapper MapperFunc[T, U],
@@ -330,6 +247,89 @@ func executeMappers[T, U any](mCtx mapperContext[T, U]) {
}
}
// mapReduceWithPanicChan maps all elements from source, and reduce the output elements with given reducer.
func mapReduceWithPanicChan[T, U, V any](source <-chan T, panicChan *onceChan, mapper MapperFunc[T, U],
reducer ReducerFunc[U, V], opts ...Option) (val V, err error) {
options := buildOptions(opts...)
// output is used to write the final result
output := make(chan V)
defer func() {
// reducer can only write once, if more, panic
for range output {
panic("more than one element written in reducer")
}
}()
// collector is used to collect data from mapper, and consume in reducer
collector := make(chan U, options.workers)
// if done is closed, all mappers and reducer should stop processing
done := make(chan struct{})
writer := newGuardedWriter(options.ctx, output, done)
var closeOnce sync.Once
// use atomic type to avoid data race
var retErr errorx.AtomicError
finish := func() {
closeOnce.Do(func() {
close(done)
close(output)
})
}
cancel := once(func(err error) {
if err != nil {
retErr.Set(err)
} else {
retErr.Set(ErrCancelWithNil)
}
drain(source)
finish()
})
go func() {
defer func() {
drain(collector)
if r := recover(); r != nil {
panicChan.write(r)
}
finish()
}()
reducer(collector, writer, cancel)
}()
go executeMappers(mapperContext[T, U]{
ctx: options.ctx,
mapper: func(item T, w Writer[U]) {
mapper(item, w, cancel)
},
source: source,
panicChan: panicChan,
collector: collector,
doneChan: done,
workers: options.workers,
})
select {
case <-options.ctx.Done():
cancel(context.DeadlineExceeded)
err = context.DeadlineExceeded
case v := <-panicChan.channel:
// drain output here, otherwise for loop panic in defer
drain(output)
panic(v)
case v, ok := <-output:
if e := retErr.Load(); e != nil {
err = e
} else if ok {
val = v
} else {
err = ErrReduceNoOutput
}
}
return
}
func newOptions() *mapReduceOptions {
return &mapReduceOptions{
ctx: context.Background(),