Files
go-zero/core/executors/periodicalexecutor.go

203 lines
4.5 KiB
Go
Raw Normal View History

2020-07-26 17:09:05 +08:00
package executors
import (
"reflect"
"sync"
"sync/atomic"
2020-07-26 17:09:05 +08:00
"time"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/proc"
"github.com/zeromicro/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/threading"
"github.com/zeromicro/go-zero/core/timex"
2020-07-26 17:09:05 +08:00
)
const idleRound = 10
type (
2021-02-09 13:50:21 +08:00
// TaskContainer interface defines a type that can be used as the underlying
2020-07-26 17:09:05 +08:00
// container that used to do periodical executions.
TaskContainer interface {
// AddTask adds the task into the container.
// Returns true if the container needs to be flushed after the addition.
AddTask(task any) bool
2020-07-26 17:09:05 +08:00
// Execute handles the collected tasks by the container when flushing.
Execute(tasks any)
2020-07-26 17:09:05 +08:00
// RemoveAll removes the contained tasks, and return them.
RemoveAll() any
2020-07-26 17:09:05 +08:00
}
// A PeriodicalExecutor is an executor that periodically execute tasks.
2020-07-26 17:09:05 +08:00
PeriodicalExecutor struct {
commander chan any
2020-07-26 17:09:05 +08:00
interval time.Duration
container TaskContainer
waitGroup sync.WaitGroup
2020-07-29 18:30:32 +08:00
// avoid race condition on waitGroup when calling wg.Add/Done/Wait(...)
wgBarrier syncx.Barrier
confirmChan chan lang.PlaceholderType
inflight int32
guarded bool
newTicker func(duration time.Duration) timex.Ticker
lock sync.Mutex
2020-07-26 17:09:05 +08:00
}
)
// NewPeriodicalExecutor returns a PeriodicalExecutor with given interval and container.
2020-07-26 17:09:05 +08:00
func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *PeriodicalExecutor {
executor := &PeriodicalExecutor{
// buffer 1 to let the caller go quickly
commander: make(chan any, 1),
interval: interval,
container: container,
confirmChan: make(chan lang.PlaceholderType),
2020-07-26 17:09:05 +08:00
newTicker: func(d time.Duration) timex.Ticker {
2021-01-14 22:20:09 +08:00
return timex.NewTicker(d)
2020-07-26 17:09:05 +08:00
},
}
proc.AddShutdownListener(func() {
executor.Flush()
})
return executor
}
// Add adds tasks into pe.
func (pe *PeriodicalExecutor) Add(task any) {
2020-07-26 17:09:05 +08:00
if vals, ok := pe.addAndCheck(task); ok {
pe.commander <- vals
<-pe.confirmChan
2020-07-26 17:09:05 +08:00
}
}
// Flush forces pe to execute tasks.
2020-07-26 17:09:05 +08:00
func (pe *PeriodicalExecutor) Flush() bool {
pe.enterExecution()
return pe.executeTasks(func() any {
2020-07-26 17:09:05 +08:00
pe.lock.Lock()
defer pe.lock.Unlock()
return pe.container.RemoveAll()
}())
}
// Sync lets caller run fn thread-safe with pe, especially for the underlying container.
2020-07-26 17:09:05 +08:00
func (pe *PeriodicalExecutor) Sync(fn func()) {
pe.lock.Lock()
defer pe.lock.Unlock()
fn()
}
// Wait waits the execution to be done.
2020-07-26 17:09:05 +08:00
func (pe *PeriodicalExecutor) Wait() {
2020-11-06 12:25:48 +08:00
pe.Flush()
2020-07-29 18:30:32 +08:00
pe.wgBarrier.Guard(func() {
pe.waitGroup.Wait()
})
2020-07-26 17:09:05 +08:00
}
func (pe *PeriodicalExecutor) addAndCheck(task any) (any, bool) {
2020-07-26 17:09:05 +08:00
pe.lock.Lock()
defer func() {
if !pe.guarded {
pe.guarded = true
// defer to unlock quickly
defer pe.backgroundFlush()
2020-07-26 17:09:05 +08:00
}
pe.lock.Unlock()
}()
if pe.container.AddTask(task) {
atomic.AddInt32(&pe.inflight, 1)
return pe.container.RemoveAll(), true
2020-07-26 17:09:05 +08:00
}
return nil, false
}
func (pe *PeriodicalExecutor) backgroundFlush() {
go func() {
// flush before quit goroutine to avoid missing tasks
defer pe.Flush()
2020-07-26 17:09:05 +08:00
ticker := pe.newTicker(pe.interval)
defer ticker.Stop()
var commanded bool
last := timex.Now()
for {
select {
case vals := <-pe.commander:
commanded = true
atomic.AddInt32(&pe.inflight, -1)
pe.enterExecution()
pe.confirmChan <- lang.Placeholder
2020-07-26 17:09:05 +08:00
pe.executeTasks(vals)
last = timex.Now()
case <-ticker.Chan():
if commanded {
commanded = false
} else if pe.Flush() {
last = timex.Now()
} else if pe.shallQuit(last) {
return
2020-07-26 17:09:05 +08:00
}
}
}
}()
2020-07-26 17:09:05 +08:00
}
func (pe *PeriodicalExecutor) doneExecution() {
pe.waitGroup.Done()
}
func (pe *PeriodicalExecutor) enterExecution() {
2020-07-29 18:30:32 +08:00
pe.wgBarrier.Guard(func() {
pe.waitGroup.Add(1)
})
}
func (pe *PeriodicalExecutor) executeTasks(tasks any) bool {
defer pe.doneExecution()
2020-07-26 17:09:05 +08:00
ok := pe.hasTasks(tasks)
if ok {
threading.RunSafe(func() {
pe.container.Execute(tasks)
})
2020-07-26 17:09:05 +08:00
}
return ok
}
func (pe *PeriodicalExecutor) hasTasks(tasks any) bool {
2020-07-26 17:09:05 +08:00
if tasks == nil {
return false
}
val := reflect.ValueOf(tasks)
switch val.Kind() {
case reflect.Array, reflect.Chan, reflect.Map, reflect.Slice:
return val.Len() > 0
default:
// unknown type, let caller execute it
return true
}
}
func (pe *PeriodicalExecutor) shallQuit(last time.Duration) (stop bool) {
if timex.Since(last) <= pe.interval*idleRound {
return
}
// checking pe.inflight and setting pe.guarded should be locked together
pe.lock.Lock()
if atomic.LoadInt32(&pe.inflight) == 0 {
pe.guarded = false
stop = true
}
pe.lock.Unlock()
return
}