fix(trace): use sync.Once to prevent multiple trace initialization (#5244)

Signed-off-by: kevin <wanjunfeng@gmail.com>
This commit is contained in:
Kevin Wan
2025-10-25 20:10:15 +08:00
committed by GitHub
parent 1fc2cfb859
commit 4e52d77ad8
2 changed files with 322 additions and 44 deletions

View File

@@ -7,7 +7,6 @@ import (
"os"
"sync"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/logx"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/jaeger"
@@ -30,42 +29,36 @@ const (
)
var (
agents = make(map[string]lang.PlaceholderType)
lock sync.Mutex
tp *sdktrace.TracerProvider
once sync.Once
tp *sdktrace.TracerProvider
shutdownOnceFn = sync.OnceFunc(func() {
if tp != nil {
_ = tp.Shutdown(context.Background())
}
})
)
// StartAgent starts an opentelemetry agent.
// It uses sync.Once to ensure the agent is initialized only once,
// similar to prometheus.StartAgent and logx.SetUp.
// This prevents multiple ServiceConf.SetUp() calls from reinitializing
// the global tracer provider when running multiple servers (e.g., REST + RPC)
// in the same process.
func StartAgent(c Config) {
if c.Disabled {
return
}
lock.Lock()
defer lock.Unlock()
_, ok := agents[c.Endpoint]
if ok {
return
}
// if error happens, let later calls run.
if err := startAgent(c); err != nil {
return
}
agents[c.Endpoint] = lang.Placeholder
once.Do(func() {
if err := startAgent(c); err != nil {
logx.Error(err)
}
})
}
// StopAgent shuts down the span processors in the order they were registered.
func StopAgent() {
lock.Lock()
defer lock.Unlock()
if tp != nil {
_ = tp.Shutdown(context.Background())
tp = nil
}
shutdownOnceFn()
}
func createExporter(c Config) (sdktrace.SpanExporter, error) {

View File

@@ -1,10 +1,13 @@
package trace
import (
"context"
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/logx"
"go.opentelemetry.io/otel"
)
func TestStartAgent(t *testing.T) {
@@ -89,23 +92,305 @@ func TestStartAgent(t *testing.T) {
StartAgent(c10)
defer StopAgent()
lock.Lock()
defer lock.Unlock()
// because remotehost cannot be resolved
assert.Equal(t, 6, len(agents))
_, ok := agents[""]
assert.True(t, ok)
_, ok = agents[endpoint1]
assert.True(t, ok)
_, ok = agents[endpoint2]
assert.False(t, ok)
_, ok = agents[endpoint5]
assert.True(t, ok)
_, ok = agents[endpoint6]
assert.False(t, ok)
_, ok = agents[endpoint71]
assert.True(t, ok)
_, ok = agents[endpoint72]
assert.False(t, ok)
// With sync.Once, only the first non-disabled config (c1) takes effect.
// Subsequent calls are ignored, which is the desired behavior to prevent
// multiple servers (REST + RPC) from reinitializing the global tracer.
assert.NotNil(t, tp)
}
func TestCreateExporter_InvalidFilePath(t *testing.T) {
logx.Disable()
c := Config{
Name: "test-invalid-file",
Endpoint: "/non-existent-directory/trace.log",
Batcher: kindFile,
}
_, err := createExporter(c)
assert.Error(t, err)
assert.Contains(t, err.Error(), "file exporter endpoint error")
}
func TestCreateExporter_UnknownBatcher(t *testing.T) {
logx.Disable()
c := Config{
Name: "test-unknown",
Endpoint: "localhost:1234",
Batcher: "unknown-batcher-type",
}
_, err := createExporter(c)
assert.Error(t, err)
assert.Contains(t, err.Error(), "unknown exporter")
}
func TestCreateExporter_ValidExporters(t *testing.T) {
logx.Disable()
tests := []struct {
name string
config Config
wantErr bool
errMsg string
}{
{
name: "valid file exporter",
config: Config{
Name: "file-test",
Endpoint: "/tmp/trace-test.log",
Batcher: kindFile,
},
wantErr: false,
},
{
name: "invalid file path",
config: Config{
Name: "file-test-invalid",
Endpoint: "/invalid-path/that/does/not/exist/trace.log",
Batcher: kindFile,
},
wantErr: true,
errMsg: "file exporter endpoint error",
},
{
name: "unknown batcher",
config: Config{
Name: "unknown-test",
Endpoint: "localhost:1234",
Batcher: "invalid-batcher",
},
wantErr: true,
errMsg: "unknown exporter",
},
{
name: "jaeger http",
config: Config{
Name: "jaeger-http",
Endpoint: "http://localhost:14268/api/traces",
Batcher: kindJaeger,
},
wantErr: false,
},
{
name: "jaeger udp",
config: Config{
Name: "jaeger-udp",
Endpoint: "udp://localhost:6831",
Batcher: kindJaeger,
},
wantErr: false,
},
{
name: "zipkin",
config: Config{
Name: "zipkin",
Endpoint: "http://localhost:9411/api/v2/spans",
Batcher: kindZipkin,
},
wantErr: false,
},
{
name: "otlpgrpc",
config: Config{
Name: "otlpgrpc",
Endpoint: "localhost:4317",
Batcher: kindOtlpGrpc,
},
wantErr: false,
},
{
name: "otlpgrpc with headers",
config: Config{
Name: "otlpgrpc-headers",
Endpoint: "localhost:4317",
Batcher: kindOtlpGrpc,
OtlpHeaders: map[string]string{
"authorization": "Bearer token123",
"x-custom-key": "custom-value",
},
},
wantErr: false,
},
{
name: "otlphttp",
config: Config{
Name: "otlphttp",
Endpoint: "localhost:4318",
Batcher: kindOtlpHttp,
},
wantErr: false,
},
{
name: "otlphttp with headers",
config: Config{
Name: "otlphttp-headers",
Endpoint: "localhost:4318",
Batcher: kindOtlpHttp,
OtlpHeaders: map[string]string{
"authorization": "Bearer token456",
"x-api-key": "api-key-value",
},
},
wantErr: false,
},
{
name: "otlphttp with headers and path",
config: Config{
Name: "otlphttp-headers-path",
Endpoint: "localhost:4318",
Batcher: kindOtlpHttp,
OtlpHttpPath: "/v1/traces",
OtlpHeaders: map[string]string{
"authorization": "Bearer token789",
"x-custom-trace": "trace-id",
},
},
wantErr: false,
},
{
name: "otlphttp with secure connection",
config: Config{
Name: "otlphttp-secure",
Endpoint: "localhost:4318",
Batcher: kindOtlpHttp,
OtlpHttpSecure: true,
OtlpHeaders: map[string]string{
"authorization": "Bearer secure-token",
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
exporter, err := createExporter(tt.config)
if tt.wantErr {
assert.Error(t, err)
if tt.errMsg != "" {
assert.Contains(t, err.Error(), tt.errMsg)
}
assert.Nil(t, exporter)
} else {
assert.NoError(t, err)
assert.NotNil(t, exporter)
// Clean up the exporter
if exporter != nil {
_ = exporter.Shutdown(context.Background())
}
}
})
}
}
func TestStopAgent(t *testing.T) {
logx.Disable()
// StopAgent should be idempotent and safe to call multiple times
assert.NotPanics(t, func() {
StopAgent()
StopAgent()
StopAgent()
})
}
func TestStartAgent_WithEndpoint(t *testing.T) {
logx.Disable()
tests := []struct {
name string
config Config
wantErr bool
}{
{
name: "empty endpoint - no exporter created",
config: Config{
Name: "test-no-endpoint",
Sampler: 1.0,
},
wantErr: false,
},
{
name: "valid endpoint with file exporter",
config: Config{
Name: "test-with-endpoint",
Endpoint: "/tmp/test-trace.log",
Batcher: kindFile,
Sampler: 1.0,
},
wantErr: false,
},
{
name: "endpoint with invalid exporter type",
config: Config{
Name: "test-invalid-batcher",
Endpoint: "localhost:1234",
Batcher: "invalid-type",
Sampler: 1.0,
},
wantErr: true,
},
{
name: "endpoint with invalid file path",
config: Config{
Name: "test-invalid-path",
Endpoint: "/non/existent/path/trace.log",
Batcher: kindFile,
Sampler: 1.0,
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Reset tp for each test
originalTp := tp
tp = nil
defer func() {
if tp != nil {
_ = tp.Shutdown(context.Background())
}
tp = originalTp
}()
err := startAgent(tt.config)
if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.NotNil(t, tp, "TracerProvider should be created")
}
})
}
}
func TestStartAgent_ErrorHandler(t *testing.T) {
// Setup a tracer provider to test error handler
originalTp := tp
tp = nil
defer func() {
if tp != nil {
_ = tp.Shutdown(context.Background())
}
tp = originalTp
}()
// Call startAgent to set up the error handler
config := Config{
Name: "test-error-handler",
Sampler: 1.0,
}
err := startAgent(config)
assert.NoError(t, err)
assert.NotNil(t, tp)
// Verify the error handler was set and can be called without panicking
// We test this by calling otel.Handle which will invoke the registered error handler
testErr := errors.New("test otel error")
assert.NotPanics(t, func() {
otel.Handle(testErr)
}, "Error handler should handle errors without panicking")
}