diff --git a/.codecov.yml b/.codecov.yml new file mode 100644 index 000000000..d7cf44deb --- /dev/null +++ b/.codecov.yml @@ -0,0 +1,3 @@ +ignore: + - "example/*" + - "tools/*" \ No newline at end of file diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 0e7c1003d..bab9ec7cd 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -25,10 +25,11 @@ jobs: - name: Get dependencies run: | go get -v -t -d ./... - if [ -f Gopkg.toml ]; then - curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh - dep ensure - fi - name: Test - run: go test -v -race ./... + run: go test -race -coverprofile=coverage.txt -covermode=atomic ./... + + - name: Codecov + uses: codecov/codecov-action@v1.0.6 + with: + token: ${{secrets.CODECOV_TOKEN}} diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml deleted file mode 100644 index ae16716ab..000000000 --- a/.gitlab-ci.yml +++ /dev/null @@ -1,18 +0,0 @@ -stages: -- analysis - -variables: - GOPATH: '/runner-cache/zero' - GOCACHE: '/runner-cache/zero' - GOPROXY: 'https://goproxy.cn,direct' - -analysis: - stage: analysis - image: golang - script: - - go version && go env - - go test -short $(go list ./...) | grep -v "no test" - only: - - merge_requests - tags: - - common diff --git a/core/discov/facade.go b/core/discov/facade.go index 2b7497a66..2a7838f2b 100644 --- a/core/discov/facade.go +++ b/core/discov/facade.go @@ -2,7 +2,7 @@ package discov import ( "github.com/tal-tech/go-zero/core/discov/internal" - "github.com/tal-tech/go-zero/core/lang" + "github.com/tal-tech/go-zero/core/logx" ) type ( @@ -26,7 +26,7 @@ func NewFacade(endpoints []string) Facade { func (f Facade) Client() internal.EtcdClient { conn, err := f.registry.GetConn(f.endpoints) - lang.Must(err) + logx.Must(err) return conn } diff --git a/core/executors/periodicalexecutor.go b/core/executors/periodicalexecutor.go index a6d8c3824..98bb7cb50 100644 --- a/core/executors/periodicalexecutor.go +++ b/core/executors/periodicalexecutor.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/tal-tech/go-zero/core/lang" "github.com/tal-tech/go-zero/core/proc" "github.com/tal-tech/go-zero/core/syncx" "github.com/tal-tech/go-zero/core/threading" @@ -32,19 +33,21 @@ type ( container TaskContainer waitGroup sync.WaitGroup // avoid race condition on waitGroup when calling wg.Add/Done/Wait(...) - wgBarrier syncx.Barrier - guarded bool - newTicker func(duration time.Duration) timex.Ticker - lock sync.Mutex + wgBarrier syncx.Barrier + confirmChan chan lang.PlaceholderType + guarded bool + newTicker func(duration time.Duration) timex.Ticker + lock sync.Mutex } ) func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *PeriodicalExecutor { executor := &PeriodicalExecutor{ // buffer 1 to let the caller go quickly - commander: make(chan interface{}, 1), - interval: interval, - container: container, + commander: make(chan interface{}, 1), + interval: interval, + container: container, + confirmChan: make(chan lang.PlaceholderType), newTicker: func(d time.Duration) timex.Ticker { return timex.NewTicker(interval) }, @@ -59,10 +62,12 @@ func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *Per func (pe *PeriodicalExecutor) Add(task interface{}) { if vals, ok := pe.addAndCheck(task); ok { pe.commander <- vals + <-pe.confirmChan } } func (pe *PeriodicalExecutor) Flush() bool { + pe.enterExecution() return pe.executeTasks(func() interface{} { pe.lock.Lock() defer pe.lock.Unlock() @@ -114,6 +119,8 @@ func (pe *PeriodicalExecutor) backgroundFlush() { select { case vals := <-pe.commander: commanded = true + pe.enterExecution() + pe.confirmChan <- lang.Placeholder pe.executeTasks(vals) last = timex.Now() case <-ticker.Chan(): @@ -135,13 +142,18 @@ func (pe *PeriodicalExecutor) backgroundFlush() { }) } -func (pe *PeriodicalExecutor) executeTasks(tasks interface{}) bool { +func (pe *PeriodicalExecutor) doneExecution() { + pe.waitGroup.Done() +} + +func (pe *PeriodicalExecutor) enterExecution() { pe.wgBarrier.Guard(func() { pe.waitGroup.Add(1) }) - defer pe.wgBarrier.Guard(func() { - pe.waitGroup.Done() - }) +} + +func (pe *PeriodicalExecutor) executeTasks(tasks interface{}) bool { + defer pe.doneExecution() ok := pe.hasTasks(tasks) if ok { diff --git a/core/executors/periodicalexecutor_test.go b/core/executors/periodicalexecutor_test.go index 34f5594e5..ed25188fc 100644 --- a/core/executors/periodicalexecutor_test.go +++ b/core/executors/periodicalexecutor_test.go @@ -106,6 +106,40 @@ func TestPeriodicalExecutor_Bulk(t *testing.T) { lock.Unlock() } +func TestPeriodicalExecutor_Wait(t *testing.T) { + var lock sync.Mutex + executer := NewBulkExecutor(func(tasks []interface{}) { + lock.Lock() + defer lock.Unlock() + time.Sleep(10 * time.Millisecond) + }, WithBulkTasks(1), WithBulkInterval(time.Second)) + for i := 0; i < 10; i++ { + executer.Add(1) + } + executer.Flush() + executer.Wait() +} + +func TestPeriodicalExecutor_WaitFast(t *testing.T) { + const total = 3 + var cnt int + var lock sync.Mutex + executer := NewBulkExecutor(func(tasks []interface{}) { + defer func() { + cnt++ + }() + lock.Lock() + defer lock.Unlock() + time.Sleep(10 * time.Millisecond) + }, WithBulkTasks(1), WithBulkInterval(10*time.Millisecond)) + for i := 0; i < total; i++ { + executer.Add(2) + } + executer.Flush() + executer.Wait() + assert.Equal(t, total, cnt) +} + // go test -benchtime 10s -bench . func BenchmarkExecutor(b *testing.B) { b.ReportAllocs() diff --git a/core/lang/lang.go b/core/lang/lang.go index cf02ad6e9..901c5604e 100644 --- a/core/lang/lang.go +++ b/core/lang/lang.go @@ -1,16 +1,8 @@ package lang -import "log" - var Placeholder PlaceholderType type ( GenericType = interface{} PlaceholderType = struct{} ) - -func Must(err error) { - if err != nil { - log.Fatal(err) - } -} diff --git a/core/lang/lang_test.go b/core/lang/lang_test.go deleted file mode 100644 index 6b3c6417f..000000000 --- a/core/lang/lang_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package lang - -import "testing" - -func TestMust(t *testing.T) { - Must(nil) -} diff --git a/core/logx/logs.go b/core/logx/logs.go index edb710401..b3ac081cc 100644 --- a/core/logx/logs.go +++ b/core/logx/logs.go @@ -17,7 +17,6 @@ import ( "sync/atomic" "github.com/tal-tech/go-zero/core/iox" - "github.com/tal-tech/go-zero/core/lang" "github.com/tal-tech/go-zero/core/sysx" "github.com/tal-tech/go-zero/core/timex" ) @@ -46,6 +45,7 @@ const ( levelInfo = "info" levelError = "error" levelSevere = "severe" + levelFatal = "fatal" levelSlow = "slow" levelStat = "stat" @@ -100,7 +100,7 @@ type ( ) func MustSetup(c LogConf) { - lang.Must(SetUp(c)) + Must(SetUp(c)) } // SetUp sets up the logx. If already set up, just return nil. @@ -210,6 +210,14 @@ func Infof(format string, v ...interface{}) { infoSync(fmt.Sprintf(format, v...)) } +func Must(err error) { + if err != nil { + msg := formatWithCaller(err.Error(), 3) + output(severeLog, levelFatal, msg) + os.Exit(1) + } +} + func SetLevel(level uint32) { atomic.StoreUint32(&logLevel, level) } diff --git a/core/logx/logs_test.go b/core/logx/logs_test.go index cb3e6d8a9..ede9d5054 100644 --- a/core/logx/logs_test.go +++ b/core/logx/logs_test.go @@ -37,7 +37,7 @@ func (mw *mockWriter) Reset() { } func (mw *mockWriter) Contains(text string) bool { - return strings.Index(mw.builder.String(), text) > -1 + return strings.Contains(mw.builder.String(), text) } func TestFileLineFileMode(t *testing.T) { @@ -131,6 +131,10 @@ func TestSetLevelWithDuration(t *testing.T) { assert.Equal(t, 0, writer.builder.Len()) } +func TestMustNil(t *testing.T) { + Must(nil) +} + func BenchmarkCopyByteSliceAppend(b *testing.B) { for i := 0; i < b.N; i++ { var buf []byte diff --git a/core/queue/balancedqueuepusher.go b/core/queue/balancedqueuepusher.go new file mode 100644 index 000000000..1e40cf30e --- /dev/null +++ b/core/queue/balancedqueuepusher.go @@ -0,0 +1,44 @@ +package queue + +import ( + "errors" + "sync/atomic" + + "github.com/tal-tech/go-zero/core/logx" +) + +var ErrNoAvailablePusher = errors.New("no available pusher") + +type BalancedQueuePusher struct { + name string + pushers []Pusher + index uint64 +} + +func NewBalancedQueuePusher(pushers []Pusher) Pusher { + return &BalancedQueuePusher{ + name: generateName(pushers), + pushers: pushers, + } +} + +func (pusher *BalancedQueuePusher) Name() string { + return pusher.name +} + +func (pusher *BalancedQueuePusher) Push(message string) error { + size := len(pusher.pushers) + + for i := 0; i < size; i++ { + index := atomic.AddUint64(&pusher.index, 1) % uint64(size) + target := pusher.pushers[index] + + if err := target.Push(message); err != nil { + logx.Error(err) + } else { + return nil + } + } + + return ErrNoAvailablePusher +} diff --git a/core/queue/balancedqueuepusher_test.go b/core/queue/balancedqueuepusher_test.go new file mode 100644 index 000000000..ff246132b --- /dev/null +++ b/core/queue/balancedqueuepusher_test.go @@ -0,0 +1,43 @@ +package queue + +import ( + "fmt" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBalancedQueuePusher(t *testing.T) { + const numPushers = 100 + var pushers []Pusher + var mockedPushers []*mockedPusher + for i := 0; i < numPushers; i++ { + p := &mockedPusher{ + name: "pusher:" + strconv.Itoa(i), + } + pushers = append(pushers, p) + mockedPushers = append(mockedPushers, p) + } + + pusher := NewBalancedQueuePusher(pushers) + assert.True(t, len(pusher.Name()) > 0) + + for i := 0; i < numPushers*1000; i++ { + assert.Nil(t, pusher.Push("item")) + } + + var counts []int + for _, p := range mockedPushers { + counts = append(counts, p.count) + } + mean := calcMean(counts) + variance := calcVariance(mean, counts) + assert.True(t, variance < 100, fmt.Sprintf("too big variance - %.2f", variance)) +} + +func TestBalancedQueuePusher_NoAvailable(t *testing.T) { + pusher := NewBalancedQueuePusher(nil) + assert.True(t, len(pusher.Name()) == 0) + assert.Equal(t, ErrNoAvailablePusher, pusher.Push("item")) +} diff --git a/core/queue/consumer.go b/core/queue/consumer.go new file mode 100644 index 000000000..8f12d97f7 --- /dev/null +++ b/core/queue/consumer.go @@ -0,0 +1,10 @@ +package queue + +type ( + Consumer interface { + Consume(string) error + OnEvent(event interface{}) + } + + ConsumerFactory func() (Consumer, error) +) diff --git a/core/queue/messagequeue.go b/core/queue/messagequeue.go new file mode 100644 index 000000000..569ae5669 --- /dev/null +++ b/core/queue/messagequeue.go @@ -0,0 +1,6 @@ +package queue + +type MessageQueue interface { + Start() + Stop() +} diff --git a/core/queue/multiqueuepusher.go b/core/queue/multiqueuepusher.go new file mode 100644 index 000000000..dd811dc1c --- /dev/null +++ b/core/queue/multiqueuepusher.go @@ -0,0 +1,31 @@ +package queue + +import "github.com/tal-tech/go-zero/core/errorx" + +type MultiQueuePusher struct { + name string + pushers []Pusher +} + +func NewMultiQueuePusher(pushers []Pusher) Pusher { + return &MultiQueuePusher{ + name: generateName(pushers), + pushers: pushers, + } +} + +func (pusher *MultiQueuePusher) Name() string { + return pusher.name +} + +func (pusher *MultiQueuePusher) Push(message string) error { + var batchError errorx.BatchError + + for _, each := range pusher.pushers { + if err := each.Push(message); err != nil { + batchError.Add(err) + } + } + + return batchError.Err() +} diff --git a/core/queue/multiqueuepusher_test.go b/core/queue/multiqueuepusher_test.go new file mode 100644 index 000000000..5721154f9 --- /dev/null +++ b/core/queue/multiqueuepusher_test.go @@ -0,0 +1,39 @@ +package queue + +import ( + "fmt" + "math" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMultiQueuePusher(t *testing.T) { + const numPushers = 100 + var pushers []Pusher + var mockedPushers []*mockedPusher + for i := 0; i < numPushers; i++ { + p := &mockedPusher{ + name: "pusher:" + strconv.Itoa(i), + } + pushers = append(pushers, p) + mockedPushers = append(mockedPushers, p) + } + + pusher := NewMultiQueuePusher(pushers) + assert.True(t, len(pusher.Name()) > 0) + + for i := 0; i < 1000; i++ { + _ = pusher.Push("item") + } + + var counts []int + for _, p := range mockedPushers { + counts = append(counts, p.count) + } + mean := calcMean(counts) + variance := calcVariance(mean, counts) + assert.True(t, math.Abs(mean-1000*(1-failProba)) < 10) + assert.True(t, variance < 100, fmt.Sprintf("too big variance - %.2f", variance)) +} diff --git a/core/queue/producer.go b/core/queue/producer.go new file mode 100644 index 000000000..c0ca935d3 --- /dev/null +++ b/core/queue/producer.go @@ -0,0 +1,15 @@ +package queue + +type ( + Producer interface { + AddListener(listener ProduceListener) + Produce() (string, bool) + } + + ProduceListener interface { + OnProducerPause() + OnProducerResume() + } + + ProducerFactory func() (Producer, error) +) diff --git a/core/queue/queue.go b/core/queue/queue.go new file mode 100644 index 000000000..ec917bec7 --- /dev/null +++ b/core/queue/queue.go @@ -0,0 +1,239 @@ +package queue + +import ( + "runtime" + "sync" + "sync/atomic" + "time" + + "github.com/tal-tech/go-zero/core/logx" + "github.com/tal-tech/go-zero/core/rescue" + "github.com/tal-tech/go-zero/core/stat" + "github.com/tal-tech/go-zero/core/threading" + "github.com/tal-tech/go-zero/core/timex" +) + +const queueName = "queue" + +type ( + Queue struct { + name string + metrics *stat.Metrics + producerFactory ProducerFactory + producerRoutineGroup *threading.RoutineGroup + consumerFactory ConsumerFactory + consumerRoutineGroup *threading.RoutineGroup + producerCount int + consumerCount int + active int32 + channel chan string + quit chan struct{} + listeners []Listener + eventLock sync.Mutex + eventChannels []chan interface{} + } + + Listener interface { + OnPause() + OnResume() + } + + Poller interface { + Name() string + Poll() string + } + + Pusher interface { + Name() string + Push(string) error + } +) + +func NewQueue(producerFactory ProducerFactory, consumerFactory ConsumerFactory) *Queue { + queue := &Queue{ + metrics: stat.NewMetrics(queueName), + producerFactory: producerFactory, + producerRoutineGroup: threading.NewRoutineGroup(), + consumerFactory: consumerFactory, + consumerRoutineGroup: threading.NewRoutineGroup(), + producerCount: runtime.NumCPU(), + consumerCount: runtime.NumCPU() << 1, + channel: make(chan string), + quit: make(chan struct{}), + } + queue.SetName(queueName) + + return queue +} + +func (queue *Queue) AddListener(listener Listener) { + queue.listeners = append(queue.listeners, listener) +} + +func (queue *Queue) Broadcast(message interface{}) { + go func() { + queue.eventLock.Lock() + defer queue.eventLock.Unlock() + + for _, channel := range queue.eventChannels { + channel <- message + } + }() +} + +func (queue *Queue) SetName(name string) { + queue.name = name + queue.metrics.SetName(name) +} + +func (queue *Queue) SetNumConsumer(count int) { + queue.consumerCount = count +} + +func (queue *Queue) SetNumProducer(count int) { + queue.producerCount = count +} + +func (queue *Queue) Start() { + queue.startProducers(queue.producerCount) + queue.startConsumers(queue.consumerCount) + + queue.producerRoutineGroup.Wait() + close(queue.channel) + queue.consumerRoutineGroup.Wait() +} + +func (queue *Queue) Stop() { + close(queue.quit) +} + +func (queue *Queue) consume(eventChan chan interface{}) { + var consumer Consumer + + for { + var err error + if consumer, err = queue.consumerFactory(); err != nil { + logx.Errorf("Error on creating consumer: %v", err) + time.Sleep(time.Second) + } else { + break + } + } + + for { + select { + case message, ok := <-queue.channel: + if ok { + queue.consumeOne(consumer, message) + } else { + logx.Info("Task channel was closed, quitting consumer...") + return + } + case event := <-eventChan: + consumer.OnEvent(event) + } + } +} + +func (queue *Queue) consumeOne(consumer Consumer, message string) { + threading.RunSafe(func() { + startTime := timex.Now() + defer func() { + duration := timex.Since(startTime) + queue.metrics.Add(stat.Task{ + Duration: duration, + }) + logx.WithDuration(duration).Infof("%s", message) + }() + + if err := consumer.Consume(message); err != nil { + logx.Errorf("Error occurred while consuming %v: %v", message, err) + } + }) +} + +func (queue *Queue) pause() { + for _, listener := range queue.listeners { + listener.OnPause() + } +} + +func (queue *Queue) produce() { + var producer Producer + + for { + var err error + if producer, err = queue.producerFactory(); err != nil { + logx.Errorf("Error on creating producer: %v", err) + time.Sleep(time.Second) + } else { + break + } + } + + atomic.AddInt32(&queue.active, 1) + producer.AddListener(routineListener{ + queue: queue, + }) + + for { + select { + case <-queue.quit: + logx.Info("Quitting producer") + return + default: + if v, ok := queue.produceOne(producer); ok { + queue.channel <- v + } + } + } +} + +func (queue *Queue) produceOne(producer Producer) (string, bool) { + // avoid panic quit the producer, just log it and continue + defer rescue.Recover() + + return producer.Produce() +} + +func (queue *Queue) resume() { + for _, listener := range queue.listeners { + listener.OnResume() + } +} + +func (queue *Queue) startConsumers(number int) { + for i := 0; i < number; i++ { + eventChan := make(chan interface{}) + queue.eventLock.Lock() + queue.eventChannels = append(queue.eventChannels, eventChan) + queue.eventLock.Unlock() + queue.consumerRoutineGroup.Run(func() { + queue.consume(eventChan) + }) + } +} + +func (queue *Queue) startProducers(number int) { + for i := 0; i < number; i++ { + queue.producerRoutineGroup.Run(func() { + queue.produce() + }) + } +} + +type routineListener struct { + queue *Queue +} + +func (rl routineListener) OnProducerPause() { + if atomic.AddInt32(&rl.queue.active, -1) <= 0 { + rl.queue.pause() + } +} + +func (rl routineListener) OnProducerResume() { + if atomic.AddInt32(&rl.queue.active, 1) == 1 { + rl.queue.resume() + } +} diff --git a/core/queue/queue_test.go b/core/queue/queue_test.go new file mode 100644 index 000000000..32279094a --- /dev/null +++ b/core/queue/queue_test.go @@ -0,0 +1,94 @@ +package queue + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +const ( + consumers = 4 + rounds = 100 +) + +func TestQueue(t *testing.T) { + producer := newMockedProducer(rounds) + consumer := newMockedConsumer() + consumer.wait.Add(consumers) + q := NewQueue(func() (Producer, error) { + return producer, nil + }, func() (Consumer, error) { + return consumer, nil + }) + q.AddListener(new(mockedListener)) + q.SetName("mockqueue") + q.SetNumConsumer(consumers) + q.SetNumProducer(1) + q.pause() + q.resume() + go func() { + producer.wait.Wait() + q.Stop() + }() + q.Start() + assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count)) +} + +type mockedConsumer struct { + count int32 + events int32 + wait sync.WaitGroup +} + +func newMockedConsumer() *mockedConsumer { + return new(mockedConsumer) +} + +func (c *mockedConsumer) Consume(string) error { + atomic.AddInt32(&c.count, 1) + return nil +} + +func (c *mockedConsumer) OnEvent(interface{}) { + if atomic.AddInt32(&c.events, 1) <= consumers { + c.wait.Done() + } +} + +type mockedProducer struct { + total int32 + count int32 + wait sync.WaitGroup +} + +func newMockedProducer(total int32) *mockedProducer { + p := new(mockedProducer) + p.total = total + p.wait.Add(int(total)) + return p +} + +func (p *mockedProducer) AddListener(listener ProduceListener) { +} + +func (p *mockedProducer) Produce() (string, bool) { + if atomic.AddInt32(&p.count, 1) <= p.total { + p.wait.Done() + return "item", true + } else { + time.Sleep(time.Second) + return "", false + } +} + +type mockedListener struct { +} + +func (l *mockedListener) OnPause() { +} + +func (l *mockedListener) OnResume() { +} diff --git a/core/queue/util.go b/core/queue/util.go new file mode 100644 index 000000000..126680cc4 --- /dev/null +++ b/core/queue/util.go @@ -0,0 +1,12 @@ +package queue + +import "strings" + +func generateName(pushers []Pusher) string { + names := make([]string, len(pushers)) + for i, pusher := range pushers { + names[i] = pusher.Name() + } + + return strings.Join(names, ",") +} diff --git a/core/queue/util_test.go b/core/queue/util_test.go new file mode 100644 index 000000000..44846848b --- /dev/null +++ b/core/queue/util_test.go @@ -0,0 +1,77 @@ +package queue + +import ( + "errors" + "math" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/tal-tech/go-zero/core/logx" + "github.com/tal-tech/go-zero/core/mathx" +) + +var ( + proba = mathx.NewProba() + failProba = 0.01 +) + +func init() { + logx.Disable() +} + +func TestGenerateName(t *testing.T) { + pushers := []Pusher{ + &mockedPusher{name: "first"}, + &mockedPusher{name: "second"}, + &mockedPusher{name: "third"}, + } + + assert.Equal(t, "first,second,third", generateName(pushers)) +} + +func TestGenerateNameNil(t *testing.T) { + var pushers []Pusher + assert.Equal(t, "", generateName(pushers)) +} + +func calcMean(vals []int) float64 { + if len(vals) == 0 { + return 0 + } + + var result float64 + for _, val := range vals { + result += float64(val) + } + return result / float64(len(vals)) +} + +func calcVariance(mean float64, vals []int) float64 { + if len(vals) == 0 { + return 0 + } + + var result float64 + for _, val := range vals { + result += math.Pow(float64(val)-mean, 2) + } + return result / float64(len(vals)) +} + +type mockedPusher struct { + name string + count int +} + +func (p *mockedPusher) Name() string { + return p.name +} + +func (p *mockedPusher) Push(s string) error { + if proba.TrueOnProba(failProba) { + return errors.New("dummy") + } + + p.count++ + return nil +} diff --git a/core/search/searchtree.go b/core/search/tree.go similarity index 100% rename from core/search/searchtree.go rename to core/search/tree.go diff --git a/core/search/searchtree_debug.go b/core/search/tree_debug.go similarity index 100% rename from core/search/searchtree_debug.go rename to core/search/tree_debug.go diff --git a/core/search/searchtree_test.go b/core/search/tree_test.go similarity index 100% rename from core/search/searchtree_test.go rename to core/search/tree_test.go diff --git a/core/stat/internal/cpu_linux.go b/core/stat/internal/cpu_linux.go index fea314b7b..460d3b99e 100644 --- a/core/stat/internal/cpu_linux.go +++ b/core/stat/internal/cpu_linux.go @@ -7,7 +7,7 @@ import ( "time" "github.com/tal-tech/go-zero/core/iox" - "github.com/tal-tech/go-zero/core/lang" + "github.com/tal-tech/go-zero/core/logx" ) const ( @@ -24,17 +24,17 @@ var ( func init() { cpus, err := perCpuUsage() - lang.Must(err) + logx.Must(err) cores = uint64(len(cpus)) sets, err := cpuSets() - lang.Must(err) + logx.Must(err) quota = float64(len(sets)) cq, err := cpuQuota() if err == nil { if cq != -1 { period, err := cpuPeriod() - lang.Must(err) + logx.Must(err) limit := float64(cq) / float64(period) if limit < quota { @@ -44,10 +44,10 @@ func init() { } preSystem, err = systemCpuUsage() - lang.Must(err) + logx.Must(err) preTotal, err = totalCpuUsage() - lang.Must(err) + logx.Must(err) } func RefreshCpu() uint64 { diff --git a/core/stores/internal/cleaner.go b/core/stores/internal/cleaner.go index 3838207a7..5d4b86567 100644 --- a/core/stores/internal/cleaner.go +++ b/core/stores/internal/cleaner.go @@ -5,7 +5,6 @@ import ( "time" "github.com/tal-tech/go-zero/core/collection" - "github.com/tal-tech/go-zero/core/lang" "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/core/proc" "github.com/tal-tech/go-zero/core/stat" @@ -33,7 +32,7 @@ type delayTask struct { func init() { var err error timingWheel, err = collection.NewTimingWheel(time.Second, timingWheelSlots, clean) - lang.Must(err) + logx.Must(err) proc.AddShutdownListener(func() { timingWheel.Drain(clean) diff --git a/core/sysx/host.go b/core/sysx/host.go index dccddf9fa..8cfb54a19 100644 --- a/core/sysx/host.go +++ b/core/sysx/host.go @@ -3,7 +3,7 @@ package sysx import ( "os" - "github.com/tal-tech/go-zero/core/lang" + "github.com/tal-tech/go-zero/core/stringx" ) var hostname string @@ -11,7 +11,9 @@ var hostname string func init() { var err error hostname, err = os.Hostname() - lang.Must(err) + if err != nil { + hostname = stringx.RandId() + } } func Hostname() string { diff --git a/doc/go-framework.pdf b/doc/go-framework.pdf deleted file mode 100644 index 4943afc53..000000000 Binary files a/doc/go-framework.pdf and /dev/null differ diff --git a/doc/goctl.md b/doc/goctl.md index b8eea01ad..818405767 100644 --- a/doc/goctl.md +++ b/doc/goctl.md @@ -3,8 +3,8 @@ ## goctl用途 * 定义api请求 * 根据定义的api自动生成golang(后端), java(iOS & Android), typescript(web & 晓程序),dart(flutter) -* 生成MySQL CURD (https://goctl.xiaoheiban.cn) -* 生成MongoDB CURD (https://goctl.xiaoheiban.cn) +* 生成MySQL CURD+Cache +* 生成MongoDB CURD+Cache ## goctl使用说明 #### goctl参数说明 @@ -179,23 +179,31 @@ service user-api { * 在定义的get/post/put/delete等请求的handler和logic里增加处理业务逻辑的代码 #### 根据定义好的api文件生成java代码 - `goctl api java -api user/user.api -dir ./src` +```shell +goctl api java -api user/user.api -dir ./src +``` #### 根据定义好的api文件生成typescript代码 - `goctl api ts -api user/user.api -dir ./src -webapi ***` - - ts需要指定webapi所在目录 +```shell +goctl api ts -api user/user.api -dir ./src -webapi *** + +ts需要指定webapi所在目录 +``` #### 根据定义好的api文件生成Dart代码 - `goctl api dart -api user/user.api -dir ./src` +```shell +goctl api dart -api user/user.api -dir ./src +``` ## 根据定义好的简单go文件生成mongo代码文件(仅限golang使用) - `goctl model mongo -src {{yourDir}}/xiao/service/xhb/user/model/usermodel.go -cache yes` - - -src需要提供简单的usermodel.go文件,里面只需要提供一个结构体即可 - -cache 控制是否需要缓存 yes=需要 no=不需要 - src 示例代码如下 - ``` +```shell +goctl model mongo -src {{yourDir}}/xiao/service/xhb/user/model/usermodel.go -cache yes + +-src需要提供简单的usermodel.go文件,里面只需要提供一个结构体即可 +-cache 控制是否需要缓存 yes=需要 no=不需要 +src 示例代码如下 +``` + ```go package model type User struct { @@ -261,5 +269,4 @@ type User struct { │   └── test.go [强制覆盖更新] └── test.proto ``` - - 注意 :目前rpc目录生成的proto文件暂不支持import外部proto文件 -* 如有不理解的地方,随时问Kim/Kevin \ No newline at end of file + - 注意 :目前rpc目录生成的proto文件暂不支持import外部proto文件 \ No newline at end of file diff --git a/example/breaker/main.go b/example/breaker/main.go index 8bfa30f5e..23cae92ea 100644 --- a/example/breaker/main.go +++ b/example/breaker/main.go @@ -10,6 +10,7 @@ import ( "github.com/tal-tech/go-zero/core/breaker" "github.com/tal-tech/go-zero/core/lang" + "github.com/tal-tech/go-zero/core/logx" "gopkg.in/cheggaaa/pb.v1" ) @@ -99,7 +100,7 @@ func main() { gb := breaker.NewBreaker() fp, err := os.Create("result.csv") - lang.Must(err) + logx.Must(err) defer fp.Close() fmt.Fprintln(fp, "seconds,state,googleCalls,netflixCalls") diff --git a/example/etcd/sub/sub.go b/example/etcd/sub/sub.go index 559737cad..284f7f3ab 100644 --- a/example/etcd/sub/sub.go +++ b/example/etcd/sub/sub.go @@ -5,12 +5,12 @@ import ( "time" "github.com/tal-tech/go-zero/core/discov" - "github.com/tal-tech/go-zero/core/lang" + "github.com/tal-tech/go-zero/core/logx" ) func main() { sub, err := discov.NewSubscriber([]string{"etcd.discovery:2379"}, "028F2C35852D", discov.Exclusive()) - lang.Must(err) + logx.Must(err) ticker := time.NewTicker(time.Second * 3) defer ticker.Stop() diff --git a/example/graceful/dns/api/svc/servicecontext.go b/example/graceful/dns/api/svc/servicecontext.go index 7bd478e0e..42d8d4b72 100644 --- a/example/graceful/dns/api/svc/servicecontext.go +++ b/example/graceful/dns/api/svc/servicecontext.go @@ -3,5 +3,5 @@ package svc import "github.com/tal-tech/go-zero/rpcx" type ServiceContext struct { - Client *rpcx.RpcClient + Client rpcx.Client } diff --git a/example/graceful/etcd/api/svc/servicecontext.go b/example/graceful/etcd/api/svc/servicecontext.go index 7bd478e0e..42d8d4b72 100644 --- a/example/graceful/etcd/api/svc/servicecontext.go +++ b/example/graceful/etcd/api/svc/servicecontext.go @@ -3,5 +3,5 @@ package svc import "github.com/tal-tech/go-zero/rpcx" type ServiceContext struct { - Client *rpcx.RpcClient + Client rpcx.Client } diff --git a/example/http/breaker/client/client.go b/example/http/breaker/client/client.go index a756c3e13..131f14f94 100644 --- a/example/http/breaker/client/client.go +++ b/example/http/breaker/client/client.go @@ -9,6 +9,7 @@ import ( "time" "github.com/tal-tech/go-zero/core/lang" + "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/core/threading" "gopkg.in/cheggaaa/pb.v1" ) @@ -119,14 +120,14 @@ func main() { flag.Parse() fp, err := os.Create("result.csv") - lang.Must(err) + logx.Must(err) defer fp.Close() fmt.Fprintln(fp, "seconds,goodOk,goodFail,goodReject,goodErrs,goodUnknowns,goodDropRatio,"+ "heavyOk,heavyFail,heavyReject,heavyErrs,heavyUnknowns,heavyDropRatio") var gm, hm metric dur, err := time.ParseDuration(*duration) - lang.Must(err) + logx.Must(err) done := make(chan lang.PlaceholderType) group := threading.NewRoutineGroup() group.RunSafe(func() { diff --git a/example/load/main.go b/example/load/main.go index efefbfaed..d4d6f8a0c 100644 --- a/example/load/main.go +++ b/example/load/main.go @@ -13,7 +13,7 @@ import ( "github.com/tal-tech/go-zero/core/collection" "github.com/tal-tech/go-zero/core/executors" - "github.com/tal-tech/go-zero/core/lang" + "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/core/syncx" "gopkg.in/cheggaaa/pb.v1" ) @@ -47,7 +47,7 @@ func main() { lessWriter = executors.NewLessExecutor(interval * total / 100) fp, err := os.Create("result.csv") - lang.Must(err) + logx.Must(err) defer fp.Close() fmt.Fprintln(fp, "second,maxFlight,flying,agressiveAvgFlying,lazyAvgFlying,bothAvgFlying") diff --git a/example/load/simulate/client/main.go b/example/load/simulate/client/main.go index da98c97a5..529d2f877 100644 --- a/example/load/simulate/client/main.go +++ b/example/load/simulate/client/main.go @@ -11,7 +11,7 @@ import ( "time" "github.com/tal-tech/go-zero/core/fx" - "github.com/tal-tech/go-zero/core/lang" + "github.com/tal-tech/go-zero/core/logx" ) var ( @@ -27,7 +27,7 @@ func main() { flag.Parse() fp, err := os.Create("result.csv") - lang.Must(err) + logx.Must(err) defer fp.Close() fmt.Fprintln(fp, "seconds,total,pass,fail,drop") diff --git a/example/periodicalexecutor/pe.go b/example/periodicalexecutor/pe.go index 3be333c84..fbaa5a948 100644 --- a/example/periodicalexecutor/pe.go +++ b/example/periodicalexecutor/pe.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "time" "github.com/tal-tech/go-zero/core/executors" @@ -8,7 +9,7 @@ import ( func main() { exeutor := executors.NewBulkExecutor(func(items []interface{}) { - println(len(items)) + fmt.Println(len(items)) }, executors.WithBulkTasks(10)) for { exeutor.Add(1) diff --git a/example/tracing/edge/main.go b/example/tracing/edge/main.go index ce43a02ae..558ba94b5 100644 --- a/example/tracing/edge/main.go +++ b/example/tracing/edge/main.go @@ -15,7 +15,7 @@ import ( var ( configFile = flag.String("f", "config.json", "the config file") - client *rpcx.RpcClient + client rpcx.Client ) func handle(w http.ResponseWriter, r *http.Request) { diff --git a/example/tracing/portal/server.go b/example/tracing/portal/server.go index 471983443..6c5e0c4d6 100644 --- a/example/tracing/portal/server.go +++ b/example/tracing/portal/server.go @@ -20,11 +20,11 @@ type ( } PortalServer struct { - userRpc *rpcx.RpcClient + userRpc rpcx.Client } ) -func NewPortalServer(client *rpcx.RpcClient) *PortalServer { +func NewPortalServer(client rpcx.Client) *PortalServer { return &PortalServer{ userRpc: client, } diff --git a/readme.md b/readme.md index 07102d79c..90f38f295 100644 --- a/readme.md +++ b/readme.md @@ -1,6 +1,8 @@ # go-zero项目介绍 -![Go](https://github.com/tal-tech/go-zero/workflows/Go/badge.svg?branch=master) +[![Go](https://github.com/tal-tech/go-zero/workflows/Go/badge.svg?branch=master)](https://github.com/tal-tech/go-zero/actions) +[![codecov](https://codecov.io/gh/tal-tech/go-zero/branch/master/graph/badge.svg)](https://codecov.io/gh/tal-tech/go-zero) +[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) ## 1. go-zero框架背景 @@ -144,27 +146,26 @@ go-zero是一个集成了各种工程实践的包含web和rpc框架,有如下 8 directories, 9 files ``` - 生成的代码可以直接运行: - - ```shell + +```shell cd greet go run greet.go -f etc/greet-api.json - ``` +``` - 默认侦听在8888端口(可以在配置文件里修改),可以通过curl请求: +默认侦听在8888端口(可以在配置文件里修改),可以通过curl请求: - ```shell +```shell ➜ go-zero git:(master) curl -w "\ncode: %{http_code}\n" http://localhost:8888/greet/from/kevin {"code":0} code: 200 - ``` +``` - 编写业务代码: +编写业务代码: - * 可以在servicecontext.go里面传递依赖给logic,比如mysql, redis等 +* 可以在servicecontext.go里面传递依赖给logic,比如mysql, redis等 * 在api定义的get/post/put/delete等请求对应的logic里增加业务处理逻辑 - + 4. 可以根据api文件生成前端需要的Java, TypeScript, Dart, JavaScript代码 ```shell @@ -173,6 +174,10 @@ go-zero是一个集成了各种工程实践的包含web和rpc框架,有如下 ... ``` +## 8. 文档 + +* [goctl使用帮助](doc/goctl.md) + ### 微信交流群 添加我的微信:kevwan,请注明go-zero,我拉进go-zero社区群🤝 diff --git a/rest/handler/authhandler.go b/rest/handler/authhandler.go index 253dc5030..5ed6caf13 100644 --- a/rest/handler/authhandler.go +++ b/rest/handler/authhandler.go @@ -8,7 +8,7 @@ import ( "github.com/dgrijalva/jwt-go" "github.com/tal-tech/go-zero/core/logx" - "github.com/tal-tech/go-zero/rest/internal" + "github.com/tal-tech/go-zero/rest/token" ) const ( @@ -43,7 +43,7 @@ func Authorize(secret string, opts ...AuthorizeOption) func(http.Handler) http.H opt(&authOpts) } - parser := internal.NewTokenParser() + parser := token.NewTokenParser() return func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { token, err := parser.ParseToken(r, secret, authOpts.PrevSecret) diff --git a/rest/handler/breakerhandler.go b/rest/handler/breakerhandler.go index 6e10a56d4..846e68be6 100644 --- a/rest/handler/breakerhandler.go +++ b/rest/handler/breakerhandler.go @@ -8,7 +8,7 @@ import ( "github.com/tal-tech/go-zero/core/breaker" "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/core/stat" - "github.com/tal-tech/go-zero/rest/internal" + "github.com/tal-tech/go-zero/rest/httpx" "github.com/tal-tech/go-zero/rest/internal/security" ) @@ -22,7 +22,7 @@ func BreakerHandler(method, path string, metrics *stat.Metrics) func(http.Handle if err != nil { metrics.AddDrop() logx.Errorf("[http] dropped, %s - %s - %s", - r.RequestURI, internal.GetRemoteAddr(r), r.UserAgent()) + r.RequestURI, httpx.GetRemoteAddr(r), r.UserAgent()) w.WriteHeader(http.StatusServiceUnavailable) return } diff --git a/rest/handler/loghandler.go b/rest/handler/loghandler.go index 577c1c564..96157215a 100644 --- a/rest/handler/loghandler.go +++ b/rest/handler/loghandler.go @@ -13,6 +13,7 @@ import ( "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/core/timex" "github.com/tal-tech/go-zero/core/utils" + "github.com/tal-tech/go-zero/rest/httpx" "github.com/tal-tech/go-zero/rest/internal" ) @@ -112,10 +113,10 @@ func logBrief(r *http.Request, code int, timer *utils.ElapsedTimer, logs *intern var buf bytes.Buffer duration := timer.Duration() buf.WriteString(fmt.Sprintf("%d - %s - %s - %s - %s", - code, r.RequestURI, internal.GetRemoteAddr(r), r.UserAgent(), timex.ReprOfDuration(duration))) + code, r.RequestURI, httpx.GetRemoteAddr(r), r.UserAgent(), timex.ReprOfDuration(duration))) if duration > slowThreshold { logx.Slowf("[HTTP] %d - %s - %s - %s - slowcall(%s)", - code, r.RequestURI, internal.GetRemoteAddr(r), r.UserAgent(), timex.ReprOfDuration(duration)) + code, r.RequestURI, httpx.GetRemoteAddr(r), r.UserAgent(), timex.ReprOfDuration(duration)) } ok := isOkResponse(code) diff --git a/rest/handler/sheddinghandler.go b/rest/handler/sheddinghandler.go index fa70f7970..568e78873 100644 --- a/rest/handler/sheddinghandler.go +++ b/rest/handler/sheddinghandler.go @@ -7,7 +7,7 @@ import ( "github.com/tal-tech/go-zero/core/load" "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/core/stat" - "github.com/tal-tech/go-zero/rest/internal" + "github.com/tal-tech/go-zero/rest/httpx" "github.com/tal-tech/go-zero/rest/internal/security" ) @@ -35,7 +35,7 @@ func SheddingHandler(shedder load.Shedder, metrics *stat.Metrics) func(http.Hand metrics.AddDrop() sheddingStat.IncrementDrop() logx.Errorf("[http] dropped, %s - %s - %s", - r.RequestURI, internal.GetRemoteAddr(r), r.UserAgent()) + r.RequestURI, httpx.GetRemoteAddr(r), r.UserAgent()) w.WriteHeader(http.StatusServiceUnavailable) return } diff --git a/rest/httpx/requests.go b/rest/httpx/requests.go index e62f68426..bf217da58 100644 --- a/rest/httpx/requests.go +++ b/rest/httpx/requests.go @@ -39,7 +39,7 @@ func Parse(r *http.Request, v interface{}) error { // Parses the form request. func ParseForm(r *http.Request, v interface{}) error { - if strings.Index(r.Header.Get(ContentType), multipartFormData) != -1 { + if strings.Contains(r.Header.Get(ContentType), multipartFormData) { if err := r.ParseMultipartForm(maxMemory); err != nil { return err } @@ -84,7 +84,6 @@ func ParseHeader(headerValue string) map[string]string { // Parses the post request which contains json in body. func ParseJsonBody(r *http.Request, v interface{}) error { var reader io.Reader - if withJsonBody(r) { reader = io.LimitReader(r.Body, maxBodyLen) } else { @@ -107,5 +106,5 @@ func ParsePath(r *http.Request, v interface{}) error { } func withJsonBody(r *http.Request) bool { - return r.ContentLength > 0 && strings.Index(r.Header.Get(ContentType), ApplicationJson) != -1 + return r.ContentLength > 0 && strings.Contains(r.Header.Get(ContentType), ApplicationJson) } diff --git a/rest/internal/util.go b/rest/httpx/util.go similarity index 93% rename from rest/internal/util.go rename to rest/httpx/util.go index 5e7c2e2be..bdf6eaa82 100644 --- a/rest/internal/util.go +++ b/rest/httpx/util.go @@ -1,4 +1,4 @@ -package internal +package httpx import "net/http" diff --git a/rest/internal/util_test.go b/rest/httpx/util_test.go similarity index 94% rename from rest/internal/util_test.go rename to rest/httpx/util_test.go index 96594454f..bcd9c4163 100644 --- a/rest/internal/util_test.go +++ b/rest/httpx/util_test.go @@ -1,4 +1,4 @@ -package internal +package httpx import ( "net/http" @@ -16,4 +16,3 @@ func TestGetRemoteAddr(t *testing.T) { r.Header.Set(xForwardFor, host) assert.Equal(t, host, GetRemoteAddr(r)) } - diff --git a/rest/httpx/constants.go b/rest/httpx/vars.go similarity index 100% rename from rest/httpx/constants.go rename to rest/httpx/vars.go diff --git a/rest/internal/log.go b/rest/internal/log.go index 780138353..80b193552 100644 --- a/rest/internal/log.go +++ b/rest/internal/log.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/tal-tech/go-zero/core/logx" + "github.com/tal-tech/go-zero/rest/httpx" ) const LogContext = "request_logs" @@ -79,5 +80,5 @@ func formatf(r *http.Request, format string, v ...interface{}) string { } func formatWithReq(r *http.Request, v string) string { - return fmt.Sprintf("(%s - %s) %s", r.RequestURI, GetRemoteAddr(r), v) + return fmt.Sprintf("(%s - %s) %s", r.RequestURI, httpx.GetRemoteAddr(r), v) } diff --git a/rest/ngin_test.go b/rest/ngin_test.go index a4591197e..019294545 100644 --- a/rest/ngin_test.go +++ b/rest/ngin_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/tal-tech/go-zero/rest/httpx" - "github.com/tal-tech/go-zero/rest/internal/router" + "github.com/tal-tech/go-zero/rest/router" ) func TestWithMiddleware(t *testing.T) { diff --git a/rest/internal/router/patrouter.go b/rest/router/patrouter.go similarity index 100% rename from rest/internal/router/patrouter.go rename to rest/router/patrouter.go diff --git a/rest/internal/router/patrouter_test.go b/rest/router/patrouter_test.go similarity index 100% rename from rest/internal/router/patrouter_test.go rename to rest/router/patrouter_test.go diff --git a/rest/server.go b/rest/server.go index 005f3692c..694968683 100644 --- a/rest/server.go +++ b/rest/server.go @@ -13,7 +13,7 @@ import ( "github.com/tal-tech/go-zero/rest/handler" "github.com/tal-tech/go-zero/rest/httpx" "github.com/tal-tech/go-zero/rest/internal" - "github.com/tal-tech/go-zero/rest/internal/router" + "github.com/tal-tech/go-zero/rest/router" ) // use 1000m to represent 100% diff --git a/rest/internal/tokenparser.go b/rest/token/tokenparser.go similarity index 99% rename from rest/internal/tokenparser.go rename to rest/token/tokenparser.go index d2eb94360..f2927b16a 100644 --- a/rest/internal/tokenparser.go +++ b/rest/token/tokenparser.go @@ -1,4 +1,4 @@ -package internal +package token import ( "net/http" diff --git a/rest/internal/tokenparser_test.go b/rest/token/tokenparser_test.go similarity index 99% rename from rest/internal/tokenparser_test.go rename to rest/token/tokenparser_test.go index d486adcc4..215b4db56 100644 --- a/rest/internal/tokenparser_test.go +++ b/rest/token/tokenparser_test.go @@ -1,4 +1,4 @@ -package internal +package token import ( "net/http" diff --git a/rpcx/client.go b/rpcx/client.go index cb5239510..7f589d560 100644 --- a/rpcx/client.go +++ b/rpcx/client.go @@ -10,11 +10,22 @@ import ( "google.golang.org/grpc" ) -type RpcClient struct { - client internal.Client -} +var ( + WithDialOption = internal.WithDialOption + WithTimeout = internal.WithTimeout +) -func MustNewClient(c RpcClientConf, options ...internal.ClientOption) *RpcClient { +type ( + Client interface { + Conn() *grpc.ClientConn + } + + RpcClient struct { + client Client + } +) + +func MustNewClient(c RpcClientConf, options ...internal.ClientOption) Client { cli, err := NewClient(c, options...) if err != nil { log.Fatal(err) @@ -23,20 +34,20 @@ func MustNewClient(c RpcClientConf, options ...internal.ClientOption) *RpcClient return cli } -func NewClient(c RpcClientConf, options ...internal.ClientOption) (*RpcClient, error) { +func NewClient(c RpcClientConf, options ...internal.ClientOption) (Client, error) { var opts []internal.ClientOption if c.HasCredential() { - opts = append(opts, internal.WithDialOption(grpc.WithPerRPCCredentials(&auth.Credential{ + opts = append(opts, WithDialOption(grpc.WithPerRPCCredentials(&auth.Credential{ App: c.App, Token: c.Token, }))) } if c.Timeout > 0 { - opts = append(opts, internal.WithTimeout(time.Duration(c.Timeout)*time.Millisecond)) + opts = append(opts, WithTimeout(time.Duration(c.Timeout)*time.Millisecond)) } opts = append(opts, options...) - var client internal.Client + var client Client var err error if len(c.Server) > 0 { client, err = internal.NewDirectClient(c.Server, opts...) @@ -52,7 +63,7 @@ func NewClient(c RpcClientConf, options ...internal.ClientOption) (*RpcClient, e }, nil } -func NewClientNoAuth(c discov.EtcdConf) (*RpcClient, error) { +func NewClientNoAuth(c discov.EtcdConf) (Client, error) { client, err := internal.NewDiscovClient(c.Hosts, c.Key) if err != nil { return nil, err diff --git a/rpcx/internal/balancer/p2c/p2c.go b/rpcx/internal/balancer/p2c/p2c.go index 07fb3e81b..431874e90 100644 --- a/rpcx/internal/balancer/p2c/p2c.go +++ b/rpcx/internal/balancer/p2c/p2c.go @@ -21,7 +21,7 @@ import ( const ( Name = "p2c_ewma" - decayTime = int64(time.Millisecond * 600) + decayTime = int64(time.Second * 10) // default value from finagle forcePick = int64(time.Second) initSuccess = 1000 throttleSuccess = initSuccess / 2 diff --git a/rpcx/internal/client.go b/rpcx/internal/client.go index 9576f2619..1b799787d 100644 --- a/rpcx/internal/client.go +++ b/rpcx/internal/client.go @@ -18,10 +18,6 @@ type ( } ClientOption func(options *ClientOptions) - - Client interface { - Conn() *grpc.ClientConn - } ) func WithDialOption(opt grpc.DialOption) ClientOption { diff --git a/rpcx/internal/authinterceptor.go b/rpcx/internal/serverinterceptors/authinterceptor.go similarity index 96% rename from rpcx/internal/authinterceptor.go rename to rpcx/internal/serverinterceptors/authinterceptor.go index d59e01586..7cea029fe 100644 --- a/rpcx/internal/authinterceptor.go +++ b/rpcx/internal/serverinterceptors/authinterceptor.go @@ -1,4 +1,4 @@ -package internal +package serverinterceptors import ( "context" diff --git a/rpcx/proxy.go b/rpcx/proxy.go index e4c90d0a7..dfdff273c 100644 --- a/rpcx/proxy.go +++ b/rpcx/proxy.go @@ -12,7 +12,7 @@ import ( type RpcProxy struct { backend string - clients map[string]*RpcClient + clients map[string]Client options []internal.ClientOption sharedCalls syncx.SharedCalls lock sync.Mutex @@ -21,7 +21,7 @@ type RpcProxy struct { func NewRpcProxy(backend string, opts ...internal.ClientOption) *RpcProxy { return &RpcProxy{ backend: backend, - clients: make(map[string]*RpcClient), + clients: make(map[string]Client), options: opts, sharedCalls: syncx.NewSharedCalls(), } diff --git a/rpcx/server.go b/rpcx/server.go index 76dc086f6..040808bb2 100644 --- a/rpcx/server.go +++ b/rpcx/server.go @@ -117,8 +117,8 @@ func setupInterceptors(server internal.Server, c RpcServerConf, metrics *stat.Me return err } - server.AddStreamInterceptors(internal.StreamAuthorizeInterceptor(authenticator)) - server.AddUnaryInterceptors(internal.UnaryAuthorizeInterceptor(authenticator)) + server.AddStreamInterceptors(serverinterceptors.StreamAuthorizeInterceptor(authenticator)) + server.AddUnaryInterceptors(serverinterceptors.UnaryAuthorizeInterceptor(authenticator)) } return nil diff --git a/tools/goctl/api/dartgen/gen.go b/tools/goctl/api/dartgen/gen.go index 68a481248..6ee7623a0 100644 --- a/tools/goctl/api/dartgen/gen.go +++ b/tools/goctl/api/dartgen/gen.go @@ -4,7 +4,7 @@ import ( "errors" "strings" - "github.com/tal-tech/go-zero/core/lang" + "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/tools/goctl/api/parser" "github.com/urfave/cli" ) @@ -32,8 +32,8 @@ func DartCommand(c *cli.Context) error { dir = dir + "/" } api.Info.Title = strings.Replace(apiFile, ".api", "", -1) - lang.Must(genData(dir+"data/", api)) - lang.Must(genApi(dir+"api/", api)) - lang.Must(genVars(dir + "vars/")) + logx.Must(genData(dir+"data/", api)) + logx.Must(genApi(dir+"api/", api)) + logx.Must(genVars(dir + "vars/")) return nil } diff --git a/tools/goctl/api/gogen/gen.go b/tools/goctl/api/gogen/gen.go index 0468585cc..80209f286 100644 --- a/tools/goctl/api/gogen/gen.go +++ b/tools/goctl/api/gogen/gen.go @@ -1,6 +1,7 @@ package gogen import ( + "bytes" "errors" "fmt" "os" @@ -13,7 +14,7 @@ import ( "time" "github.com/logrusorgru/aurora" - "github.com/tal-tech/go-zero/core/lang" + "github.com/tal-tech/go-zero/core/logx" apiformat "github.com/tal-tech/go-zero/tools/goctl/api/format" "github.com/tal-tech/go-zero/tools/goctl/api/parser" apiutil "github.com/tal-tech/go-zero/tools/goctl/api/util" @@ -44,17 +45,18 @@ func GoCommand(c *cli.Context) error { return err } - lang.Must(util.MkdirIfNotExist(dir)) - lang.Must(genEtc(dir, api)) - lang.Must(genConfig(dir)) - lang.Must(genMain(dir, api)) - lang.Must(genServiceContext(dir, api)) - lang.Must(genTypes(dir, api)) - lang.Must(genHandlers(dir, api)) - lang.Must(genRoutes(dir, api)) - lang.Must(genLogic(dir, api)) + logx.Must(util.MkdirIfNotExist(dir)) + logx.Must(genEtc(dir, api)) + logx.Must(genConfig(dir)) + logx.Must(genMain(dir, api)) + logx.Must(genServiceContext(dir, api)) + logx.Must(genTypes(dir, api)) + logx.Must(genHandlers(dir, api)) + logx.Must(genRoutes(dir, api)) + logx.Must(genLogic(dir, api)) // it does not work format(dir) + createGoModFileIfNeed(dir) if err := backupAndSweep(apiFile); err != nil { return err @@ -98,7 +100,7 @@ func format(dir string) { cmd := exec.Command("go", "fmt", "./"+dir+"...") _, err := cmd.CombinedOutput() if err != nil { - print(err.Error()) + fmt.Println(err.Error()) } } @@ -131,3 +133,43 @@ func sweep() error { return nil }) } + +func createGoModFileIfNeed(dir string) { + absDir, err := filepath.Abs(dir) + if err != nil { + panic(err) + } + + var tempPath = absDir + var hasGoMod = false + for { + if tempPath == filepath.Dir(tempPath) { + break + } + tempPath = filepath.Dir(tempPath) + if util.FileExists(filepath.Join(tempPath, goModeIdentifier)) { + tempPath = filepath.Dir(tempPath) + hasGoMod = true + break + } + } + if !hasGoMod { + gopath := os.Getenv("GOPATH") + parent := path.Join(gopath, "src") + pos := strings.Index(absDir, parent) + if pos < 0 { + moduleName := absDir[len(filepath.Dir(absDir))+1:] + cmd := exec.Command("go", "mod", "init", moduleName) + cmd.Dir = dir + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + err := cmd.Run() + if err != nil { + fmt.Println(err.Error()) + } + outStr, errStr := string(stdout.Bytes()), string(stderr.Bytes()) + fmt.Printf(outStr + "\n" + errStr) + } + } +} diff --git a/tools/goctl/api/gogen/util.go b/tools/goctl/api/gogen/util.go index 5b3b82748..1306d1b80 100644 --- a/tools/goctl/api/gogen/util.go +++ b/tools/goctl/api/gogen/util.go @@ -23,10 +23,14 @@ func getParentPackage(dir string) (string, error) { return "", err } + absDir = strings.ReplaceAll(absDir, `\`, `/`) var rootPath string var tempPath = absDir var hasGoMod = false for { + if tempPath == filepath.Dir(tempPath) { + break + } tempPath = filepath.Dir(tempPath) if goctlutil.FileExists(filepath.Join(tempPath, goModeIdentifier)) { tempPath = filepath.Dir(tempPath) @@ -43,8 +47,7 @@ func getParentPackage(dir string) (string, error) { parent := path.Join(gopath, "src") pos := strings.Index(absDir, parent) if pos < 0 { - message := fmt.Sprintf("%s not in gomod project path, or not in GOPATH of %s directory", absDir, gopath) - println(message) + fmt.Printf("%s not in gomod project path, or not in GOPATH of %s directory\n", absDir, gopath) tempPath = filepath.Dir(absDir) rootPath = absDir[len(tempPath)+1:] } else { diff --git a/tools/goctl/api/javagen/gen.go b/tools/goctl/api/javagen/gen.go index 086e7dcec..420265efe 100644 --- a/tools/goctl/api/javagen/gen.go +++ b/tools/goctl/api/javagen/gen.go @@ -6,7 +6,7 @@ import ( "strings" "github.com/logrusorgru/aurora" - "github.com/tal-tech/go-zero/core/lang" + "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/tools/goctl/api/parser" "github.com/tal-tech/go-zero/tools/goctl/util" "github.com/urfave/cli" @@ -36,9 +36,9 @@ func JavaCommand(c *cli.Context) error { packetName = packetName[:len(packetName)-4] } - lang.Must(util.MkdirIfNotExist(dir)) - lang.Must(genPacket(dir, packetName, api)) - lang.Must(genComponents(dir, packetName, api)) + logx.Must(util.MkdirIfNotExist(dir)) + logx.Must(genPacket(dir, packetName, api)) + logx.Must(genComponents(dir, packetName, api)) fmt.Println(aurora.Green("Done.")) return nil diff --git a/tools/goctl/api/javagen/genpacket.go b/tools/goctl/api/javagen/genpacket.go index 83497bc2a..721eeddc0 100644 --- a/tools/goctl/api/javagen/genpacket.go +++ b/tools/goctl/api/javagen/genpacket.go @@ -175,7 +175,7 @@ func formatFile(tmplBytes *bytes.Buffer, file *os.File) { builder.WriteString(scanner.Text() + "\n") } if err := scanner.Err(); err != nil { - println(err) + fmt.Println(err) } } @@ -268,10 +268,12 @@ func genType(writer io.Writer, tp spec.Type) error { return err } } + writeBreakline(writer) writeIndent(writer, 1) genGetSet(writer, tp, 2) writeIndent(writer, 1) fmt.Fprintln(writer, "}") + return nil } diff --git a/tools/goctl/api/main.go b/tools/goctl/api/main.go index 758d23ca4..53cf4d0fe 100644 --- a/tools/goctl/api/main.go +++ b/tools/goctl/api/main.go @@ -4,7 +4,7 @@ import ( "fmt" "os" - "github.com/tal-tech/go-zero/core/lang" + "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/tools/goctl/api/parser" ) @@ -14,8 +14,8 @@ func main() { } p, err := parser.NewParser(os.Args[1]) - lang.Must(err) + logx.Must(err) api, err := p.Parse() - lang.Must(err) + logx.Must(err) fmt.Println(api) } diff --git a/tools/goctl/api/tsgen/gen.go b/tools/goctl/api/tsgen/gen.go index e21b085b6..579378d91 100644 --- a/tools/goctl/api/tsgen/gen.go +++ b/tools/goctl/api/tsgen/gen.go @@ -5,7 +5,7 @@ import ( "fmt" "github.com/logrusorgru/aurora" - "github.com/tal-tech/go-zero/core/lang" + "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/tools/goctl/api/parser" "github.com/tal-tech/go-zero/tools/goctl/util" "github.com/urfave/cli" @@ -34,9 +34,9 @@ func TsCommand(c *cli.Context) error { return err } - lang.Must(util.MkdirIfNotExist(dir)) - lang.Must(genHandler(dir, webApi, caller, api, unwrapApi)) - lang.Must(genComponents(dir, api)) + logx.Must(util.MkdirIfNotExist(dir)) + logx.Must(genHandler(dir, webApi, caller, api, unwrapApi)) + logx.Must(genComponents(dir, api)) fmt.Println(aurora.Green("Done.")) return nil diff --git a/tools/goctl/api/tsgen/genpacket.go b/tools/goctl/api/tsgen/genpacket.go index 2f7e9cd8e..159c8c6ae 100644 --- a/tools/goctl/api/tsgen/genpacket.go +++ b/tools/goctl/api/tsgen/genpacket.go @@ -156,7 +156,7 @@ func paramsForRoute(route spec.Route, prefixForType func(string) string) string hasBody := hasRequestBody(route) rt, err := goTypeToTs(route.RequestType.Name, prefixForType) if err != nil { - println(err.Error()) + fmt.Println(err.Error()) return "" } if hasParams && hasBody { diff --git a/tools/goctl/api/util/util.go b/tools/goctl/api/util/util.go index c4ec32566..afc77a55d 100644 --- a/tools/goctl/api/util/util.go +++ b/tools/goctl/api/util/util.go @@ -8,13 +8,13 @@ import ( "path" "strings" - "github.com/tal-tech/go-zero/core/lang" + "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/tools/goctl/api/spec" "github.com/tal-tech/go-zero/tools/goctl/util" ) func MaybeCreateFile(dir, subdir, file string) (fp *os.File, created bool, err error) { - lang.Must(util.MkdirIfNotExist(path.Join(dir, subdir))) + logx.Must(util.MkdirIfNotExist(path.Join(dir, subdir))) fpath := path.Join(dir, subdir, file) if util.FileExists(fpath) { fmt.Printf("%s exists, ignored generation\n", fpath) diff --git a/tools/goctl/update/update.go b/tools/goctl/update/update.go index 480f8ab41..5b1e42b35 100644 --- a/tools/goctl/update/update.go +++ b/tools/goctl/update/update.go @@ -8,7 +8,6 @@ import ( "github.com/tal-tech/go-zero/core/conf" "github.com/tal-tech/go-zero/core/hash" - "github.com/tal-tech/go-zero/core/lang" "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/tools/goctl/update/config" "github.com/tal-tech/go-zero/tools/goctl/util" @@ -56,5 +55,5 @@ func main() { fs := http.FileServer(http.Dir(c.FileDir)) http.Handle(c.FilePath, http.StripPrefix(c.FilePath, forChksumHandler(path.Join(c.FileDir, filename), fs))) - lang.Must(http.ListenAndServe(c.ListenOn, nil)) + logx.Must(http.ListenAndServe(c.ListenOn, nil)) } diff --git a/tools/goctl/util/path.go b/tools/goctl/util/path.go index 8446b4bb7..9c0926887 100644 --- a/tools/goctl/util/path.go +++ b/tools/goctl/util/path.go @@ -4,7 +4,6 @@ import ( "fmt" "os" "path" - "path/filepath" "strings" "github.com/tal-tech/go-zero/tools/goctl/vars" @@ -44,16 +43,3 @@ func PathFromGoSrc() (string, error) { // skip slash return dir[len(parent)+1:], nil } - -func GetParentPackage(dir string) (string, error) { - absDir, err := filepath.Abs(dir) - if err != nil { - return "", err - } - pos := strings.Index(absDir, vars.ProjectName) - if pos < 0 { - return "", fmt.Errorf("error dir:[%s],please make sure that your project is in the %s directory", vars.ProjectName, dir) - } - - return absDir[pos:], nil -}