From c4e1a6a2d87522f6e8a3b87815e033b64c3ceda9 Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Sat, 1 Feb 2025 00:12:37 +0800 Subject: [PATCH] chore: refactor mapreduce (#4619) --- core/mr/mapreduce.go | 166 +++++++++++++++++++++---------------------- 1 file changed, 83 insertions(+), 83 deletions(-) diff --git a/core/mr/mapreduce.go b/core/mr/mapreduce.go index a56a82fec..30352441e 100644 --- a/core/mr/mapreduce.go +++ b/core/mr/mapreduce.go @@ -142,89 +142,6 @@ func MapReduceChan[T, U, V any](source <-chan T, mapper MapperFunc[T, U], reduce return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...) } -// mapReduceWithPanicChan maps all elements from source, and reduce the output elements with given reducer. -func mapReduceWithPanicChan[T, U, V any](source <-chan T, panicChan *onceChan, mapper MapperFunc[T, U], - reducer ReducerFunc[U, V], opts ...Option) (val V, err error) { - options := buildOptions(opts...) - // output is used to write the final result - output := make(chan V) - defer func() { - // reducer can only write once, if more, panic - for range output { - panic("more than one element written in reducer") - } - }() - - // collector is used to collect data from mapper, and consume in reducer - collector := make(chan U, options.workers) - // if done is closed, all mappers and reducer should stop processing - done := make(chan struct{}) - writer := newGuardedWriter(options.ctx, output, done) - var closeOnce sync.Once - // use atomic type to avoid data race - var retErr errorx.AtomicError - finish := func() { - closeOnce.Do(func() { - close(done) - close(output) - }) - } - cancel := once(func(err error) { - if err != nil { - retErr.Set(err) - } else { - retErr.Set(ErrCancelWithNil) - } - - drain(source) - finish() - }) - - go func() { - defer func() { - drain(collector) - if r := recover(); r != nil { - panicChan.write(r) - } - finish() - }() - - reducer(collector, writer, cancel) - }() - - go executeMappers(mapperContext[T, U]{ - ctx: options.ctx, - mapper: func(item T, w Writer[U]) { - mapper(item, w, cancel) - }, - source: source, - panicChan: panicChan, - collector: collector, - doneChan: done, - workers: options.workers, - }) - - select { - case <-options.ctx.Done(): - cancel(context.DeadlineExceeded) - err = context.DeadlineExceeded - case v := <-panicChan.channel: - // drain output here, otherwise for loop panic in defer - drain(output) - panic(v) - case v, ok := <-output: - if e := retErr.Load(); e != nil { - err = e - } else if ok { - val = v - } else { - err = ErrReduceNoOutput - } - } - - return -} - // MapReduceVoid maps all elements generated from given generate, // and reduce the output elements with given reducer. func MapReduceVoid[T, U any](generate GenerateFunc[T], mapper MapperFunc[T, U], @@ -330,6 +247,89 @@ func executeMappers[T, U any](mCtx mapperContext[T, U]) { } } +// mapReduceWithPanicChan maps all elements from source, and reduce the output elements with given reducer. +func mapReduceWithPanicChan[T, U, V any](source <-chan T, panicChan *onceChan, mapper MapperFunc[T, U], + reducer ReducerFunc[U, V], opts ...Option) (val V, err error) { + options := buildOptions(opts...) + // output is used to write the final result + output := make(chan V) + defer func() { + // reducer can only write once, if more, panic + for range output { + panic("more than one element written in reducer") + } + }() + + // collector is used to collect data from mapper, and consume in reducer + collector := make(chan U, options.workers) + // if done is closed, all mappers and reducer should stop processing + done := make(chan struct{}) + writer := newGuardedWriter(options.ctx, output, done) + var closeOnce sync.Once + // use atomic type to avoid data race + var retErr errorx.AtomicError + finish := func() { + closeOnce.Do(func() { + close(done) + close(output) + }) + } + cancel := once(func(err error) { + if err != nil { + retErr.Set(err) + } else { + retErr.Set(ErrCancelWithNil) + } + + drain(source) + finish() + }) + + go func() { + defer func() { + drain(collector) + if r := recover(); r != nil { + panicChan.write(r) + } + finish() + }() + + reducer(collector, writer, cancel) + }() + + go executeMappers(mapperContext[T, U]{ + ctx: options.ctx, + mapper: func(item T, w Writer[U]) { + mapper(item, w, cancel) + }, + source: source, + panicChan: panicChan, + collector: collector, + doneChan: done, + workers: options.workers, + }) + + select { + case <-options.ctx.Done(): + cancel(context.DeadlineExceeded) + err = context.DeadlineExceeded + case v := <-panicChan.channel: + // drain output here, otherwise for loop panic in defer + drain(output) + panic(v) + case v, ok := <-output: + if e := retErr.Load(); e != nil { + err = e + } else if ok { + val = v + } else { + err = ErrReduceNoOutput + } + } + + return +} + func newOptions() *mapReduceOptions { return &mapReduceOptions{ ctx: context.Background(),