diff --git a/core/syncx/immutableresource.go b/core/syncx/immutableresource.go index dd9e4733a..8b79a6ce5 100644 --- a/core/syncx/immutableresource.go +++ b/core/syncx/immutableresource.go @@ -47,30 +47,32 @@ func (ir *ImmutableResource) Get() (any, error) { return resource, nil } - ir.maybeRefresh(func() { - res, err := ir.fetch() - ir.lock.Lock() - if err != nil { - ir.err = err - } else { - ir.resource, ir.err = res, nil - } - ir.lock.Unlock() - }) + ir.lock.Lock() + defer ir.lock.Unlock() - ir.lock.RLock() - resource, err := ir.resource, ir.err - ir.lock.RUnlock() - return resource, err + // double check + if ir.resource != nil { + return ir.resource, nil + } + if ir.err != nil && !ir.shouldRefresh() { + return ir.resource, ir.err + } + + res, err := ir.fetch() + ir.lastTime.Set(timex.Now()) + if err != nil { + ir.err = err + return nil, err + } + + ir.resource, ir.err = res, nil + return res, nil } -func (ir *ImmutableResource) maybeRefresh(execute func()) { +func (ir *ImmutableResource) shouldRefresh() bool { now := timex.Now() lastTime := ir.lastTime.Load() - if lastTime == 0 || lastTime+ir.refreshInterval < now { - ir.lastTime.Set(now) - execute() - } + return lastTime == 0 || lastTime+ir.refreshInterval < now } // WithRefreshIntervalOnFailure sets refresh interval on failure. diff --git a/core/syncx/immutableresource_test.go b/core/syncx/immutableresource_test.go index 8aec6b934..3405f4d3c 100644 --- a/core/syncx/immutableresource_test.go +++ b/core/syncx/immutableresource_test.go @@ -2,6 +2,8 @@ package syncx import ( "errors" + "sync" + "sync/atomic" "testing" "time" @@ -56,6 +58,46 @@ func TestImmutableResourceError(t *testing.T) { assert.Equal(t, 2, count) } +func TestImmutableResourceConcurrent(t *testing.T) { + var count int32 + ready := make(chan struct{}) + r := NewImmutableResource(func() (any, error) { + atomic.AddInt32(&count, 1) + close(ready) // signal that fetch started + time.Sleep(10 * time.Millisecond) // simulate slow fetch + return "hello", nil + }) + + const goroutines = 100 + var wg sync.WaitGroup + results := make([]any, goroutines) + errors := make([]error, goroutines) + + wg.Add(goroutines) + for i := 0; i < goroutines; i++ { + go func(idx int) { + defer wg.Done() + res, err := r.Get() + results[idx] = res + errors[idx] = err + }(i) + } + + // wait for fetch to start + <-ready + + wg.Wait() + + // fetch should only be called once despite concurrent access + assert.Equal(t, int32(1), atomic.LoadInt32(&count)) + + // all goroutines should eventually get the same result + for i := 0; i < goroutines; i++ { + assert.Nil(t, errors[i]) + assert.Equal(t, "hello", results[i]) + } +} + func TestImmutableResourceErrorRefreshAlways(t *testing.T) { var count int r := NewImmutableResource(func() (any, error) {