mirror of
https://github.com/zeromicro/go-zero.git
synced 2026-05-13 09:50:00 +08:00
feat: use breaker with ctx to prevent deadline exceeded (#4091)
Signed-off-by: kevin <wanjunfeng@gmail.com>
This commit is contained in:
@@ -141,7 +141,7 @@ func (c *decoratedCollection) Aggregate(ctx context.Context, pipeline any,
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
err = c.brk.DoWithAcceptableCtx(ctx, func() error {
|
||||
starTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDurationSimple(ctx, aggregate, starTime, err)
|
||||
@@ -161,7 +161,7 @@ func (c *decoratedCollection) BulkWrite(ctx context.Context, models []mongo.Writ
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
err = c.brk.DoWithAcceptableCtx(ctx, func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDurationSimple(ctx, bulkWrite, startTime, err)
|
||||
@@ -181,7 +181,7 @@ func (c *decoratedCollection) CountDocuments(ctx context.Context, filter any,
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
err = c.brk.DoWithAcceptableCtx(ctx, func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDurationSimple(ctx, countDocuments, startTime, err)
|
||||
@@ -201,7 +201,7 @@ func (c *decoratedCollection) DeleteMany(ctx context.Context, filter any,
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
err = c.brk.DoWithAcceptableCtx(ctx, func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDurationSimple(ctx, deleteMany, startTime, err)
|
||||
@@ -221,7 +221,7 @@ func (c *decoratedCollection) DeleteOne(ctx context.Context, filter any,
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
err = c.brk.DoWithAcceptableCtx(ctx, func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDuration(ctx, deleteOne, startTime, err, filter)
|
||||
@@ -241,7 +241,7 @@ func (c *decoratedCollection) Distinct(ctx context.Context, fieldName string, fi
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
err = c.brk.DoWithAcceptableCtx(ctx, func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDurationSimple(ctx, distinct, startTime, err)
|
||||
@@ -261,7 +261,7 @@ func (c *decoratedCollection) EstimatedDocumentCount(ctx context.Context,
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
err = c.brk.DoWithAcceptableCtx(ctx, func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDurationSimple(ctx, estimatedDocumentCount, startTime, err)
|
||||
@@ -281,7 +281,7 @@ func (c *decoratedCollection) Find(ctx context.Context, filter any,
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
err = c.brk.DoWithAcceptableCtx(ctx, func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDuration(ctx, find, startTime, err, filter)
|
||||
@@ -301,7 +301,7 @@ func (c *decoratedCollection) FindOne(ctx context.Context, filter any,
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
err = c.brk.DoWithAcceptableCtx(ctx, func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDuration(ctx, findOne, startTime, err, filter)
|
||||
@@ -322,7 +322,7 @@ func (c *decoratedCollection) FindOneAndDelete(ctx context.Context, filter any,
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
err = c.brk.DoWithAcceptableCtx(ctx, func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDuration(ctx, findOneAndDelete, startTime, err, filter)
|
||||
@@ -344,7 +344,7 @@ func (c *decoratedCollection) FindOneAndReplace(ctx context.Context, filter any,
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
err = c.brk.DoWithAcceptableCtx(ctx, func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDuration(ctx, findOneAndReplace, startTime, err, filter, replacement)
|
||||
@@ -365,7 +365,7 @@ func (c *decoratedCollection) FindOneAndUpdate(ctx context.Context, filter, upda
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
err = c.brk.DoWithAcceptableCtx(ctx, func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDuration(ctx, findOneAndUpdate, startTime, err, filter, update)
|
||||
@@ -386,7 +386,7 @@ func (c *decoratedCollection) InsertMany(ctx context.Context, documents []any,
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
err = c.brk.DoWithAcceptableCtx(ctx, func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDurationSimple(ctx, insertMany, startTime, err)
|
||||
@@ -406,7 +406,7 @@ func (c *decoratedCollection) InsertOne(ctx context.Context, document any,
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
err = c.brk.DoWithAcceptableCtx(ctx, func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDuration(ctx, insertOne, startTime, err, document)
|
||||
@@ -426,7 +426,7 @@ func (c *decoratedCollection) ReplaceOne(ctx context.Context, filter, replacemen
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
err = c.brk.DoWithAcceptableCtx(ctx, func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDuration(ctx, replaceOne, startTime, err, filter, replacement)
|
||||
@@ -446,7 +446,7 @@ func (c *decoratedCollection) UpdateByID(ctx context.Context, id, update any,
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
err = c.brk.DoWithAcceptableCtx(ctx, func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDuration(ctx, updateByID, startTime, err, id, update)
|
||||
@@ -466,7 +466,7 @@ func (c *decoratedCollection) UpdateMany(ctx context.Context, filter, update any
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
err = c.brk.DoWithAcceptableCtx(ctx, func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDurationSimple(ctx, updateMany, startTime, err)
|
||||
@@ -486,7 +486,7 @@ func (c *decoratedCollection) UpdateOne(ctx context.Context, filter, update any,
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = c.brk.DoWithAcceptable(func() error {
|
||||
err = c.brk.DoWithAcceptableCtx(ctx, func() error {
|
||||
startTime := timex.Now()
|
||||
defer func() {
|
||||
c.logDuration(ctx, updateOne, startTime, err, filter, update)
|
||||
|
||||
@@ -69,27 +69,21 @@ func newModel(name string, cli *mongo.Client, coll Collection, brk breaker.Break
|
||||
|
||||
// StartSession starts a new session.
|
||||
func (m *Model) StartSession(opts ...*mopt.SessionOptions) (sess mongo.Session, err error) {
|
||||
err = m.brk.DoWithAcceptable(func() error {
|
||||
starTime := timex.Now()
|
||||
defer func() {
|
||||
logDuration(context.Background(), m.name, startSession, starTime, err)
|
||||
}()
|
||||
starTime := timex.Now()
|
||||
defer func() {
|
||||
logDuration(context.Background(), m.name, startSession, starTime, err)
|
||||
}()
|
||||
|
||||
session, sessionErr := m.cli.StartSession(opts...)
|
||||
if sessionErr != nil {
|
||||
return sessionErr
|
||||
}
|
||||
session, sessionErr := m.cli.StartSession(opts...)
|
||||
if sessionErr != nil {
|
||||
return nil, sessionErr
|
||||
}
|
||||
|
||||
sess = &wrappedSession{
|
||||
Session: session,
|
||||
name: m.name,
|
||||
brk: m.brk,
|
||||
}
|
||||
|
||||
return nil
|
||||
}, acceptable)
|
||||
|
||||
return
|
||||
return &wrappedSession{
|
||||
Session: session,
|
||||
name: m.name,
|
||||
brk: m.brk,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Aggregate executes an aggregation pipeline.
|
||||
@@ -184,7 +178,7 @@ func (w *wrappedSession) AbortTransaction(ctx context.Context) (err error) {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
return w.brk.DoWithAcceptable(func() error {
|
||||
return w.brk.DoWithAcceptableCtx(ctx, func() error {
|
||||
starTime := timex.Now()
|
||||
defer func() {
|
||||
logDuration(ctx, w.name, abortTransaction, starTime, err)
|
||||
@@ -201,7 +195,7 @@ func (w *wrappedSession) CommitTransaction(ctx context.Context) (err error) {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
return w.brk.DoWithAcceptable(func() error {
|
||||
return w.brk.DoWithAcceptableCtx(ctx, func() error {
|
||||
starTime := timex.Now()
|
||||
defer func() {
|
||||
logDuration(ctx, w.name, commitTransaction, starTime, err)
|
||||
@@ -222,7 +216,7 @@ func (w *wrappedSession) WithTransaction(
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = w.brk.DoWithAcceptable(func() error {
|
||||
err = w.brk.DoWithAcceptableCtx(ctx, func() error {
|
||||
starTime := timex.Now()
|
||||
defer func() {
|
||||
logDuration(ctx, w.name, withTransaction, starTime, err)
|
||||
@@ -243,7 +237,7 @@ func (w *wrappedSession) EndSession(ctx context.Context) {
|
||||
endSpan(span, err)
|
||||
}()
|
||||
|
||||
err = w.brk.DoWithAcceptable(func() error {
|
||||
err = w.brk.DoWithAcceptableCtx(ctx, func() error {
|
||||
starTime := timex.Now()
|
||||
defer func() {
|
||||
logDuration(ctx, w.name, endSession, starTime, err)
|
||||
|
||||
Reference in New Issue
Block a user