diff --git a/core/lang/lang.go b/core/lang/lang.go index 11ce19f45..ff7305dbe 100644 --- a/core/lang/lang.go +++ b/core/lang/lang.go @@ -4,8 +4,8 @@ package lang var Placeholder PlaceholderType type ( - // GenericType can be used to hold any type. - GenericType = interface{} + // AnyType can be used to hold any type. + AnyType = interface{} // PlaceholderType represents a placeholder type. PlaceholderType = struct{} ) diff --git a/core/mr/mapreduce.go b/core/mr/mapreduce.go index dce1a3bd1..a6b2464ec 100644 --- a/core/mr/mapreduce.go +++ b/core/mr/mapreduce.go @@ -8,7 +8,6 @@ import ( "github.com/tal-tech/go-zero/core/errorx" "github.com/tal-tech/go-zero/core/lang" - "github.com/tal-tech/go-zero/core/syncx" "github.com/tal-tech/go-zero/core/threading" ) @@ -95,9 +94,9 @@ func Map(generate GenerateFunc, mapper MapFunc, opts ...Option) chan interface{} options := buildOptions(opts...) source := buildSource(generate) collector := make(chan interface{}, options.workers) - done := syncx.NewDoneChan() + done := make(chan lang.PlaceholderType) - go executeMappers(options.ctx, mapper, source, collector, done.Done(), options.workers) + go executeMappers(options.ctx, mapper, source, collector, done, options.workers) return collector } @@ -122,13 +121,13 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R }() collector := make(chan interface{}, options.workers) - done := syncx.NewDoneChan() - writer := newGuardedWriter(options.ctx, output, done.Done()) + done := make(chan lang.PlaceholderType) + writer := newGuardedWriter(options.ctx, output, done) var closeOnce sync.Once var retErr errorx.AtomicError finish := func() { closeOnce.Do(func() { - done.Close() + close(done) close(output) }) } @@ -159,7 +158,7 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R go executeMappers(options.ctx, func(item interface{}, w Writer) { mapper(item, w, cancel) - }, source, collector, done.Done(), options.workers) + }, source, collector, done, options.workers) value, ok := <-output if err := retErr.Load(); err != nil {