refactor zrpc server interceptor builder (#4300)

This commit is contained in:
featherlight
2024-08-08 22:37:19 +08:00
committed by GitHub
parent c1f12c5784
commit 03756c9166
8 changed files with 200 additions and 263 deletions

View File

@@ -36,21 +36,23 @@ func NewServer(c RpcServerConf, register internal.RegisterFn) (*RpcServer, error
var server internal.Server
metrics := stat.NewMetrics(c.ListenOn)
serverOptions := []internal.ServerOption{
internal.WithMetrics(metrics),
internal.WithRpcHealth(c.Health),
}
if c.HasEtcd() {
server, err = internal.NewRpcPubServer(c.Etcd, c.ListenOn, c.Middlewares, serverOptions...)
server, err = internal.NewRpcPubServer(c.Etcd, c.ListenOn, serverOptions...)
if err != nil {
return nil, err
}
} else {
server = internal.NewRpcServer(c.ListenOn, c.Middlewares, serverOptions...)
server = internal.NewRpcServer(c.ListenOn, serverOptions...)
}
server.SetName(c.Name)
if err = setupInterceptors(server, c, metrics); err != nil {
metrics.SetName(c.Name)
setupStreamInterceptors(server, c)
setupUnaryInterceptors(server, c, metrics)
if err = setupAuthInterceptors(server, c); err != nil {
return nil, err
}
@@ -108,6 +110,9 @@ func SetServerSlowThreshold(threshold time.Duration) {
}
func setupAuthInterceptors(svr internal.Server, c RpcServerConf) error {
if !c.Auth {
return nil
}
rds, err := redis.NewRedis(c.Redis.RedisConf)
if err != nil {
return err
@@ -124,22 +129,40 @@ func setupAuthInterceptors(svr internal.Server, c RpcServerConf) error {
return nil
}
func setupInterceptors(svr internal.Server, c RpcServerConf, metrics *stat.Metrics) error {
func setupStreamInterceptors(svr internal.Server, c RpcServerConf) {
if c.Middlewares.Trace {
svr.AddStreamInterceptors(serverinterceptors.StreamTracingInterceptor)
}
if c.Middlewares.Recover {
svr.AddStreamInterceptors(serverinterceptors.StreamRecoverInterceptor)
}
if c.Middlewares.Breaker {
svr.AddStreamInterceptors(serverinterceptors.StreamBreakerInterceptor)
}
}
func setupUnaryInterceptors(svr internal.Server, c RpcServerConf, metrics *stat.Metrics) {
if c.Middlewares.Trace {
svr.AddUnaryInterceptors(serverinterceptors.UnaryTracingInterceptor)
}
if c.Middlewares.Recover {
svr.AddUnaryInterceptors(serverinterceptors.UnaryRecoverInterceptor)
}
if c.Middlewares.Stat {
svr.AddUnaryInterceptors(serverinterceptors.UnaryStatInterceptor(metrics, c.Middlewares.StatConf))
}
if c.Middlewares.Prometheus {
svr.AddUnaryInterceptors(serverinterceptors.UnaryPrometheusInterceptor)
}
if c.Middlewares.Breaker {
svr.AddUnaryInterceptors(serverinterceptors.UnaryBreakerInterceptor)
}
if c.CpuThreshold > 0 {
shedder := load.NewAdaptiveShedder(load.WithCpuThreshold(c.CpuThreshold))
svr.AddUnaryInterceptors(serverinterceptors.UnarySheddingInterceptor(shedder, metrics))
}
if c.Timeout > 0 {
svr.AddUnaryInterceptors(serverinterceptors.UnaryTimeoutInterceptor(
time.Duration(c.Timeout)*time.Millisecond, c.MethodTimeouts...))
}
if c.Auth {
if err := setupAuthInterceptors(svr, c); err != nil {
return err
}
}
return nil
}