update:github.com/mongodb/mongo-go-driver v2.0 Migration (#4687)

This commit is contained in:
me-cs
2025-08-09 21:21:53 +08:00
committed by GitHub
parent f36e5fed35
commit b41b1b00df
22 changed files with 2515 additions and 1174 deletions

View File

@@ -1,3 +1,4 @@
//go:generate mockgen -package mon -destination model_mock.go -source model.go monClient monSession
package mon
import (
@@ -7,8 +8,8 @@ import (
"github.com/zeromicro/go-zero/core/breaker"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/timex"
"go.mongodb.org/mongo-driver/mongo"
mopt "go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
)
const (
@@ -24,15 +25,15 @@ type (
Model struct {
Collection
name string
cli *mongo.Client
cli monClient
brk breaker.Breaker
opts []Option
}
wrappedSession struct {
mongo.Session
name string
brk breaker.Breaker
WrappedSession struct {
session monSession
name string
brk breaker.Breaker
}
)
@@ -61,6 +62,17 @@ func newModel(name string, cli *mongo.Client, coll Collection, brk breaker.Break
return &Model{
name: name,
Collection: coll,
cli: &mockMonClient{c: cli},
brk: brk,
opts: opts,
}
}
func newTestModel(name string, cli monClient, coll monCollection, brk breaker.Breaker,
opts ...Option) *Model {
return &Model{
name: name,
Collection: newTestCollection(coll, breaker.GetBreaker("localhost")),
cli: cli,
brk: brk,
opts: opts,
@@ -68,7 +80,7 @@ func newModel(name string, cli *mongo.Client, coll Collection, brk breaker.Break
}
// StartSession starts a new session.
func (m *Model) StartSession(opts ...*mopt.SessionOptions) (sess mongo.Session, err error) {
func (m *Model) StartSession(opts ...options.Lister[options.SessionOptions]) (sess *WrappedSession, err error) {
starTime := timex.Now()
defer func() {
logDuration(context.Background(), m.name, startSession, starTime, err)
@@ -79,15 +91,15 @@ func (m *Model) StartSession(opts ...*mopt.SessionOptions) (sess mongo.Session,
return nil, sessionErr
}
return &wrappedSession{
Session: session,
return &WrappedSession{
session: session,
name: m.name,
brk: m.brk,
}, nil
}
// Aggregate executes an aggregation pipeline.
func (m *Model) Aggregate(ctx context.Context, v, pipeline any, opts ...*mopt.AggregateOptions) error {
func (m *Model) Aggregate(ctx context.Context, v, pipeline any, opts ...options.Lister[options.AggregateOptions]) error {
cur, err := m.Collection.Aggregate(ctx, pipeline, opts...)
if err != nil {
return err
@@ -98,7 +110,7 @@ func (m *Model) Aggregate(ctx context.Context, v, pipeline any, opts ...*mopt.Ag
}
// DeleteMany deletes documents that match the filter.
func (m *Model) DeleteMany(ctx context.Context, filter any, opts ...*mopt.DeleteOptions) (int64, error) {
func (m *Model) DeleteMany(ctx context.Context, filter any, opts ...options.Lister[options.DeleteManyOptions]) (int64, error) {
res, err := m.Collection.DeleteMany(ctx, filter, opts...)
if err != nil {
return 0, err
@@ -108,7 +120,7 @@ func (m *Model) DeleteMany(ctx context.Context, filter any, opts ...*mopt.Delete
}
// DeleteOne deletes the first document that matches the filter.
func (m *Model) DeleteOne(ctx context.Context, filter any, opts ...*mopt.DeleteOptions) (int64, error) {
func (m *Model) DeleteOne(ctx context.Context, filter any, opts ...options.Lister[options.DeleteOneOptions]) (int64, error) {
res, err := m.Collection.DeleteOne(ctx, filter, opts...)
if err != nil {
return 0, err
@@ -118,7 +130,7 @@ func (m *Model) DeleteOne(ctx context.Context, filter any, opts ...*mopt.DeleteO
}
// Find finds documents that match the filter.
func (m *Model) Find(ctx context.Context, v, filter any, opts ...*mopt.FindOptions) error {
func (m *Model) Find(ctx context.Context, v, filter any, opts ...options.Lister[options.FindOptions]) error {
cur, err := m.Collection.Find(ctx, filter, opts...)
if err != nil {
return err
@@ -129,7 +141,7 @@ func (m *Model) Find(ctx context.Context, v, filter any, opts ...*mopt.FindOptio
}
// FindOne finds the first document that matches the filter.
func (m *Model) FindOne(ctx context.Context, v, filter any, opts ...*mopt.FindOneOptions) error {
func (m *Model) FindOne(ctx context.Context, v, filter any, opts ...options.Lister[options.FindOneOptions]) error {
res, err := m.Collection.FindOne(ctx, filter, opts...)
if err != nil {
return err
@@ -140,7 +152,7 @@ func (m *Model) FindOne(ctx context.Context, v, filter any, opts ...*mopt.FindOn
// FindOneAndDelete finds a single document and deletes it.
func (m *Model) FindOneAndDelete(ctx context.Context, v, filter any,
opts ...*mopt.FindOneAndDeleteOptions) error {
opts ...options.Lister[options.FindOneAndDeleteOptions]) error {
res, err := m.Collection.FindOneAndDelete(ctx, filter, opts...)
if err != nil {
return err
@@ -151,7 +163,7 @@ func (m *Model) FindOneAndDelete(ctx context.Context, v, filter any,
// FindOneAndReplace finds a single document and replaces it.
func (m *Model) FindOneAndReplace(ctx context.Context, v, filter, replacement any,
opts ...*mopt.FindOneAndReplaceOptions) error {
opts ...options.Lister[options.FindOneAndReplaceOptions]) error {
res, err := m.Collection.FindOneAndReplace(ctx, filter, replacement, opts...)
if err != nil {
return err
@@ -162,7 +174,7 @@ func (m *Model) FindOneAndReplace(ctx context.Context, v, filter, replacement an
// FindOneAndUpdate finds a single document and updates it.
func (m *Model) FindOneAndUpdate(ctx context.Context, v, filter, update any,
opts ...*mopt.FindOneAndUpdateOptions) error {
opts ...options.Lister[options.FindOneAndUpdateOptions]) error {
res, err := m.Collection.FindOneAndUpdate(ctx, filter, update, opts...)
if err != nil {
return err
@@ -171,8 +183,8 @@ func (m *Model) FindOneAndUpdate(ctx context.Context, v, filter, update any,
return res.Decode(v)
}
// AbortTransaction implements the mongo.Session interface.
func (w *wrappedSession) AbortTransaction(ctx context.Context) (err error) {
// AbortTransaction implements the mongo.session interface.
func (w *WrappedSession) AbortTransaction(ctx context.Context) (err error) {
ctx, span := startSpan(ctx, abortTransaction)
defer func() {
endSpan(span, err)
@@ -184,12 +196,12 @@ func (w *wrappedSession) AbortTransaction(ctx context.Context) (err error) {
logDuration(ctx, w.name, abortTransaction, starTime, err)
}()
return w.Session.AbortTransaction(ctx)
return w.session.AbortTransaction(ctx)
}, acceptable)
}
// CommitTransaction implements the mongo.Session interface.
func (w *wrappedSession) CommitTransaction(ctx context.Context) (err error) {
// CommitTransaction implements the mongo.session interface.
func (w *WrappedSession) CommitTransaction(ctx context.Context) (err error) {
ctx, span := startSpan(ctx, commitTransaction)
defer func() {
endSpan(span, err)
@@ -201,15 +213,15 @@ func (w *wrappedSession) CommitTransaction(ctx context.Context) (err error) {
logDuration(ctx, w.name, commitTransaction, starTime, err)
}()
return w.Session.CommitTransaction(ctx)
return w.session.CommitTransaction(ctx)
}, acceptable)
}
// WithTransaction implements the mongo.Session interface.
func (w *wrappedSession) WithTransaction(
// WithTransaction implements the mongo.session interface.
func (w *WrappedSession) WithTransaction(
ctx context.Context,
fn func(sessCtx mongo.SessionContext) (any, error),
opts ...*mopt.TransactionOptions,
fn func(sessCtx context.Context) (any, error),
opts ...options.Lister[options.TransactionOptions],
) (res any, err error) {
ctx, span := startSpan(ctx, withTransaction)
defer func() {
@@ -222,15 +234,15 @@ func (w *wrappedSession) WithTransaction(
logDuration(ctx, w.name, withTransaction, starTime, err)
}()
res, err = w.Session.WithTransaction(ctx, fn, opts...)
res, err = w.session.WithTransaction(ctx, fn, opts...)
return err
}, acceptable)
return
}
// EndSession implements the mongo.Session interface.
func (w *wrappedSession) EndSession(ctx context.Context) {
// EndSession implements the mongo.session interface.
func (w *WrappedSession) EndSession(ctx context.Context) {
var err error
ctx, span := startSpan(ctx, endSession)
defer func() {
@@ -243,7 +255,29 @@ func (w *wrappedSession) EndSession(ctx context.Context) {
logDuration(ctx, w.name, endSession, starTime, err)
}()
w.Session.EndSession(ctx)
w.session.EndSession(ctx)
return nil
}, acceptable)
}
type (
//for unit test
monClient interface {
StartSession(opts ...options.Lister[options.SessionOptions]) (monSession, error)
}
monSession interface {
AbortTransaction(ctx context.Context) error
CommitTransaction(ctx context.Context) error
EndSession(ctx context.Context)
WithTransaction(ctx context.Context, fn func(sessCtx context.Context) (any, error),
opts ...options.Lister[options.TransactionOptions]) (any, error)
}
)
type mockMonClient struct {
c *mongo.Client
}
func (m *mockMonClient) StartSession(opts ...options.Lister[options.SessionOptions]) (monSession, error) {
return m.c.StartSession(opts...)
}