Compare commits

..

176 Commits

Author SHA1 Message Date
Kevin Wan
6a8638fc85 chore: refine rpc template in goctl (#1129) 2021-10-12 22:13:14 +08:00
Mikael
837a9ffa03 go-zero/core/hash/hash_test.go 增加测试 TestMd5Hex (#1128) 2021-10-12 21:57:07 +08:00
chenquan
d28cfe5f20 Add opts ...grpc.CallOption in grpc client (#1122)
* Add `opts ...grpc.CallOption` in grpc client

* Update format

* Update format

* Add import package

* Update format
2021-10-12 21:52:50 +08:00
chenquan
022c100dc9 Add request method in http log (#1120)
* Add request method in http log

* Update log format
2021-10-12 21:50:30 +08:00
Kevin Wan
426b09c356 update goctl version to 1.2.2 (#1125) 2021-10-12 14:00:35 +08:00
Kevin Wan
40dc21e4cf add cncf landscape (#1123) 2021-10-11 15:31:29 +08:00
Kevin Wan
9b114e3251 test: add more tests (#1119) 2021-10-08 17:31:37 +08:00
Kevin Wan
4c6234f108 add more tests (#1115)
* add more tests

* fix lint errors
2021-10-04 21:08:44 +08:00
Kevin Wan
3cdfcb05f1 add more tests (#1114) 2021-10-04 20:02:25 +08:00
Kevin Wan
9f5bfa0088 add more tests (#1113) 2021-10-04 14:14:25 +08:00
Kevin Wan
2d42c8fa00 test: add more tests (#1112) 2021-10-03 21:30:48 +08:00
Kevin Wan
10e7922597 feat: opentelemetry integration, removed self designed tracing (#1111)
* feat: opentelemetry integration, removed self designed tracing

* feat: support zipkin on opentelemetry integration

* feat: support zipkin on opentelemetry integration, enable it in conf

* style: format code

* fix: support logx without exporter configured

* fix: check return values

* refactor: simplify code

* refactor: simplify opentelemetry integration

* ci: fix staticcheck errors
2021-10-03 20:53:50 +08:00
Kevin Wan
6e34b55ba7 docs: update roadmap (#1110) 2021-10-02 18:16:59 +08:00
Kevin Wan
ed15ca04f4 fix: opentelemetry traceid not correct (#1108) 2021-10-01 22:44:37 +08:00
小小小下
295ec27e1b feat: reflection grpc service (#1107)
* feat: reflection grpc service

* feat: reflection grpc service
2021-10-01 22:02:04 +08:00
Kevin Wan
d1e702e8a3 test: add more tests (#1106)
* style: format code

* test: add more tests

* fix: staticcheck errors
2021-10-01 10:03:56 +08:00
chenquan
d1bfb5ef61 Fix the resources variable not reset after the resource manager is closed (#1105)
* Fix the resource variable not reset after the resource manager is closed

* Format code
2021-09-30 16:55:36 +08:00
Kevin Wan
e43357164c chore: replace redis.NewRedis with redis.New (#1103) 2021-09-29 23:01:10 +08:00
Kevin Wan
cd21c9fa74 chore: mark redis.NewRedis as Deprecated, use redis.New instead. (#1100)
* chore: mark redis.NewRedis as Deprecated

* chore: mark redis.NewRedis as Deprecated
2021-09-29 22:07:05 +08:00
Kevin Wan
cdd2fcbbc9 update grpc package (#1099) 2021-09-29 19:30:06 +08:00
理工男
8d2db09d45 Update Makefile (#1098) 2021-09-29 18:04:08 +08:00
Kevin Wan
65905b914d ci: add reviewdog (#1096) 2021-09-29 13:09:20 +08:00
z-micro
80e3407be1 fix bug: generating dart code error (#1090) 2021-09-28 09:01:27 +08:00
Kevin Wan
657d27213a docs: update roadmap (#1094) 2021-09-26 17:41:19 +08:00
Kevin Wan
8ac18a9422 docs: update roadmap (#1093) 2021-09-26 17:38:49 +08:00
Kevin Wan
d3ae9cfd49 ci: accurate error reporting on lint check (#1089) 2021-09-25 23:25:40 +08:00
Kevin Wan
d7f42161fd update zero-doc links in readme (#1088) 2021-09-24 11:31:00 +08:00
Kevin Wan
e03229cabe docs: change organization from tal-tech to zeromicro in readme (#1087) 2021-09-23 20:24:46 +08:00
Kevin Wan
8403ed16ae ci: add Lint check on commits (#1086)
* ci: add Lint check on commits

* ci: fix Lint script error

* test: fix go vet errors

* test: fix go vet errors, remove gofumpt to check go vet

* test: fix go vet errors, try gofumpt

* test: fix go vet errors, try gofumpt, round 1

* test: fix go vet errors, try gofumpt, round 2

* ci: fix Lint errors
2021-09-23 19:57:05 +08:00
Kevin Wan
d87d203c3b Revert "chore: run unit test with go 1.14 (#1084)" (#1085)
This reverts commit 3ae6a882a7.
2021-09-23 15:34:10 +08:00
Kevin Wan
3ae6a882a7 chore: run unit test with go 1.14 (#1084) 2021-09-23 15:30:40 +08:00
Amor
41c980f00c update goctl api (#1052)
* update goctl api

* add LoadTemplate

* update new api template

* update
2021-09-23 14:31:11 +08:00
shenbaise9527
f34d81ca2c chore: when run goctl-rpc, the order of proto message aliases should be (#1078)
fixed

Co-authored-by: steven <steven.zhou@1quant.com>
2021-09-23 14:24:21 +08:00
NevS
004ee488a6 fix AtomicError panic when Set nil (#1049) (#1050) 2021-09-23 14:23:02 +08:00
Kevin Wan
2e12cd2c99 coding style (#1083) 2021-09-23 11:19:57 +08:00
neosu
2695c30886 we can use otel.ErrorHandlerFunc instead of custom struct when we update OpenTelemetry to 1.0.0 (#1081) 2021-09-23 10:42:45 +08:00
Kevin Wan
c74fb988e0 update go.mod (#1079) 2021-09-22 21:54:54 +08:00
anqiansong
e8a340c1c0 Create a symbol link file named protoc-gen-goctl from goctl (#1076) 2021-09-21 23:13:31 +08:00
Kevin Wan
06e114e5a3 update OpenTelemetry to 1.0.0 (#1075) 2021-09-21 23:11:36 +08:00
Kevin Wan
74ad681a66 update issue templates (#1074) 2021-09-21 14:13:28 +08:00
Kevin Wan
e7bbc09093 Update issue templates 2021-09-21 13:58:21 +08:00
Kevin Wan
1eb1450c43 downgrade golang-jwt to support go 1.14 (#1073) 2021-09-21 13:42:45 +08:00
anqiansong
9a724fe907 Add MustTempDir (#1069) 2021-09-21 10:13:43 +08:00
Kevin Wan
30e49f2939 fix jwt security issue by using golang-jwt package (#1066) 2021-09-19 22:33:35 +08:00
Kevin Wan
a5407479a6 upgrade grpc version & replace github.com/golang/protobuf/protoc-gen-go with google.golang.org/protobuf (#1065) 2021-09-19 21:37:55 +08:00
anqiansong
7fb5bab26b fix #1058 (#1064) 2021-09-19 08:53:41 +08:00
Kevin Wan
27249e021f add repo moving notice (#1062) 2021-09-18 22:24:45 +08:00
Kevin Wan
d809795fec add go-zero users (#1061) 2021-09-18 22:01:15 +08:00
Kevin Wan
c9db9588b7 chore: fix comment issues (#1056) 2021-09-17 17:11:03 +08:00
Kevin Wan
872c50b71a chore: make comment accurate (#1055) 2021-09-17 11:11:42 +08:00
Kevin Wan
7c83155e4f mention cncf landscape (#1054) 2021-09-17 10:44:42 +08:00
Kevin Wan
358d86b8ae add go-zero users (#1051) 2021-09-16 15:06:42 +08:00
Kevin Wan
f4bb9f5635 fix test error on ubuntu (#1048) 2021-09-15 13:31:20 +08:00
Jerry Liang
5c6a3132eb fix typo parse.go error message (#1041) 2021-09-14 10:26:44 +08:00
Kevin Wan
2bd95aa007 update goctl version to 1.2.1 (#1042) 2021-09-14 08:32:56 +08:00
Kevin Wan
e8376936d5 remove goctl config command (#1035) 2021-09-13 14:04:06 +08:00
Kevin Wan
71c0288023 fix symlink issue on windows for goctl (#1034)
* fix symlink issue on windows for goctl

* move readlink into separate file
2021-09-13 11:49:07 +08:00
Kevin Wan
9e2f07a842 update k8s.io/client-go etc to use go 1.15 (#1031) 2021-09-13 10:45:34 +08:00
Kevin Wan
24fd34413f fix golint issues (#1027) 2021-09-12 16:15:42 +08:00
Kevin Wan
3f47251892 fix proc.Done not found in windows (#1026) 2021-09-12 15:41:33 +08:00
Kevin Wan
0b6bc69afa reorg imports, format code (#1024) 2021-09-11 21:28:47 +08:00
anqiansong
5b9bdc8d02 Merge pull request #1023 from anqiansong/1014-rollback
revert changes
2021-09-11 20:58:42 +08:00
anqiansong
ded22e296e revert changes 2021-09-11 20:57:58 +08:00
anqiansong
f0ed2370a3 fix #1014 (#1018)
* fix #1014

* remove unused code

* * optimize generate pb.go on Windows
* format code
* optimize console.go

* version rollback

Co-authored-by: anqiansong <anqiansong@bytedance.com>
2021-09-11 12:48:32 +08:00
Kevin Wan
6bf6cfdd01 add go-zero users (#1022) 2021-09-10 22:03:24 +08:00
Kevin Wan
5cc9eb0de4 rename sharedcalls to singleflight (#1017) 2021-09-09 18:06:27 +08:00
Kevin Wan
f070d447ef refactor for better error reporting on sql error (#1016)
* refactor for better error reporting on sql error

* fix errors

* add docs
2021-09-09 15:43:33 +08:00
Kevin Wan
f6d9e19ecb expose sql.DB to let orm operate on it (#1015)
* expose sql.DB to let orm operate on it

* add missing RawDB methods

* add NewSqlConnFromDB for cooperate with dtm
2021-09-09 11:40:28 +08:00
Kevin Wan
56807aabf6 fix golint issues, update codecov settings. (#1011)
* update codecov settings

* fix golint issues
2021-09-07 22:33:02 +08:00
Kevin Wan
861dcf2f36 update codecov settings (#1010) 2021-09-07 18:09:53 +08:00
Kevin Wan
c837dc21bb refactoring tracing interceptors. (#1009)
* refactor tracing interceptors

* add stream tracing interceptor
2021-09-07 17:58:22 +08:00
Kevin Wan
96a35ecf1a fix #1006 (#1008)
* use sdktrace instead of trace for opentelemetry to avoid conflicts

* disable opentelemetry for right now

* use os.ModeSymlink instead of fs.ModeSymlink for backward compatibility
2021-09-07 15:38:40 +08:00
Kevin Wan
bdec5f2349 use sdktrace instead of trace for opentelemetry to avoid conflicts (#1005)
* use sdktrace instead of trace for opentelemetry to avoid conflicts

* disable opentelemetry for right now
2021-09-07 12:02:41 +08:00
shenbaise9527
bc92b57bdb api文件中使用group时生成的handler和logic的包名应该为group的名字 (#545)
* api文件中使用group时生成的handler和logic的包名应该为group的名字

* Update genhandlers.go

fix errors.

Co-authored-by: Kevin Wan <wanjunfeng@gmail.com>
2021-09-07 11:03:04 +08:00
neosu
d8905b9e9e add api template file (#1003) 2021-09-07 10:16:10 +08:00
neosu
dec6309c55 add opentelemetry test (#1002) 2021-09-07 09:26:45 +08:00
Kevin Wan
10805577f5 reorg imports, format code (#1000) 2021-09-06 14:56:46 +08:00
SunJun
a4d8286e36 开启otel后,tracelog自动获取otle的traceId和spanId (#946)
* 开启otel后,tracelog自动获取otle的traceId和spanId

* 去除opentelemetry判断

* 通过 IsRecording 判断span是否活跃
2021-09-06 14:31:54 +08:00
anqiansong
84d2b64e7c optimize unit test (#999) 2021-09-06 14:20:21 +08:00
Kevin Wan
6476da4a18 rest log with context (#998) 2021-09-05 22:58:42 +08:00
Kevin Wan
79eab0ea2f refactor to shorter config name (#997) 2021-09-05 22:43:15 +08:00
Allen Liu
3b683fd498 feat: change logger to traceLogger for getting traceId when recovering (#374) 2021-09-05 22:40:10 +08:00
toven tang
d179b342b2 修复使用 postgres 数据库时,位置参数重复,导致参数与值不对应的问题。 (#960)
* 修复使用 postgres 数据库时,位置参数重复,导致参数与值不对应的问题。

* 修复使用 postgres 数据库时,位置参数重复,导致参数与值不对应的问题。

Co-authored-by: toven <toven@advan.onaliyun.com>
2021-09-05 22:27:59 +08:00
Kevin Wan
58874779e7 move opentelemetry into trace package, and refactoring (#996)
* move opentelemetry into trace package, and refactoring

* rename rewritten package names
2021-09-05 22:18:35 +08:00
anqiansong
8829c31c0d Feature goctl error wrap (#995)
* Add `Wrap` in file errorx.go

* Wrap error with `GoctlError`

* format code

* Refactor package `env` to `version`

* Refactor package `IsVersionGatherThan`

* fix typo

Co-authored-by: anqiansong <anqiansong@bytedance.com>
2021-09-05 21:57:44 +08:00
Kevin Wan
b42f3fa047 disable codecov github checks (#993) 2021-09-04 13:32:23 +08:00
Kevin Wan
9bdadf2381 fix golint issues (#992) 2021-09-04 12:16:30 +08:00
Kevin Wan
20f665ede8 implement k8s service discovery (#988)
* implement k8s service discovery

* simplify code

* use default namespace if not provided

* disable codecov bot comment

* ignore adhoc dir

* simplify building target in NewClient

* reformat code

* Fix filepath (#990)

* format code, and reorg imports (#991)

* add more unit test

Co-authored-by: anqiansong <anqiansong@gmail.com>
2021-09-04 10:27:08 +08:00
Kevin Wan
0325d8e92d format code, and reorg imports (#991) 2021-09-04 10:08:49 +08:00
anqiansong
2125977281 Fix filepath (#990) 2021-09-04 08:15:22 +08:00
Vee Zhang
c26c187e11 remote handler blank line when .HasRequest is false (#986) 2021-09-03 07:25:04 +08:00
Kevin Wan
4ef1859f0b use codecov action v1 (#985) 2021-09-02 19:07:30 +08:00
Kevin Wan
407a6cbf9c format coding style (#983) 2021-09-01 19:52:56 +08:00
Vee Zhang
76fc1ef460 httpx.Error response without body (#982)
* httpx.Error support response without body

* fix doc
2021-09-01 19:33:33 +08:00
miaogaolin
423955c55f format code (#979) 2021-08-31 17:15:36 +08:00
Kevin Wan
db95b3f0e3 configurable for load and stat statistics logs (#980) 2021-08-31 17:14:31 +08:00
Kevin Wan
4bee60eb7f add go-zero users (#978) 2021-08-31 16:08:33 +08:00
Kevin Wan
7618139dad refactor (#977) 2021-08-31 12:04:09 +08:00
Kevin Wan
6fd08027ff update go version to 1.14 for github workflow (#976) 2021-08-31 11:46:22 +08:00
chenquan
b9e268aae8 Update Codecov action (#974) 2021-08-31 11:25:58 +08:00
Kevin Wan
4c1bb1148b fix #971 (#972) 2021-08-29 11:38:34 +08:00
Kevin Wan
50a6bbe6b9 format coding style (#970) 2021-08-28 12:11:11 +08:00
chenquan
dfb3cb510a Fix context error in grpc (#962)
* Fix context error in rpc

* Add a test case

* Optimize judgment conditions

* Add customized breaker errors for the client and server

* Update method signature

* Delete customized breaker errors

* Delete the wrong test case
2021-08-28 12:07:14 +08:00
Kevin Wan
519db812b4 format coding style (#969) 2021-08-27 23:09:47 +08:00
anqiansong
3203f8e06b Fix issues (#965) 2021-08-27 15:05:54 +08:00
chenquan
b71ac2042a Add a test case for database code generation tool (#961) 2021-08-27 06:50:09 +08:00
Kevin Wan
d0f9e57022 fix #957 (#959) 2021-08-26 16:47:28 +08:00
SunJun
aa68210cde 修复stream拦截器tracer名问题 (#944) 2021-08-24 15:17:53 +08:00
zhoushuguang
280e837c9e rest otel support (#943) 2021-08-24 10:04:12 +08:00
Kevin Wan
f669e1226c fix #556 (#938) 2021-08-22 23:36:35 +08:00
Kevin Wan
cd15c19250 fix lint errors (#937) 2021-08-22 10:24:32 +08:00
neosu
5b35fa17de add the opentelemetry tracing (#908)
* add the opentelemetry tracing

* fix the error sampler config

* 添加stream的链路跟踪

* fix the error field name
2021-08-22 10:03:56 +08:00
Kevin Wan
9672298fa8 make sure setting code happen before callback in rest (#936) 2021-08-22 09:27:20 +08:00
Kevin Wan
bf3ce16823 fix #820 (#934) 2021-08-19 22:48:21 +08:00
anqiansong
189721da16 Fix issues (#931)
* fix #929

* fix #925

* add test case

* update model README

* fix #929

* fix #929

* fix #929

* refactor dir

* Adding todo comments

Co-authored-by: anqiansong <anqiansong@xiaoheiban.cn>
2021-08-19 22:47:45 +08:00
Kevin Wan
a523ab1f93 update slack invite url (#933) 2021-08-19 22:25:41 +08:00
Kevin Wan
7ea8b636d9 add go-zero users (#928) 2021-08-19 10:39:47 +08:00
anqiansong
b2fea65faa Optimize model naming (#910)
* add unit test

* fix #907

* format code

* format code

* format code

Co-authored-by: anqiansong <anqiansong@xiaoheiban.cn>
2021-08-18 17:09:34 +08:00
anqiansong
a1fe8bf6cd fix missing updateMethodTemplateFile (#924) 2021-08-18 12:23:43 +08:00
Kevin Wan
67ee9e4391 add unit test (#921) 2021-08-17 17:09:26 +08:00
Kevin Wan
9c1ee50497 refactor (#920) 2021-08-17 10:24:12 +08:00
chenquan
7c842f22d0 Add traceId to the response headers (#919)
* Add traceId to the request headers

* Add test cases

* Update refactor code
2021-08-17 10:12:08 +08:00
anqiansong
14ec29991c fix #915 (#917) 2021-08-16 15:07:11 +08:00
Kevin Wan
c7f5aad83a add stringx.FirstN with ellipsis (#916) 2021-08-16 12:08:33 +08:00
lucaq
e77747cff8 redis.go,type StringCmd = red.StringCmd (#790)
* Add Sinter,Sinterstore; Modify TestRedis_Set

* type StringCmd

* redis.go,type StringCmd = red.StringCmd

Co-authored-by: lucq <lucq@toopsoon.com>
2021-08-16 09:52:16 +08:00
Kevin Wan
f2612db4b1 add stringx.FirstN (#914) 2021-08-15 23:02:48 +08:00
Kevin Wan
a21ff71373 fix #889 (#912) 2021-08-15 15:33:45 +08:00
Kevin Wan
fc04ad7854 export pathvar for user-defined routers (#911)
* refactor

* export pathvar for user-defined routers
2021-08-14 22:57:28 +08:00
Kevin Wan
fbf2eebc42 add Errorv/Infov/Slowv (#909) 2021-08-13 18:28:39 +08:00
anqiansong
dc43430812 optimize grpc generation env check (#900)
* optimize grpc generation env check

* optimize grpc generation env check

* format code

* fix postgresql data type convert error

Co-authored-by: anqiansong <anqiansong@xiaoheiban.cn>
2021-08-13 11:47:42 +08:00
Kevin Wan
c6642bc2e6 add workflow for closing inactive issues (#906)
* add workflow for closing inactive issues

* add workflow for closing inactive issues, add newline
2021-08-12 15:28:29 +08:00
Phibe
bdca24dd3b Update readme for better description. (#904) 2021-08-12 14:59:17 +08:00
Kevin Wan
00c5734021 format coding style (#905) 2021-08-12 14:58:37 +08:00
lovelly
33f87cf1f0 带下划线的项目,配置文件名字错误。 (#733) 2021-08-12 14:55:39 +08:00
Kevin Wan
69935c1ba3 refactor goctl, fix golint issues (#903) 2021-08-11 18:08:01 +08:00
Kevin Wan
1fb356f328 refactor goctl (#902) 2021-08-11 18:04:42 +08:00
市民233
0b0406f41a fix: 解决golint 部分警告 (#897)
* feat:  解决goreportcard的警报
ps: warning: if block ends with a return statement, so drop this else and outdent its block (golint)

* feat: 优化golint警告,将processFieldPrimitiveWithJsonNumber 改成 processFieldPrimitiveWithJSONNumber
unmarshaler.go:248:23: method processFieldPrimitiveWithJsonNumber should be processFieldPrimitiveWithJSONNumber

* feat: 添加 WithCanonicalKeyFunc 注释

* feat: 添加DisableStat的注释

* feat: 添加 RegisterGoctlHome 注释

* feat: 添加 PostgreSqlJoin 注释

* feat: 解决goline警告should not use basic type string as key in context.WithValue问题

* feat: 解决golint警告信息: should not use basic type string as key in context.WithValue

* fix: 定义自定义字段类型,导致go test fail 问题

* update: 恢复原有测试用例

* fix golint warning
2021-08-11 17:57:20 +08:00
_ksco
cc264dcf55 refactor (#878)
* refactor(tools ): refactor cod

Improve code readability and performance

* fix(tools ): fix len bug

Co-authored-by: ksco <hyang@33.cn>
2021-08-11 17:33:18 +08:00
Kevin Wan
e024aebb66 fix golint issues (#899) 2021-08-11 11:38:55 +08:00
Phibe
f204729482 remove unnecessary chars. (#898) 2021-08-11 10:51:28 +08:00
Phibe
d20cf56a69 simplify type definition in readme (#896)
* better text rendering

* simplify type definition in readme
2021-08-10 18:02:11 +08:00
Kevin Wan
54d57c7d4b refactor rest code (#895) 2021-08-10 17:59:33 +08:00
voidint
28a7c9d38f fix http header binding failure bug #885 (#887) 2021-08-10 17:38:03 +08:00
Kevin Wan
872e75e10d add logx.DisableStat() to disable stat logs (#893)
* add logx.DisableStat() to disable stat logs

* refactor logx code
2021-08-10 16:55:38 +08:00
Phibe
af1730079e better text rendering (#892) 2021-08-10 15:13:09 +08:00
Kevin Wan
04521e2d24 format code (#888) 2021-08-09 23:03:08 +08:00
Kevin Wan
02adcccbf4 format code (#884) 2021-08-09 18:11:08 +08:00
anqiansong
a74aaf1823 optimize mongo generation without cache (fix #881) (#882)
* fix #881

* fix #881

Co-authored-by: anqiansong <anqiansong@xiaoheiban.cn>
2021-08-09 11:33:01 +08:00
Kevin Wan
1eb2089c69 add go-zero users (#883) 2021-08-09 08:37:43 +08:00
Kevin Wan
f7f3730e1a update goctl version to 1.1.10 (#874) 2021-08-04 19:29:40 +08:00
Kevin Wan
0ee7654407 fix #792 (#873) 2021-08-04 18:45:05 +08:00
neosu
16cc990fdd fix context missing (#872)
Co-authored-by: suzhenpeng <suzhenpeng@ecoplants.tech>
2021-08-04 17:46:51 +08:00
neosu
00061c2e5b add goctl rpc template home flag (#871)
Co-authored-by: suzhenpeng <suzhenpeng@ecoplants.tech>
2021-08-04 15:54:43 +08:00
Kevin Wan
6793f7a1de fix bug that proc.SetTimeToForceQuit not working in windows (#869) 2021-08-04 11:31:33 +08:00
anqiansong
c8428a7f65 fix issue #861 (#862)
* fix issue #861

* fix issue #861

Co-authored-by: anqiansong <anqiansong@xiaoheiban.cn>
2021-08-01 23:00:57 +08:00
toutou_o
a5e1d0d0dc add correct example for pg's url (#857) 2021-07-30 13:58:44 +08:00
anqiansong
8270c7deed optimize typo (#855) 2021-07-29 21:53:16 +08:00
anqiansong
9f4a882a1b fix issue #831 (#850)
* fix issue #831

* fix typo

Co-authored-by: anqiansong <anqiansong@xiaoheiban.cn>
2021-07-28 16:32:15 +08:00
anqiansong
cb7b7cb72e fix issue #836 (#849)
Co-authored-by: anqiansong <anqiansong@xiaoheiban.cn>
2021-07-27 22:13:17 +08:00
Kevin Wan
603c93aa4a upgrade grpc package (#845) 2021-07-24 22:29:02 +08:00
masonchen2014
cb8d9d413a simplify timeoutinterceptor (#840)
Co-authored-by: chenmusheng <chenmusheng@laoyuegou.com>
2021-07-24 21:51:46 +08:00
Kevin Wan
ff7443c6a7 fix #796 (#844) 2021-07-24 12:58:14 +08:00
fangjianwei
b812e74d6f Fixed http listener error. (#843) 2021-07-24 12:57:56 +08:00
anqiansong
089cdaa75f Feature model postgresql (#842)
* Support postgresql generate

* Update template Var

* Support to generate postgresql model

* Support to generate postgresql model

* Update template

Co-authored-by: anqiansong <anqiansong@xiaoheiban.cn>
2021-07-23 11:45:15 +08:00
fangjianwei
476026e393 Added database prefix of cache key. (#835) 2021-07-22 11:29:09 +08:00
Kevin Wan
75952308f9 remove faq for old versions (#828) 2021-07-19 22:54:21 +08:00
Kevin Wan
df0550d6dc add go-zero users, update faq (#827) 2021-07-19 17:13:33 +08:00
neosu
e481b63b21 Fix the error stream method name (#826) 2021-07-18 22:05:28 +08:00
Kevin Wan
e47079f0f4 go format with extra rules (#821) 2021-07-17 20:51:23 +08:00
anqiansong
9b2a279948 Fix issues: #725, #740 (#813)
* Fix issues: #725, #740

* Update filed sort

Co-authored-by: anqiansong <anqiansong@xiaoheiban.cn>
2021-07-16 22:55:39 +08:00
anqiansong
db87fd3239 To generate grpc stream, fix issue #616 (#815)
Co-authored-by: anqiansong <anqiansong@xiaoheiban.cn>
2021-07-16 22:54:07 +08:00
aaffo
598fda0c97 optimized (#819) 2021-07-15 23:50:44 +08:00
Chen Quan
b0e335e7b0 Fix rpc generator bug (#799)
* Fix rpc自动生成generate bug

* Delete mock
2021-07-10 13:12:52 +08:00
anqiansong
efdf475da4 Add --go_opt flag to adapt to the version after 1.4.0 of protoc-gen-go (#767)
Co-authored-by: anqiansong <anqiansong@xiaoheiban.cn>
2021-07-08 10:11:11 +08:00
Chen Quan
22a1315136 [WIP]Add parse headers info (#805)
* Add parse headers info

* Update parse headers info
2021-07-07 23:20:09 +08:00
Kevin Wan
5b22823018 fix bug that empty query in transaction (#801) 2021-06-29 23:18:32 +08:00
274 changed files with 7253 additions and 2653 deletions

View File

@@ -1,4 +1,3 @@
comment: false
ignore:
- "doc"
- "example"
- "tools"
- "tools"

40
.github/ISSUE_TEMPLATE/bug_report.md vendored Normal file
View File

@@ -0,0 +1,40 @@
---
name: Bug report
about: Create a report to help us improve
title: ''
labels: ''
assignees: ''
---
**Describe the bug**
A clear and concise description of what the bug is.
**To Reproduce**
Steps to reproduce the behavior, if applicable:
1. The code is
```go
```
2. The error is
```
```
**Expected behavior**
A clear and concise description of what you expected to happen.
**Screenshots**
If applicable, add screenshots to help explain your problem.
**Environments (please complete the following information):**
- OS: [e.g. Linux]
- go-zero version [e.g. 1.2.1]
- goctl version [e.g. 1.2.1, optional]
**More description**
Add any other context about the problem here.

View File

@@ -0,0 +1,20 @@
---
name: Feature request
about: Suggest an idea for this project
title: ''
labels: ''
assignees: ''
---
**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
**Describe alternatives you've considered**
A clear and concise description of any alternative solutions or features you've considered.
**Additional context**
Add any other context or screenshots about the feature request here.

10
.github/ISSUE_TEMPLATE/question.md vendored Normal file
View File

@@ -0,0 +1,10 @@
---
name: Question
about: Ask a question on using go-zero or goctl
title: ''
labels: ''
assignees: ''
---

View File

@@ -15,7 +15,7 @@ jobs:
- name: Set up Go 1.x
uses: actions/setup-go@v2
with:
go-version: ^1.13
go-version: ^1.14
id: go
- name: Check out code into the Go module directory
@@ -25,10 +25,14 @@ jobs:
run: |
go get -v -t -d ./...
- name: Lint
run: |
go vet -stdmethods=false $(go list ./...)
go install mvdan.cc/gofumpt@latest
test -z "$(gofumpt -s -l -extra .)" || echo "Please run 'gofumpt -l -w -extra .'"
- name: Test
run: go test -race -coverprofile=coverage.txt -covermode=atomic ./...
- name: Codecov
uses: codecov/codecov-action@v1.0.6
with:
token: ${{secrets.CODECOV_TOKEN}}
uses: codecov/codecov-action@v2

19
.github/workflows/issues.yml vendored Normal file
View File

@@ -0,0 +1,19 @@
name: Close inactive issues
on:
schedule:
- cron: "30 1 * * *"
jobs:
close-issues:
runs-on: ubuntu-latest
steps:
- uses: actions/stale@v3
with:
days-before-issue-stale: 30
days-before-issue-close: 14
stale-issue-label: "stale"
stale-issue-message: "This issue is stale because it has been open for 30 days with no activity."
close-issue-message: "This issue was closed because it has been inactive for 14 days since being marked as stale."
days-before-pr-stale: -1
days-before-pr-close: -1
repo-token: ${{ secrets.GITHUB_TOKEN }}

19
.github/workflows/reviewdog.yml vendored Normal file
View File

@@ -0,0 +1,19 @@
name: reviewdog
on: [pull_request]
jobs:
staticcheck:
name: runner / staticcheck
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: reviewdog/action-staticcheck@v1
with:
github_token: ${{ secrets.github_token }}
# Change reviewdog reporter if you need [github-pr-check,github-check,github-pr-review].
reporter: github-pr-review
# Report all results.
filter_mode: nofilter
# Exit with 1 when it find at least one finding.
fail_on_error: true
# Set staticcheck flags
staticcheck_flags: -checks=inherit,-SA1019,-SA1029,-SA5008

1
.gitignore vendored
View File

@@ -10,6 +10,7 @@
!*/
!api
# ignore
.idea
**/.DS_Store
**/logs

View File

@@ -90,7 +90,7 @@ After that, run these local verifications before submitting pull request to pred
fail of continuous integration.
* Format the code with `gofmt`
* Run the test with data race enabled `go test -race ./`
* Run the test with data race enabled `go test -race ./...`
## Code Review

View File

@@ -5,17 +5,18 @@ Community and contributor involvement is vital for successfully implementing all
We hope that the items listed below will inspire further engagement from the community to keep go-zero progressing and shipping exciting and valuable features.
## 2021 Q2
- Support TLS in redis connections
- Support service discovery through K8S watch api
- Log full sql statements for easier sql problem solving
- [x] Support service discovery through K8S client api
- [x] Log full sql statements for easier sql problem solving
## 2021 Q3
- Support `goctl mock` command to start a mocking server with given `.api` file
- Adapt builtin tracing mechanism to opentracing solutions
- Support `goctl model pg` to support PostgreSQL code generation
- [x] Support `goctl model pg` to support PostgreSQL code generation
- [ ] Support `goctl mock` command to start a mocking server with given `.api` file
- [ ] Adapt builtin tracing mechanism to opentracing solutions
## 2021 Q4
- Support `goctl doctor` command to report potential issues for given service
- Support `context` in redis related methods for timeout and tracing
- Support `context` in sql related methods for timeout and tracing
- Support `context` in mongodb related methods for timeout and tracing
- [ ] Add `httpx.Client` with governance, like circuit breaker etc.
- [ ] Support `goctl doctor` command to report potential issues for given service
- [ ] Support `context` in redis related methods for timeout and tracing
- [ ] Support `context` in sql related methods for timeout and tracing
- [ ] Support `context` in mongodb related methods for timeout and tracing
- [ ] Support TLS in redis connections

View File

@@ -34,7 +34,7 @@ type (
expire time.Duration
timingWheel *TimingWheel
lruCache lru
barrier syncx.SharedCalls
barrier syncx.SingleFlight
unstableExpiry mathx.Unstable
stats *cacheStat
}
@@ -46,7 +46,7 @@ func NewCache(expire time.Duration, opts ...CacheOption) (*Cache, error) {
data: make(map[string]interface{}),
expire: expire,
lruCache: emptyLruCache,
barrier: syncx.NewSharedCalls(),
barrier: syncx.NewSingleFlight(),
unstableExpiry: mathx.NewUnstable(expiryDeviation),
}

View File

@@ -106,9 +106,7 @@ func (s *Set) KeysInt() []int {
var keys []int
for key := range s.data {
if intKey, ok := key.(int); !ok {
continue
} else {
if intKey, ok := key.(int); ok {
keys = append(keys, intKey)
}
}
@@ -121,9 +119,7 @@ func (s *Set) KeysInt64() []int64 {
var keys []int64
for key := range s.data {
if intKey, ok := key.(int64); !ok {
continue
} else {
if intKey, ok := key.(int64); ok {
keys = append(keys, intKey)
}
}
@@ -136,9 +132,7 @@ func (s *Set) KeysUint() []uint {
var keys []uint
for key := range s.data {
if intKey, ok := key.(uint); !ok {
continue
} else {
if intKey, ok := key.(uint); ok {
keys = append(keys, intKey)
}
}
@@ -151,9 +145,7 @@ func (s *Set) KeysUint64() []uint64 {
var keys []uint64
for key := range s.data {
if intKey, ok := key.(uint64); !ok {
continue
} else {
if intKey, ok := key.(uint64); ok {
keys = append(keys, intKey)
}
}
@@ -166,9 +158,7 @@ func (s *Set) KeysStr() []string {
var keys []string
for key := range s.data {
if strKey, ok := key.(string); !ok {
continue
} else {
if strKey, ok := key.(string); ok {
keys = append(keys, strKey)
}
}

View File

@@ -47,9 +47,11 @@ func TestUnmarshalContextWithMissing(t *testing.T) {
Name string `ctx:"name"`
Age int `ctx:"age"`
}
type name string
const PersonNameKey name = "name"
ctx := context.Background()
ctx = context.WithValue(ctx, "name", "kevin")
ctx = context.WithValue(ctx, PersonNameKey, "kevin")
var person Person
err := For(ctx, &person)

View File

@@ -9,7 +9,9 @@ import (
)
func TestContextCancel(t *testing.T) {
c := context.WithValue(context.Background(), "key", "value")
type key string
var nameKey key = "name"
c := context.WithValue(context.Background(), nameKey, "value")
c1, cancel := context.WithCancel(c)
o := ValueOnlyFrom(c1)
c2, cancel2 := context.WithCancel(o)

View File

@@ -6,9 +6,10 @@ package internal
import (
context "context"
reflect "reflect"
gomock "github.com/golang/mock/gomock"
connectivity "google.golang.org/grpc/connectivity"
reflect "reflect"
)
// MocketcdConn is a mock of etcdConn interface

View File

@@ -5,8 +5,9 @@
package internal
import (
gomock "github.com/golang/mock/gomock"
reflect "reflect"
gomock "github.com/golang/mock/gomock"
)
// MockUpdateListener is a mock of UpdateListener interface

View File

@@ -9,7 +9,9 @@ type AtomicError struct {
// Set sets the error.
func (ae *AtomicError) Set(err error) {
ae.err.Store(err)
if err != nil {
ae.err.Store(err)
}
}
// Load returns the error.

View File

@@ -17,6 +17,15 @@ func TestAtomicError(t *testing.T) {
assert.Equal(t, errDummy, err.Load())
}
func TestAtomicErrorSetNil(t *testing.T) {
var (
errNil error
err AtomicError
)
err.Set(errNil)
assert.Equal(t, errNil, err.Load())
}
func TestAtomicErrorNil(t *testing.T) {
var err AtomicError
assert.Nil(t, err.Load())

View File

@@ -1,3 +1,4 @@
//go:build windows
// +build windows
package fs

View File

@@ -1,3 +1,4 @@
//go:build linux || darwin
// +build linux darwin
package fs

View File

@@ -395,16 +395,16 @@ func assetEqual(t *testing.T, except, data interface{}) {
func TestStream_AnyMach(t *testing.T) {
assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return 4 == item.(int)
return item.(int) == 4
}))
assetEqual(t, false, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return 0 == item.(int)
return item.(int) == 0
}))
assetEqual(t, true, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return 2 == item.(int)
return item.(int) == 2
}))
assetEqual(t, true, Just(1, 2, 3).AnyMach(func(item interface{}) bool {
return 2 == item.(int)
return item.(int) == 2
}))
}

View File

@@ -2,6 +2,9 @@ package fx
import (
"context"
"fmt"
"runtime/debug"
"strings"
"time"
)
@@ -30,7 +33,8 @@ func DoWithTimeout(fn func() error, timeout time.Duration, opts ...DoOption) err
go func() {
defer func() {
if p := recover(); p != nil {
panicChan <- p
// attach call stack to avoid missing in different goroutine
panicChan <- fmt.Sprintf("%+v\n\n%s", p, strings.TrimSpace(string(debug.Stack())))
}
}()
done <- fn()

View File

@@ -20,6 +20,11 @@ func TestMd5(t *testing.T) {
assert.Equal(t, md5Digest, actual)
}
func TestMd5Hex(t *testing.T) {
actual := Md5Hex([]byte(text))
assert.Equal(t, md5Digest, actual)
}
func BenchmarkHashFnv(b *testing.B) {
for i := 0; i < b.N; i++ {
h := fnv.New32()

View File

@@ -24,7 +24,7 @@ func TestTokenLimit_Rescue(t *testing.T) {
rate = 5
burst = 10
)
l := NewTokenLimiter(rate, burst, redis.NewRedis(s.Addr(), redis.NodeType), "tokenlimit")
l := NewTokenLimiter(rate, burst, redis.New(s.Addr()), "tokenlimit")
s.Close()
var allowed int

View File

@@ -31,6 +31,8 @@ var (
// default to be enabled
enabled = syncx.ForAtomicBool(true)
// default to be enabled
logEnabled = syncx.ForAtomicBool(true)
// make it a variable for unit test
systemOverloadChecker = func(cpuThreshold int64) bool {
return stat.CpuUsage() >= cpuThreshold
@@ -80,6 +82,11 @@ func Disable() {
enabled.Set(false)
}
// DisableLog disables the stat logs for load shedding.
func DisableLog() {
logEnabled.Set(false)
}
// NewAdaptiveShedder returns an adaptive shedder.
// opts can be used to customize the Shedder.
func NewAdaptiveShedder(opts ...ShedderOption) Shedder {

View File

@@ -25,6 +25,7 @@ func init() {
}
func TestAdaptiveShedder(t *testing.T) {
DisableLog()
shedder := NewAdaptiveShedder(WithWindow(bucketDuration), WithBuckets(buckets), WithCpuThreshold(100))
var wg sync.WaitGroup
var drop int64

View File

@@ -48,6 +48,25 @@ func (s *SheddingStat) IncrementDrop() {
atomic.AddInt64(&s.drop, 1)
}
func (s *SheddingStat) loop(c <-chan time.Time) {
for range c {
st := s.reset()
if !logEnabled.True() {
continue
}
c := stat.CpuUsage()
if st.Drop == 0 {
logx.Statf("(%s) shedding_stat [1m], cpu: %d, total: %d, pass: %d, drop: %d",
s.name, c, st.Total, st.Pass, st.Drop)
} else {
logx.Statf("(%s) shedding_stat_drop [1m], cpu: %d, total: %d, pass: %d, drop: %d",
s.name, c, st.Total, st.Pass, st.Drop)
}
}
}
func (s *SheddingStat) reset() snapshot {
return snapshot{
Total: atomic.SwapInt64(&s.total, 0),
@@ -59,15 +78,6 @@ func (s *SheddingStat) reset() snapshot {
func (s *SheddingStat) run() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for range ticker.C {
c := stat.CpuUsage()
st := s.reset()
if st.Drop == 0 {
logx.Statf("(%s) shedding_stat [1m], cpu: %d, total: %d, pass: %d, drop: %d",
s.name, c, st.Total, st.Pass, st.Drop)
} else {
logx.Statf("(%s) shedding_stat_drop [1m], cpu: %d, total: %d, pass: %d, drop: %d",
s.name, c, st.Total, st.Pass, st.Drop)
}
}
s.loop(ticker.C)
}

View File

@@ -2,6 +2,7 @@ package load
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
@@ -22,3 +23,32 @@ func TestSheddingStat(t *testing.T) {
assert.Equal(t, int64(5), result.Pass)
assert.Equal(t, int64(7), result.Drop)
}
func TestLoopTrue(t *testing.T) {
ch := make(chan time.Time, 1)
ch <- time.Now()
close(ch)
st := new(SheddingStat)
logEnabled.Set(true)
st.loop(ch)
}
func TestLoopTrueAndDrop(t *testing.T) {
ch := make(chan time.Time, 1)
ch <- time.Now()
close(ch)
st := new(SheddingStat)
st.IncrementDrop()
logEnabled.Set(true)
st.loop(ch)
}
func TestLoopFalseAndDrop(t *testing.T) {
ch := make(chan time.Time, 1)
ch <- time.Now()
close(ch)
st := new(SheddingStat)
st.IncrementDrop()
logEnabled.Set(false)
st.loop(ch)
}

View File

@@ -20,49 +20,67 @@ func WithDuration(d time.Duration) Logger {
}
func (l *durationLogger) Error(v ...interface{}) {
if shouldLog(ErrorLevel) {
if shallLog(ErrorLevel) {
l.write(errorLog, levelError, formatWithCaller(fmt.Sprint(v...), durationCallerDepth))
}
}
func (l *durationLogger) Errorf(format string, v ...interface{}) {
if shouldLog(ErrorLevel) {
if shallLog(ErrorLevel) {
l.write(errorLog, levelError, formatWithCaller(fmt.Sprintf(format, v...), durationCallerDepth))
}
}
func (l *durationLogger) Errorv(v interface{}) {
if shallLog(ErrorLevel) {
l.write(errorLog, levelError, v)
}
}
func (l *durationLogger) Info(v ...interface{}) {
if shouldLog(InfoLevel) {
if shallLog(InfoLevel) {
l.write(infoLog, levelInfo, fmt.Sprint(v...))
}
}
func (l *durationLogger) Infof(format string, v ...interface{}) {
if shouldLog(InfoLevel) {
if shallLog(InfoLevel) {
l.write(infoLog, levelInfo, fmt.Sprintf(format, v...))
}
}
func (l *durationLogger) Infov(v interface{}) {
if shallLog(InfoLevel) {
l.write(infoLog, levelInfo, v)
}
}
func (l *durationLogger) Slow(v ...interface{}) {
if shouldLog(ErrorLevel) {
if shallLog(ErrorLevel) {
l.write(slowLog, levelSlow, fmt.Sprint(v...))
}
}
func (l *durationLogger) Slowf(format string, v ...interface{}) {
if shouldLog(ErrorLevel) {
if shallLog(ErrorLevel) {
l.write(slowLog, levelSlow, fmt.Sprintf(format, v...))
}
}
func (l *durationLogger) Slowv(v interface{}) {
if shallLog(ErrorLevel) {
l.write(slowLog, levelSlow, v)
}
}
func (l *durationLogger) WithDuration(duration time.Duration) Logger {
l.Duration = timex.ReprOfDuration(duration)
return l
}
func (l *durationLogger) write(writer io.Writer, level, content string) {
func (l *durationLogger) write(writer io.Writer, level string, val interface{}) {
l.Timestamp = getTimestamp()
l.Level = level
l.Content = content
outputJson(writer, logEntry(*l))
l.Content = val
outputJson(writer, l)
}

View File

@@ -65,12 +65,14 @@ var (
timeFormat = "2006-01-02T15:04:05.000Z07"
writeConsole bool
logLevel uint32
infoLog io.WriteCloser
errorLog io.WriteCloser
severeLog io.WriteCloser
slowLog io.WriteCloser
statLog io.WriteCloser
stackLog io.Writer
// use uint32 for atomic operations
disableStat uint32
infoLog io.WriteCloser
errorLog io.WriteCloser
severeLog io.WriteCloser
slowLog io.WriteCloser
statLog io.WriteCloser
stackLog io.Writer
once sync.Once
initialized uint32
@@ -79,10 +81,10 @@ var (
type (
logEntry struct {
Timestamp string `json:"@timestamp"`
Level string `json:"level"`
Duration string `json:"duration,omitempty"`
Content string `json:"content"`
Timestamp string `json:"@timestamp"`
Level string `json:"level"`
Duration string `json:"duration,omitempty"`
Content interface{} `json:"content"`
}
logOptions struct {
@@ -98,10 +100,13 @@ type (
Logger interface {
Error(...interface{})
Errorf(string, ...interface{})
Errorv(interface{})
Info(...interface{})
Infof(string, ...interface{})
Infov(interface{})
Slow(...interface{})
Slowf(string, ...interface{})
Slowv(interface{})
WithDuration(time.Duration) Logger
}
)
@@ -133,7 +138,7 @@ func SetUp(c LogConf) error {
// Alert alerts v in alert level, and the message is written to error log.
func Alert(v string) {
output(errorLog, levelAlert, v)
outputText(errorLog, levelAlert, v)
}
// Close closes the logging.
@@ -195,24 +200,29 @@ func Disable() {
})
}
// DisableStat disables the stat logs.
func DisableStat() {
atomic.StoreUint32(&disableStat, 1)
}
// Error writes v into error log.
func Error(v ...interface{}) {
ErrorCaller(1, v...)
}
// Errorf writes v with format into error log.
func Errorf(format string, v ...interface{}) {
ErrorCallerf(1, format, v...)
}
// ErrorCaller writes v with context into error log.
func ErrorCaller(callDepth int, v ...interface{}) {
errorSync(fmt.Sprint(v...), callDepth+callerInnerDepth)
errorTextSync(fmt.Sprint(v...), callDepth+callerInnerDepth)
}
// ErrorCallerf writes v with context in format into error log.
func ErrorCallerf(callDepth int, format string, v ...interface{}) {
errorSync(fmt.Sprintf(format, v...), callDepth+callerInnerDepth)
errorTextSync(fmt.Sprintf(format, v...), callDepth+callerInnerDepth)
}
// Errorf writes v with format into error log.
func Errorf(format string, v ...interface{}) {
ErrorCallerf(1, format, v...)
}
// ErrorStack writes v along with call stack into error log.
@@ -227,14 +237,25 @@ func ErrorStackf(format string, v ...interface{}) {
stackSync(fmt.Sprintf(format, v...))
}
// Errorv writes v into error log with json content.
// No call stack attached, because not elegant to pack the messages.
func Errorv(v interface{}) {
errorAnySync(v)
}
// Info writes v into access log.
func Info(v ...interface{}) {
infoSync(fmt.Sprint(v...))
infoTextSync(fmt.Sprint(v...))
}
// Infof writes v with format into access log.
func Infof(format string, v ...interface{}) {
infoSync(fmt.Sprintf(format, v...))
infoTextSync(fmt.Sprintf(format, v...))
}
// Infov writes v into access log with json content.
func Infov(v interface{}) {
infoAnySync(v)
}
// Must checks if err is nil, otherwise logs the err and exits.
@@ -242,7 +263,7 @@ func Must(err error) {
if err != nil {
msg := formatWithCaller(err.Error(), 3)
log.Print(msg)
output(severeLog, levelFatal, msg)
outputText(severeLog, levelFatal, msg)
os.Exit(1)
}
}
@@ -264,12 +285,17 @@ func Severef(format string, v ...interface{}) {
// Slow writes v into slow log.
func Slow(v ...interface{}) {
slowSync(fmt.Sprint(v...))
slowTextSync(fmt.Sprint(v...))
}
// Slowf writes v with format into slow log.
func Slowf(format string, v ...interface{}) {
slowSync(fmt.Sprintf(format, v...))
slowTextSync(fmt.Sprintf(format, v...))
}
// Slowv writes v into slow log with json content.
func Slowv(v interface{}) {
slowAnySync(v)
}
// Stat writes v into stat log.
@@ -312,8 +338,14 @@ func createOutput(path string) (io.WriteCloser, error) {
options.gzipEnabled), options.gzipEnabled)
}
func errorSync(msg string, callDepth int) {
if shouldLog(ErrorLevel) {
func errorAnySync(v interface{}) {
if shallLog(ErrorLevel) {
outputAny(errorLog, levelError, v)
}
}
func errorTextSync(msg string, callDepth int) {
if shallLog(ErrorLevel) {
outputError(errorLog, msg, callDepth)
}
}
@@ -362,13 +394,28 @@ func handleOptions(opts []LogOption) {
}
}
func infoSync(msg string) {
if shouldLog(InfoLevel) {
output(infoLog, levelInfo, msg)
func infoAnySync(val interface{}) {
if shallLog(InfoLevel) {
outputAny(infoLog, levelInfo, val)
}
}
func output(writer io.Writer, level, msg string) {
func infoTextSync(msg string) {
if shallLog(InfoLevel) {
outputText(infoLog, levelInfo, msg)
}
}
func outputAny(writer io.Writer, level string, val interface{}) {
info := logEntry{
Timestamp: getTimestamp(),
Level: level,
Content: val,
}
outputJson(writer, info)
}
func outputText(writer io.Writer, level, msg string) {
info := logEntry{
Timestamp: getTimestamp(),
Level: level,
@@ -379,7 +426,7 @@ func output(writer io.Writer, level, msg string) {
func outputError(writer io.Writer, msg string, callDepth int) {
content := formatWithCaller(msg, callDepth)
output(writer, levelError, content)
outputText(writer, levelError, content)
}
func outputJson(writer io.Writer, info interface{}) {
@@ -481,30 +528,40 @@ func setupWithVolume(c LogConf) error {
}
func severeSync(msg string) {
if shouldLog(SevereLevel) {
output(severeLog, levelSevere, fmt.Sprintf("%s\n%s", msg, string(debug.Stack())))
if shallLog(SevereLevel) {
outputText(severeLog, levelSevere, fmt.Sprintf("%s\n%s", msg, string(debug.Stack())))
}
}
func shouldLog(level uint32) bool {
func shallLog(level uint32) bool {
return atomic.LoadUint32(&logLevel) <= level
}
func slowSync(msg string) {
if shouldLog(ErrorLevel) {
output(slowLog, levelSlow, msg)
func shallLogStat() bool {
return atomic.LoadUint32(&disableStat) == 0
}
func slowAnySync(v interface{}) {
if shallLog(ErrorLevel) {
outputAny(slowLog, levelSlow, v)
}
}
func slowTextSync(msg string) {
if shallLog(ErrorLevel) {
outputText(slowLog, levelSlow, msg)
}
}
func stackSync(msg string) {
if shouldLog(ErrorLevel) {
output(stackLog, levelError, fmt.Sprintf("%s\n%s", msg, string(debug.Stack())))
if shallLog(ErrorLevel) {
outputText(stackLog, levelError, fmt.Sprintf("%s\n%s", msg, string(debug.Stack())))
}
}
func statSync(msg string) {
if shouldLog(InfoLevel) {
output(statLog, levelStat, msg)
if shallLogStat() && shallLog(InfoLevel) {
outputText(statLog, levelStat, msg)
}
}

View File

@@ -92,6 +92,30 @@ func TestStructedLogAlert(t *testing.T) {
})
}
func TestStructedLogError(t *testing.T) {
doTestStructedLog(t, levelError, func(writer io.WriteCloser) {
errorLog = writer
}, func(v ...interface{}) {
Error(v...)
})
}
func TestStructedLogErrorf(t *testing.T) {
doTestStructedLog(t, levelError, func(writer io.WriteCloser) {
errorLog = writer
}, func(v ...interface{}) {
Errorf("%s", fmt.Sprint(v...))
})
}
func TestStructedLogErrorv(t *testing.T) {
doTestStructedLog(t, levelError, func(writer io.WriteCloser) {
errorLog = writer
}, func(v ...interface{}) {
Errorv(fmt.Sprint(v...))
})
}
func TestStructedLogInfo(t *testing.T) {
doTestStructedLog(t, levelInfo, func(writer io.WriteCloser) {
infoLog = writer
@@ -100,6 +124,22 @@ func TestStructedLogInfo(t *testing.T) {
})
}
func TestStructedLogInfof(t *testing.T) {
doTestStructedLog(t, levelInfo, func(writer io.WriteCloser) {
infoLog = writer
}, func(v ...interface{}) {
Infof("%s", fmt.Sprint(v...))
})
}
func TestStructedLogInfov(t *testing.T) {
doTestStructedLog(t, levelInfo, func(writer io.WriteCloser) {
infoLog = writer
}, func(v ...interface{}) {
Infov(fmt.Sprint(v...))
})
}
func TestStructedLogSlow(t *testing.T) {
doTestStructedLog(t, levelSlow, func(writer io.WriteCloser) {
slowLog = writer
@@ -116,6 +156,14 @@ func TestStructedLogSlowf(t *testing.T) {
})
}
func TestStructedLogSlowv(t *testing.T) {
doTestStructedLog(t, levelSlow, func(writer io.WriteCloser) {
slowLog = writer
}, func(v ...interface{}) {
Slowv(fmt.Sprint(v...))
})
}
func TestStructedLogStat(t *testing.T) {
doTestStructedLog(t, levelStat, func(writer io.WriteCloser) {
statLog = writer
@@ -246,6 +294,17 @@ func TestDisable(t *testing.T) {
assert.Nil(t, Close())
}
func TestDisableStat(t *testing.T) {
DisableStat()
const message = "hello there"
writer := new(mockWriter)
statLog = writer
atomic.StoreUint32(&initialized, 1)
Stat(message)
assert.Equal(t, 0, writer.builder.Len())
}
func TestWithGzip(t *testing.T) {
fn := WithGzip()
var opt logOptions
@@ -357,7 +416,9 @@ func doTestStructedLog(t *testing.T, level string, setup func(writer io.WriteClo
t.Error(err)
}
assert.Equal(t, level, entry.Level)
assert.True(t, strings.Contains(entry.Content, message))
val, ok := entry.Content.(string)
assert.True(t, ok)
assert.True(t, strings.Contains(val, message))
}
func testSetLevelTwiceWithMode(t *testing.T, mode string) {

View File

@@ -47,7 +47,6 @@ type (
done chan lang.PlaceholderType
rule RotateRule
compress bool
keepDays int
// can't use threading.RoutineGroup because of cycle import
waitGroup sync.WaitGroup
closeOnce sync.Once

View File

@@ -3,6 +3,7 @@ package logx
import (
"os"
"path/filepath"
"syscall"
"testing"
"time"
@@ -97,7 +98,13 @@ func TestRotateLoggerRotate(t *testing.T) {
}()
}
err = logger.rotate()
assert.Nil(t, err)
switch v := err.(type) {
case *os.LinkError:
// avoid rename error on docker container
assert.Equal(t, syscall.EXDEV, v.Err)
default:
assert.Nil(t, err)
}
}
func TestRotateLoggerWrite(t *testing.T) {

View File

@@ -44,5 +44,5 @@ func captureOutput(f func()) string {
func getContent(jsonStr string) string {
var entry logEntry
json.Unmarshal([]byte(jsonStr), &entry)
return entry.Content
return entry.Content.(string)
}

View File

@@ -7,7 +7,7 @@ import (
"time"
"github.com/tal-tech/go-zero/core/timex"
"github.com/tal-tech/go-zero/core/trace/tracespec"
"go.opentelemetry.io/otel/trace"
)
type traceLogger struct {
@@ -18,50 +18,68 @@ type traceLogger struct {
}
func (l *traceLogger) Error(v ...interface{}) {
if shouldLog(ErrorLevel) {
if shallLog(ErrorLevel) {
l.write(errorLog, levelError, formatWithCaller(fmt.Sprint(v...), durationCallerDepth))
}
}
func (l *traceLogger) Errorf(format string, v ...interface{}) {
if shouldLog(ErrorLevel) {
if shallLog(ErrorLevel) {
l.write(errorLog, levelError, formatWithCaller(fmt.Sprintf(format, v...), durationCallerDepth))
}
}
func (l *traceLogger) Errorv(v interface{}) {
if shallLog(ErrorLevel) {
l.write(errorLog, levelError, v)
}
}
func (l *traceLogger) Info(v ...interface{}) {
if shouldLog(InfoLevel) {
if shallLog(InfoLevel) {
l.write(infoLog, levelInfo, fmt.Sprint(v...))
}
}
func (l *traceLogger) Infof(format string, v ...interface{}) {
if shouldLog(InfoLevel) {
if shallLog(InfoLevel) {
l.write(infoLog, levelInfo, fmt.Sprintf(format, v...))
}
}
func (l *traceLogger) Infov(v interface{}) {
if shallLog(InfoLevel) {
l.write(infoLog, levelInfo, v)
}
}
func (l *traceLogger) Slow(v ...interface{}) {
if shouldLog(ErrorLevel) {
if shallLog(ErrorLevel) {
l.write(slowLog, levelSlow, fmt.Sprint(v...))
}
}
func (l *traceLogger) Slowf(format string, v ...interface{}) {
if shouldLog(ErrorLevel) {
if shallLog(ErrorLevel) {
l.write(slowLog, levelSlow, fmt.Sprintf(format, v...))
}
}
func (l *traceLogger) Slowv(v interface{}) {
if shallLog(ErrorLevel) {
l.write(slowLog, levelSlow, v)
}
}
func (l *traceLogger) WithDuration(duration time.Duration) Logger {
l.Duration = timex.ReprOfDuration(duration)
return l
}
func (l *traceLogger) write(writer io.Writer, level, content string) {
func (l *traceLogger) write(writer io.Writer, level string, val interface{}) {
l.Timestamp = getTimestamp()
l.Level = level
l.Content = content
l.Content = val
l.Trace = traceIdFromContext(l.ctx)
l.Span = spanIdFromContext(l.ctx)
outputJson(writer, l)
@@ -75,19 +93,19 @@ func WithContext(ctx context.Context) Logger {
}
func spanIdFromContext(ctx context.Context) string {
t, ok := ctx.Value(tracespec.TracingKey).(tracespec.Trace)
if !ok {
return ""
spanCtx := trace.SpanContextFromContext(ctx)
if spanCtx.HasSpanID() {
return spanCtx.SpanID().String()
}
return t.SpanId()
return ""
}
func traceIdFromContext(ctx context.Context) string {
t, ok := ctx.Value(tracespec.TracingKey).(tracespec.Trace)
if !ok {
return ""
spanCtx := trace.SpanContextFromContext(ctx)
if spanCtx.HasTraceID() {
return spanCtx.TraceID().String()
}
return t.TraceId()
return ""
}

View File

@@ -9,71 +9,90 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/trace/tracespec"
"go.opentelemetry.io/otel"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
const (
mockTraceID = "mock-trace-id"
mockSpanID = "mock-span-id"
traceKey = "trace"
spanKey = "span"
)
var mock tracespec.Trace = new(mockTrace)
func TestTraceLog(t *testing.T) {
var buf mockWriter
atomic.StoreUint32(&initialized, 1)
ctx := context.WithValue(context.Background(), tracespec.TracingKey, mock)
otp := otel.GetTracerProvider()
tp := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample()))
otel.SetTracerProvider(tp)
defer otel.SetTracerProvider(otp)
ctx, _ := tp.Tracer("foo").Start(context.Background(), "bar")
WithContext(ctx).(*traceLogger).write(&buf, levelInfo, testlog)
assert.True(t, strings.Contains(buf.String(), mockTraceID))
assert.True(t, strings.Contains(buf.String(), mockSpanID))
assert.True(t, strings.Contains(buf.String(), traceKey))
assert.True(t, strings.Contains(buf.String(), spanKey))
}
func TestTraceError(t *testing.T) {
var buf mockWriter
atomic.StoreUint32(&initialized, 1)
errorLog = newLogWriter(log.New(&buf, "", flags))
ctx := context.WithValue(context.Background(), tracespec.TracingKey, mock)
otp := otel.GetTracerProvider()
tp := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample()))
otel.SetTracerProvider(tp)
defer otel.SetTracerProvider(otp)
ctx, _ := tp.Tracer("foo").Start(context.Background(), "bar")
l := WithContext(ctx).(*traceLogger)
SetLevel(InfoLevel)
l.WithDuration(time.Second).Error(testlog)
assert.True(t, strings.Contains(buf.String(), mockTraceID))
assert.True(t, strings.Contains(buf.String(), mockSpanID))
assert.True(t, strings.Contains(buf.String(), traceKey))
assert.True(t, strings.Contains(buf.String(), spanKey))
buf.Reset()
l.WithDuration(time.Second).Errorf(testlog)
assert.True(t, strings.Contains(buf.String(), mockTraceID))
assert.True(t, strings.Contains(buf.String(), mockSpanID))
assert.True(t, strings.Contains(buf.String(), traceKey))
assert.True(t, strings.Contains(buf.String(), spanKey))
}
func TestTraceInfo(t *testing.T) {
var buf mockWriter
atomic.StoreUint32(&initialized, 1)
infoLog = newLogWriter(log.New(&buf, "", flags))
ctx := context.WithValue(context.Background(), tracespec.TracingKey, mock)
otp := otel.GetTracerProvider()
tp := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample()))
otel.SetTracerProvider(tp)
defer otel.SetTracerProvider(otp)
ctx, _ := tp.Tracer("foo").Start(context.Background(), "bar")
l := WithContext(ctx).(*traceLogger)
SetLevel(InfoLevel)
l.WithDuration(time.Second).Info(testlog)
assert.True(t, strings.Contains(buf.String(), mockTraceID))
assert.True(t, strings.Contains(buf.String(), mockSpanID))
assert.True(t, strings.Contains(buf.String(), traceKey))
assert.True(t, strings.Contains(buf.String(), spanKey))
buf.Reset()
l.WithDuration(time.Second).Infof(testlog)
assert.True(t, strings.Contains(buf.String(), mockTraceID))
assert.True(t, strings.Contains(buf.String(), mockSpanID))
assert.True(t, strings.Contains(buf.String(), traceKey))
assert.True(t, strings.Contains(buf.String(), spanKey))
}
func TestTraceSlow(t *testing.T) {
var buf mockWriter
atomic.StoreUint32(&initialized, 1)
slowLog = newLogWriter(log.New(&buf, "", flags))
ctx := context.WithValue(context.Background(), tracespec.TracingKey, mock)
otp := otel.GetTracerProvider()
tp := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample()))
otel.SetTracerProvider(tp)
defer otel.SetTracerProvider(otp)
ctx, _ := tp.Tracer("foo").Start(context.Background(), "bar")
l := WithContext(ctx).(*traceLogger)
SetLevel(InfoLevel)
l.WithDuration(time.Second).Slow(testlog)
assert.True(t, strings.Contains(buf.String(), mockTraceID))
assert.True(t, strings.Contains(buf.String(), mockSpanID))
assert.True(t, strings.Contains(buf.String(), traceKey))
assert.True(t, strings.Contains(buf.String(), spanKey))
buf.Reset()
l.WithDuration(time.Second).Slowf(testlog)
assert.True(t, strings.Contains(buf.String(), mockTraceID))
assert.True(t, strings.Contains(buf.String(), mockSpanID))
assert.True(t, strings.Contains(buf.String(), traceKey))
assert.True(t, strings.Contains(buf.String(), spanKey))
}
func TestTraceWithoutContext(t *testing.T) {
@@ -83,34 +102,10 @@ func TestTraceWithoutContext(t *testing.T) {
l := WithContext(context.Background()).(*traceLogger)
SetLevel(InfoLevel)
l.WithDuration(time.Second).Info(testlog)
assert.False(t, strings.Contains(buf.String(), mockTraceID))
assert.False(t, strings.Contains(buf.String(), mockSpanID))
assert.False(t, strings.Contains(buf.String(), traceKey))
assert.False(t, strings.Contains(buf.String(), spanKey))
buf.Reset()
l.WithDuration(time.Second).Infof(testlog)
assert.False(t, strings.Contains(buf.String(), mockTraceID))
assert.False(t, strings.Contains(buf.String(), mockSpanID))
}
type mockTrace struct{}
func (t mockTrace) TraceId() string {
return mockTraceID
}
func (t mockTrace) SpanId() string {
return mockSpanID
}
func (t mockTrace) Finish() {
}
func (t mockTrace) Fork(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) {
return nil, nil
}
func (t mockTrace) Follow(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) {
return nil, nil
}
func (t mockTrace) Visit(fn func(key, val string) bool) {
assert.False(t, strings.Contains(buf.String(), traceKey))
assert.False(t, strings.Contains(buf.String(), spanKey))
}

View File

@@ -43,7 +43,8 @@ type (
UnmarshalOption func(*unmarshalOptions)
unmarshalOptions struct {
fromString bool
fromString bool
canonicalKey func(key string) string
}
keyCache map[string][]string
@@ -229,7 +230,7 @@ func (u *Unmarshaler) processFieldPrimitive(field reflect.StructField, value ref
default:
switch v := mapValue.(type) {
case json.Number:
return u.processFieldPrimitiveWithJsonNumber(field, value, v, opts, fullName)
return u.processFieldPrimitiveWithJSONNumber(field, value, v, opts, fullName)
default:
if typeKind == valueKind {
if err := validateValueInOptions(opts.options(), mapValue); err != nil {
@@ -244,7 +245,7 @@ func (u *Unmarshaler) processFieldPrimitive(field reflect.StructField, value ref
return newTypeMismatchError(fullName)
}
func (u *Unmarshaler) processFieldPrimitiveWithJsonNumber(field reflect.StructField, value reflect.Value,
func (u *Unmarshaler) processFieldPrimitiveWithJSONNumber(field reflect.StructField, value reflect.Value,
v json.Number, opts *fieldOptionsWithContext, fullName string) error {
fieldType := field.Type
fieldKind := fieldType.Kind()
@@ -323,7 +324,11 @@ func (u *Unmarshaler) processNamedField(field reflect.StructField, value reflect
}
fullName = join(fullName, key)
mapValue, hasValue := getValue(m, key)
canonicalKey := key
if u.opts.canonicalKey != nil {
canonicalKey = u.opts.canonicalKey(key)
}
mapValue, hasValue := getValue(m, canonicalKey)
if hasValue {
return u.processNamedFieldWithValue(field, value, mapValue, key, opts, fullName)
}
@@ -513,14 +518,14 @@ func (u *Unmarshaler) fillSliceValue(slice reflect.Value, index int, baseKind re
target.Set(reflect.ValueOf(value))
ithVal.Set(target.Addr())
return nil
} else {
if ithVal.Kind() != reflect.TypeOf(value).Kind() {
return errTypeMismatch
}
ithVal.Set(reflect.ValueOf(value))
return nil
}
if ithVal.Kind() != reflect.TypeOf(value).Kind() {
return errTypeMismatch
}
ithVal.Set(reflect.ValueOf(value))
return nil
}
}
@@ -621,6 +626,13 @@ func WithStringValues() UnmarshalOption {
}
}
// WithCanonicalKeyFunc customizes a Unmarshaler with Canonical Key func
func WithCanonicalKeyFunc(f func(string) string) UnmarshalOption {
return func(opt *unmarshalOptions) {
opt.canonicalKey = f
}
}
func fillDurationValue(fieldKind reflect.Kind, value reflect.Value, dur string) error {
d, err := time.ParseDuration(dur)
if err != nil {

View File

@@ -112,6 +112,12 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
opts ...Option) (interface{}, error) {
options := buildOptions(opts...)
output := make(chan interface{})
defer func() {
for range output {
panic("more than one element written in reducer")
}
}()
collector := make(chan interface{}, options.workers)
done := syncx.NewDoneChan()
writer := newGuardedWriter(output, done.Done())

View File

@@ -202,6 +202,22 @@ func TestMapReduce(t *testing.T) {
}
}
func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
assert.Panics(t, func() {
MapReduce(func(source chan<- interface{}) {
for i := 0; i < 10; i++ {
source <- i
}
}, func(item interface{}, writer Writer, cancel func(error)) {
writer.Write(item)
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
drain(pipe)
writer.Write("one")
writer.Write("two")
})
})
}
func TestMapReduceVoid(t *testing.T) {
var value uint32
tests := []struct {

View File

@@ -1,3 +1,4 @@
//go:build windows
// +build windows
package proc

View File

@@ -1,3 +1,4 @@
//go:build linux || darwin
// +build linux darwin
package proc

View File

@@ -1,3 +1,4 @@
//go:build windows
// +build windows
package proc

View File

@@ -1,3 +1,4 @@
//go:build linux || darwin
// +build linux darwin
package proc

View File

@@ -1,3 +1,4 @@
//go:build windows
// +build windows
package proc
@@ -14,5 +15,5 @@ func AddWrapUpListener(fn func()) func() {
return fn
}
func SetTimeoutToForceQuit(duration time.Duration) {
func SetTimeToForceQuit(duration time.Duration) {
}

View File

@@ -1,3 +1,4 @@
//go:build linux || darwin
// +build linux darwin
package proc

View File

@@ -0,0 +1,10 @@
//go:build windows
// +build windows
package proc
import "context"
func Done() <-chan struct{} {
return context.Background().Done()
}

View File

@@ -1,3 +1,4 @@
//go:build linux || darwin
// +build linux darwin
package proc
@@ -12,6 +13,8 @@ import (
const timeFormat = "0102150405"
var done = make(chan struct{})
func init() {
go func() {
var profiler Stopper
@@ -33,6 +36,13 @@ func init() {
profiler = nil
}
case syscall.SIGTERM:
select {
case <-done:
// already closed
default:
close(done)
}
gracefulStop(signals)
default:
logx.Error("Got unregistered signal:", v)
@@ -40,3 +50,8 @@ func init() {
}
}()
}
// Done returns the channel that notifies the process quitting.
func Done() <-chan struct{} {
return done
}

16
core/proc/signals_test.go Normal file
View File

@@ -0,0 +1,16 @@
package proc
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestDone(t *testing.T) {
select {
case <-Done():
assert.Fail(t, "should run")
default:
}
assert.NotNil(t, Done())
}

View File

@@ -1,3 +1,4 @@
//go:build debug
// +build debug
package search

View File

@@ -7,6 +7,7 @@ import (
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/prometheus"
"github.com/tal-tech/go-zero/core/stat"
"github.com/tal-tech/go-zero/core/trace"
)
const (
@@ -29,6 +30,7 @@ type ServiceConf struct {
Mode string `json:",default=pro,options=dev|test|rt|pre|pro"`
MetricsUrl string `json:",optional"`
Prometheus prometheus.Config `json:",optional"`
Telemetry trace.Config `json:",optional"`
}
// MustSetUp sets up the service, exits on error.
@@ -49,6 +51,12 @@ func (sc ServiceConf) SetUp() error {
sc.initMode()
prometheus.StartAgent(sc.Prometheus)
if len(sc.Telemetry.Name) == 0 {
sc.Telemetry.Name = sc.Name
}
trace.StartAgent(sc.Telemetry)
if len(sc.MetricsUrl) > 0 {
stat.SetReportWriter(stat.NewRemoteWriter(sc.MetricsUrl))
}

View File

@@ -1,3 +1,4 @@
//go:build !linux
// +build !linux
package stat

View File

@@ -1,3 +1,4 @@
//go:build linux
// +build linux
package stat

View File

@@ -1,3 +1,4 @@
//go:build linux
// +build linux
package stat

View File

@@ -7,7 +7,9 @@ import (
)
func TestRefreshCpu(t *testing.T) {
assert.True(t, RefreshCpu() >= 0)
assert.NotPanics(t, func() {
RefreshCpu()
})
}
func BenchmarkRefreshCpu(b *testing.B) {

View File

@@ -1,3 +1,4 @@
//go:build !linux
// +build !linux
package internal

View File

@@ -38,7 +38,9 @@ func init() {
atomic.StoreInt64(&cpuUsage, usage)
})
case <-allTicker.C:
printUsage()
if logEnabled.True() {
printUsage()
}
}
}
}()

View File

@@ -29,7 +29,7 @@ type (
)
// New returns a Cache.
func New(c ClusterConf, barrier syncx.SharedCalls, st *Stat, errNotFound error,
func New(c ClusterConf, barrier syncx.SingleFlight, st *Stat, errNotFound error,
opts ...Option) Cache {
if len(c) == 0 || TotalWeights(c) <= 0 {
log.Fatal("no cache nodes")

View File

@@ -23,6 +23,7 @@ type mockedNode struct {
func (mc *mockedNode) Del(keys ...string) error {
var be errorx.BatchError
for _, key := range keys {
if _, ok := mc.vals[key]; !ok {
be.Add(mc.errNotFound)
@@ -30,6 +31,7 @@ func (mc *mockedNode) Del(keys ...string) error {
delete(mc.vals, key)
}
}
return be.Err()
}
@@ -102,7 +104,7 @@ func TestCache_SetDel(t *testing.T) {
Weight: 100,
},
}
c := New(conf, syncx.NewSharedCalls(), NewStat("mock"), errPlaceholder)
c := New(conf, syncx.NewSingleFlight(), NewStat("mock"), errPlaceholder)
for i := 0; i < total; i++ {
if i%2 == 0 {
assert.Nil(t, c.Set(fmt.Sprintf("key/%d", i), i))
@@ -140,7 +142,7 @@ func TestCache_OneNode(t *testing.T) {
Weight: 100,
},
}
c := New(conf, syncx.NewSharedCalls(), NewStat("mock"), errPlaceholder)
c := New(conf, syncx.NewSingleFlight(), NewStat("mock"), errPlaceholder)
for i := 0; i < total; i++ {
if i%2 == 0 {
assert.Nil(t, c.Set(fmt.Sprintf("key/%d", i), i))

View File

@@ -29,7 +29,7 @@ type cacheNode struct {
rds *redis.Redis
expiry time.Duration
notFoundExpiry time.Duration
barrier syncx.SharedCalls
barrier syncx.SingleFlight
r *rand.Rand
lock *sync.Mutex
unstableExpiry mathx.Unstable
@@ -43,7 +43,7 @@ type cacheNode struct {
// st is used to stat the cache.
// errNotFound defines the error that returned on cache not found.
// opts are the options that customize the cacheNode.
func NewNode(rds *redis.Redis, barrier syncx.SharedCalls, st *Stat,
func NewNode(rds *redis.Redis, barrier syncx.SingleFlight, st *Stat,
errNotFound error, opts ...Option) Cache {
o := newOptions(opts...)
return cacheNode{
@@ -65,9 +65,18 @@ func (c cacheNode) Del(keys ...string) error {
return nil
}
if _, err := c.rds.Del(keys...); err != nil {
logx.Errorf("failed to clear cache with keys: %q, error: %v", formatKeys(keys), err)
c.asyncRetryDelCache(keys...)
if len(keys) > 1 && c.rds.Type == redis.ClusterType {
for _, key := range keys {
if _, err := c.rds.Del(key); err != nil {
logx.Errorf("failed to clear cache with key: %q, error: %v", key, err)
c.asyncRetryDelCache(key)
}
}
} else {
if _, err := c.rds.Del(keys...); err != nil {
logx.Errorf("failed to clear cache with keys: %q, error: %v", formatKeys(keys), err)
c.asyncRetryDelCache(keys...)
}
}
return nil

View File

@@ -29,6 +29,7 @@ func init() {
func TestCacheNode_DelCache(t *testing.T) {
store, clean, err := redistest.CreateRedis()
assert.Nil(t, err)
store.Type = redis.ClusterType
defer clean()
cn := cacheNode{
@@ -49,13 +50,30 @@ func TestCacheNode_DelCache(t *testing.T) {
assert.Nil(t, cn.Del("first", "second"))
}
func TestCacheNode_DelCacheWithErrors(t *testing.T) {
store, clean, err := redistest.CreateRedis()
assert.Nil(t, err)
store.Type = redis.ClusterType
clean()
cn := cacheNode{
rds: store,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
lock: new(sync.Mutex),
unstableExpiry: mathx.NewUnstable(expiryDeviation),
stat: NewStat("any"),
errNotFound: errTestNotFound,
}
assert.Nil(t, cn.Del("third", "fourth"))
}
func TestCacheNode_InvalidCache(t *testing.T) {
s, err := miniredis.Run()
assert.Nil(t, err)
defer s.Close()
cn := cacheNode{
rds: redis.NewRedis(s.Addr(), redis.NodeType),
rds: redis.New(s.Addr()),
r: rand.New(rand.NewSource(time.Now().UnixNano())),
lock: new(sync.Mutex),
unstableExpiry: mathx.NewUnstable(expiryDeviation),
@@ -78,7 +96,7 @@ func TestCacheNode_Take(t *testing.T) {
cn := cacheNode{
rds: store,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
barrier: syncx.NewSharedCalls(),
barrier: syncx.NewSingleFlight(),
lock: new(sync.Mutex),
unstableExpiry: mathx.NewUnstable(expiryDeviation),
stat: NewStat("any"),
@@ -105,7 +123,7 @@ func TestCacheNode_TakeNotFound(t *testing.T) {
cn := cacheNode{
rds: store,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
barrier: syncx.NewSharedCalls(),
barrier: syncx.NewSingleFlight(),
lock: new(sync.Mutex),
unstableExpiry: mathx.NewUnstable(expiryDeviation),
stat: NewStat("any"),
@@ -144,7 +162,7 @@ func TestCacheNode_TakeWithExpire(t *testing.T) {
cn := cacheNode{
rds: store,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
barrier: syncx.NewSharedCalls(),
barrier: syncx.NewSingleFlight(),
lock: new(sync.Mutex),
unstableExpiry: mathx.NewUnstable(expiryDeviation),
stat: NewStat("any"),
@@ -171,7 +189,7 @@ func TestCacheNode_String(t *testing.T) {
cn := cacheNode{
rds: store,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
barrier: syncx.NewSharedCalls(),
barrier: syncx.NewSingleFlight(),
lock: new(sync.Mutex),
unstableExpiry: mathx.NewUnstable(expiryDeviation),
stat: NewStat("any"),
@@ -188,7 +206,7 @@ func TestCacheValueWithBigInt(t *testing.T) {
cn := cacheNode{
rds: store,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
barrier: syncx.NewSharedCalls(),
barrier: syncx.NewSingleFlight(),
lock: new(sync.Mutex),
unstableExpiry: mathx.NewUnstable(expiryDeviation),
stat: NewStat("any"),

View File

@@ -1,7 +1,7 @@
package clickhouse
import (
// imports the driver.
// imports the driver, don't remove this comment, golint requires.
_ "github.com/ClickHouse/clickhouse-go"
"github.com/tal-tech/go-zero/core/stores/sqlx"
)

View File

@@ -5,9 +5,10 @@
package internal
import (
reflect "reflect"
mgo "github.com/globalsign/mgo"
gomock "github.com/golang/mock/gomock"
reflect "reflect"
)
// MockMgoCollection is a mock of MgoCollection interface

View File

@@ -5,9 +5,10 @@
package mongo
import (
reflect "reflect"
bson "github.com/globalsign/mgo/bson"
gomock "github.com/golang/mock/gomock"
reflect "reflect"
)
// MockIter is a mock of Iter interface

View File

@@ -11,8 +11,8 @@ var (
// ErrNotFound is an alias of mgo.ErrNotFound.
ErrNotFound = mgo.ErrNotFound
// can't use one SharedCalls per conn, because multiple conns may share the same cache key.
sharedCalls = syncx.NewSharedCalls()
// can't use one SingleFlight per conn, because multiple conns may share the same cache key.
sharedCalls = syncx.NewSingleFlight()
stats = cache.NewStat("mongoc")
)

View File

@@ -120,7 +120,7 @@ func TestStatCacheFails(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)
r := redis.NewRedis("localhost:59999", redis.NodeType)
r := redis.New("localhost:59999")
cach := cache.NewNode(r, sharedCalls, stats, mgo.ErrNotFound)
c := newCollection(dummyConn{}, cach)

View File

@@ -1,7 +1,7 @@
package postgres
import (
// imports the driver.
// imports the driver, don't remove this comment, golint requires.
_ "github.com/lib/pq"
"github.com/tal-tech/go-zero/core/stores/sqlx"
)

View File

@@ -71,6 +71,8 @@ type (
IntCmd = red.IntCmd
// FloatCmd is an alias of redis.FloatCmd.
FloatCmd = red.FloatCmd
// StringCmd is an alias of redis.StringCmd.
StringCmd = red.StringCmd
)
// New returns a Redis with given options.
@@ -88,6 +90,7 @@ func New(addr string, opts ...Option) *Redis {
return r
}
// Deprecated: use New instead, will be removed in v2.
// NewRedis returns a Redis.
func NewRedis(redisAddr, redisType string, redisPass ...string) *Redis {
var opts []Option

View File

@@ -963,7 +963,7 @@ func TestRedis_Pipelined(t *testing.T) {
func TestRedisString(t *testing.T) {
runOnRedis(t, func(client *Redis) {
client.Ping()
_, err := getRedis(NewRedis(client.Addr, ClusterType))
_, err := getRedis(New(client.Addr, Cluster()))
assert.Nil(t, err)
assert.Equal(t, client.Addr, client.String())
assert.NotNil(t, New(client.Addr, badType()).Ping())
@@ -1075,7 +1075,7 @@ func TestRedisGeo(t *testing.T) {
func TestRedis_WithPass(t *testing.T) {
runOnRedis(t, func(client *Redis) {
err := NewRedis(client.Addr, NodeType, "any").Ping()
err := New(client.Addr, WithPass("any")).Ping()
assert.NotNil(t, err)
})
}
@@ -1095,7 +1095,7 @@ func runOnRedis(t *testing.T, fn func(client *Redis)) {
client.Close()
}
}()
fn(NewRedis(s.Addr(), NodeType))
fn(New(s.Addr()))
}
func runOnRedisTLS(t *testing.T, fn func(client *Redis)) {

View File

@@ -10,10 +10,10 @@ import (
func TestBlockingNode(t *testing.T) {
r, err := miniredis.Run()
assert.Nil(t, err)
node, err := CreateBlockingNode(NewRedis(r.Addr(), NodeType))
node, err := CreateBlockingNode(New(r.Addr()))
assert.Nil(t, err)
node.Close()
node, err = CreateBlockingNode(NewRedis(r.Addr(), ClusterType))
node, err = CreateBlockingNode(New(r.Addr(), Cluster()))
assert.Nil(t, err)
node.Close()
}

View File

@@ -15,7 +15,7 @@ func CreateRedis() (r *redis.Redis, clean func(), err error) {
return nil, nil, err
}
return redis.NewRedis(mr.Addr(), redis.NodeType), func() {
return redis.New(mr.Addr()), func() {
ch := make(chan lang.PlaceholderType)
go func() {
mr.Close()

View File

@@ -17,8 +17,8 @@ var (
// ErrNotFound is an alias of sqlx.ErrNotFound.
ErrNotFound = sqlx.ErrNotFound
// can't use one SharedCalls per conn, because multiple conns may share the same cache key.
exclusiveCalls = syncx.NewSharedCalls()
// can't use one SingleFlight per conn, because multiple conns may share the same cache key.
exclusiveCalls = syncx.NewSingleFlight()
stats = cache.NewStat("sqlc")
)

View File

@@ -286,7 +286,7 @@ func TestStatCacheFails(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)
r := redis.NewRedis("localhost:59999", redis.NodeType)
r := redis.New("localhost:59999")
c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
for i := 0; i < 20; i++ {
@@ -485,7 +485,7 @@ func TestCachedConnExecDropCache(t *testing.T) {
value = "any"
)
var conn trackedConn
c := NewNodeConn(&conn, redis.NewRedis(r.Addr(), redis.NodeType), cache.WithExpiry(time.Second*30))
c := NewNodeConn(&conn, redis.New(r.Addr()), cache.WithExpiry(time.Second*30))
assert.Nil(t, c.SetCache(key, value))
_, err = c.Exec(func(conn sqlx.SqlConn) (result sql.Result, e error) {
return conn.Exec("delete from user_table where id='kevin'")
@@ -503,7 +503,7 @@ func TestCachedConnExecDropCache(t *testing.T) {
func TestCachedConnExecDropCacheFailed(t *testing.T) {
const key = "user"
var conn trackedConn
r := redis.NewRedis("anyredis:8888", redis.NodeType)
r := redis.New("anyredis:8888")
c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
_, err := c.Exec(func(conn sqlx.SqlConn) (result sql.Result, e error) {
return conn.Exec("delete from user_table where id='kevin'")
@@ -600,6 +600,10 @@ func (d dummySqlConn) QueryRowsPartial(v interface{}, query string, args ...inte
return nil
}
func (d dummySqlConn) RawDB() (*sql.DB, error) {
return nil, nil
}
func (d dummySqlConn) Transact(func(session sqlx.Session) error) error {
return nil
}
@@ -621,6 +625,10 @@ func (c *trackedConn) QueryRows(v interface{}, query string, args ...interface{}
return c.dummySqlConn.QueryRows(v, query, args...)
}
func (c *trackedConn) RawDB() (*sql.DB, error) {
return nil, nil
}
func (c *trackedConn) Transact(fn func(session sqlx.Session) error) error {
c.transactValue = true
return c.dummySqlConn.Transact(fn)

View File

@@ -43,6 +43,10 @@ func (c *mockedConn) QueryRowsPartial(v interface{}, query string, args ...inter
panic("should not called")
}
func (c *mockedConn) RawDB() (*sql.DB, error) {
panic("should not called")
}
func (c *mockedConn) Transact(func(session Session) error) error {
panic("should not called")
}

View File

@@ -4,6 +4,7 @@ import (
"database/sql"
"github.com/tal-tech/go-zero/core/breaker"
"github.com/tal-tech/go-zero/core/logx"
)
// ErrNotFound is an alias of sql.ErrNoRows
@@ -23,6 +24,8 @@ type (
// SqlConn only stands for raw connections, so Transact method can be called.
SqlConn interface {
Session
// RawDB is for other ORM to operate with, use it with caution.
RawDB() (*sql.DB, error)
Transact(func(session Session) error) error
}
@@ -43,13 +46,15 @@ type (
// Because CORBA doesn't support PREPARE, so we need to combine the
// query arguments into one string and do underlying query without arguments
commonSqlConn struct {
driverName string
datasource string
beginTx beginnable
brk breaker.Breaker
accept func(error) bool
connProv connProvider
onError func(error)
beginTx beginnable
brk breaker.Breaker
accept func(error) bool
}
connProvider func() (*sql.DB, error)
sessionConn interface {
Exec(query string, args ...interface{}) (sql.Result, error)
Query(query string, args ...interface{}) (*sql.Rows, error)
@@ -69,10 +74,34 @@ type (
// NewSqlConn returns a SqlConn with given driver name and datasource.
func NewSqlConn(driverName, datasource string, opts ...SqlOption) SqlConn {
conn := &commonSqlConn{
driverName: driverName,
datasource: datasource,
beginTx: begin,
brk: breaker.NewBreaker(),
connProv: func() (*sql.DB, error) {
return getSqlConn(driverName, datasource)
},
onError: func(err error) {
logInstanceError(datasource, err)
},
beginTx: begin,
brk: breaker.NewBreaker(),
}
for _, opt := range opts {
opt(conn)
}
return conn
}
// NewSqlConnFromDB returns a SqlConn with the given sql.DB.
// Use it with caution, it's provided for other ORM to interact with.
func NewSqlConnFromDB(db *sql.DB, opts ...SqlOption) SqlConn {
conn := &commonSqlConn{
connProv: func() (*sql.DB, error) {
return db, nil
},
onError: func(err error) {
logx.Errorf("Error on getting sql instance: %v", err)
},
beginTx: begin,
brk: breaker.NewBreaker(),
}
for _, opt := range opts {
opt(conn)
@@ -84,9 +113,9 @@ func NewSqlConn(driverName, datasource string, opts ...SqlOption) SqlConn {
func (db *commonSqlConn) Exec(q string, args ...interface{}) (result sql.Result, err error) {
err = db.brk.DoWithAcceptable(func() error {
var conn *sql.DB
conn, err = getSqlConn(db.driverName, db.datasource)
conn, err = db.connProv()
if err != nil {
logInstanceError(db.datasource, err)
db.onError(err)
return err
}
@@ -100,9 +129,9 @@ func (db *commonSqlConn) Exec(q string, args ...interface{}) (result sql.Result,
func (db *commonSqlConn) Prepare(query string) (stmt StmtSession, err error) {
err = db.brk.DoWithAcceptable(func() error {
var conn *sql.DB
conn, err = getSqlConn(db.driverName, db.datasource)
conn, err = db.connProv()
if err != nil {
logInstanceError(db.datasource, err)
db.onError(err)
return err
}
@@ -145,6 +174,10 @@ func (db *commonSqlConn) QueryRowsPartial(v interface{}, q string, args ...inter
}, q, args...)
}
func (db *commonSqlConn) RawDB() (*sql.DB, error) {
return db.connProv()
}
func (db *commonSqlConn) Transact(fn func(Session) error) error {
return db.brk.DoWithAcceptable(func() error {
return transact(db, db.beginTx, fn)
@@ -163,9 +196,9 @@ func (db *commonSqlConn) acceptable(err error) bool {
func (db *commonSqlConn) queryRows(scanner func(*sql.Rows) error, q string, args ...interface{}) error {
var qerr error
return db.brk.DoWithAcceptable(func() error {
conn, err := getSqlConn(db.driverName, db.datasource)
conn, err := db.connProv()
if err != nil {
logInstanceError(db.datasource, err)
db.onError(err)
return err
}

View File

@@ -21,12 +21,15 @@ func TestSqlConn(t *testing.T) {
mock.ExpectExec("any")
mock.ExpectQuery("any").WillReturnRows(sqlmock.NewRows([]string{"foo"}))
conn := NewMysql(mockedDatasource)
db, err := conn.RawDB()
assert.Nil(t, err)
rawConn := NewSqlConnFromDB(db, withMysqlAcceptable())
badConn := NewMysql("badsql")
_, err := conn.Exec("any", "value")
_, err = conn.Exec("any", "value")
assert.NotNil(t, err)
_, err = badConn.Exec("any", "value")
assert.NotNil(t, err)
_, err = conn.Prepare("any")
_, err = rawConn.Prepare("any")
assert.NotNil(t, err)
_, err = badConn.Prepare("any")
assert.NotNil(t, err)

View File

@@ -30,7 +30,8 @@ func (t txSession) Prepare(q string) (StmtSession, error) {
}
return statement{
stmt: stmt,
query: q,
stmt: stmt,
}, nil
}
@@ -70,9 +71,9 @@ func begin(db *sql.DB) (trans, error) {
}
func transact(db *commonSqlConn, b beginnable, fn func(Session) error) (err error) {
conn, err := getSqlConn(db.driverName, db.datasource)
conn, err := db.connProv()
if err != nil {
logInstanceError(db.datasource, err)
db.onError(err)
return err
}

View File

@@ -40,6 +40,24 @@ func Filter(s string, filter func(r rune) bool) string {
return string(chars[:n])
}
// FirstN returns first n runes from s.
func FirstN(s string, n int, ellipsis ...string) string {
var i int
for j := range s {
if i == n {
ret := s[:j]
for _, each := range ellipsis {
ret += each
}
return ret
}
i++
}
return s
}
// HasEmpty checks if there are empty strings in args.
func HasEmpty(args ...string) bool {
for _, arg := range args {

View File

@@ -92,6 +92,61 @@ func TestFilter(t *testing.T) {
}
}
func TestFirstN(t *testing.T) {
tests := []struct {
name string
input string
n int
ellipsis string
expect string
}{
{
name: "english string",
input: "anything that we use",
n: 8,
expect: "anything",
},
{
name: "english string with ellipsis",
input: "anything that we use",
n: 8,
ellipsis: "...",
expect: "anything...",
},
{
name: "english string more",
input: "anything that we use",
n: 80,
expect: "anything that we use",
},
{
name: "chinese string",
input: "我是中国人",
n: 2,
expect: "我是",
},
{
name: "chinese string with ellipsis",
input: "我是中国人",
n: 2,
ellipsis: "...",
expect: "我是...",
},
{
name: "chinese string",
input: "我是中国人",
n: 10,
expect: "我是中国人",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
assert.Equal(t, test.expect, FirstN(test.input, test.n, test.ellipsis))
})
}
}
func TestRemove(t *testing.T) {
cases := []struct {
input []string

View File

@@ -19,10 +19,8 @@ func TestDoneChanDone(t *testing.T) {
waitGroup.Add(1)
go func() {
select {
case <-doneChan.Done():
waitGroup.Done()
}
<-doneChan.Done()
waitGroup.Done()
}()
for i := 0; i < 5; i++ {

View File

@@ -17,10 +17,11 @@ func TestPoolGet(t *testing.T) {
ch := make(chan lang.PlaceholderType)
for i := 0; i < limit; i++ {
var fail AtomicBool
go func() {
v := stack.Get()
if v.(int) != 1 {
t.Fatal("unmatch value")
fail.Set(true)
}
ch <- lang.Placeholder
}()
@@ -30,6 +31,10 @@ func TestPoolGet(t *testing.T) {
case <-time.After(time.Second):
t.Fail()
}
if fail.True() {
t.Fatal("unmatch value")
}
}
}

View File

@@ -9,20 +9,21 @@ import (
// A ResourceManager is a manager that used to manage resources.
type ResourceManager struct {
resources map[string]io.Closer
sharedCalls SharedCalls
lock sync.RWMutex
resources map[string]io.Closer
singleFlight SingleFlight
lock sync.RWMutex
}
// NewResourceManager returns a ResourceManager.
func NewResourceManager() *ResourceManager {
return &ResourceManager{
resources: make(map[string]io.Closer),
sharedCalls: NewSharedCalls(),
resources: make(map[string]io.Closer),
singleFlight: NewSingleFlight(),
}
}
// Close closes the manager.
// Don't use the ResourceManager after Close() called.
func (manager *ResourceManager) Close() error {
manager.lock.Lock()
defer manager.lock.Unlock()
@@ -34,12 +35,15 @@ func (manager *ResourceManager) Close() error {
}
}
// release resources to avoid using it later
manager.resources = nil
return be.Err()
}
// GetResource returns the resource associated with given key.
func (manager *ResourceManager) GetResource(key string, create func() (io.Closer, error)) (io.Closer, error) {
val, err := manager.sharedCalls.Do(key, func() (interface{}, error) {
val, err := manager.singleFlight.Do(key, func() (interface{}, error) {
manager.lock.RLock()
resource, ok := manager.resources[key]
manager.lock.RUnlock()

View File

@@ -44,3 +44,31 @@ func TestResourceManager_GetResourceError(t *testing.T) {
assert.NotNil(t, err)
}
}
func TestResourceManager_Close(t *testing.T) {
manager := NewResourceManager()
for i := 0; i < 10; i++ {
_, err := manager.GetResource("key", func() (io.Closer, error) {
return nil, errors.New("fail")
})
assert.NotNil(t, err)
}
if assert.NoError(t, manager.Close()) {
assert.Equal(t, 0, len(manager.resources))
}
}
func TestResourceManager_UseAfterClose(t *testing.T) {
manager := NewResourceManager()
_, err := manager.GetResource("key", func() (io.Closer, error) {
return nil, errors.New("fail")
})
assert.NotNil(t, err)
if assert.NoError(t, manager.Close()) {
_, err = manager.GetResource("key", func() (io.Closer, error) {
return nil, errors.New("fail")
})
assert.NotNil(t, err)
}
}

View File

@@ -3,13 +3,17 @@ package syncx
import "sync"
type (
// SharedCalls lets the concurrent calls with the same key to share the call result.
// SharedCalls is an alias of SingleFlight.
// Deprecated: use SingleFlight.
SharedCalls = SingleFlight
// SingleFlight lets the concurrent calls with the same key to share the call result.
// For example, A called F, before it's done, B called F. Then B would not execute F,
// and shared the result returned by F which called by A.
// The calls with the same key are dependent, concurrent calls share the returned values.
// A ------->calls F with key<------------------->returns val
// B --------------------->calls F with key------>returns val
SharedCalls interface {
SingleFlight interface {
Do(key string, fn func() (interface{}, error)) (interface{}, error)
DoEx(key string, fn func() (interface{}, error)) (interface{}, bool, error)
}
@@ -20,20 +24,26 @@ type (
err error
}
sharedGroup struct {
flightGroup struct {
calls map[string]*call
lock sync.Mutex
}
)
// NewSharedCalls returns a SharedCalls.
func NewSharedCalls() SharedCalls {
return &sharedGroup{
// NewSingleFlight returns a SingleFlight.
func NewSingleFlight() SingleFlight {
return &flightGroup{
calls: make(map[string]*call),
}
}
func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
// NewSharedCalls returns a SingleFlight.
// Deprecated: use NewSingleFlight.
func NewSharedCalls() SingleFlight {
return NewSingleFlight()
}
func (g *flightGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
c, done := g.createCall(key)
if done {
return c.val, c.err
@@ -43,7 +53,7 @@ func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{
return c.val, c.err
}
func (g *sharedGroup) DoEx(key string, fn func() (interface{}, error)) (val interface{}, fresh bool, err error) {
func (g *flightGroup) DoEx(key string, fn func() (interface{}, error)) (val interface{}, fresh bool, err error) {
c, done := g.createCall(key)
if done {
return c.val, false, c.err
@@ -53,7 +63,7 @@ func (g *sharedGroup) DoEx(key string, fn func() (interface{}, error)) (val inte
return c.val, true, c.err
}
func (g *sharedGroup) createCall(key string) (c *call, done bool) {
func (g *flightGroup) createCall(key string) (c *call, done bool) {
g.lock.Lock()
if c, ok := g.calls[key]; ok {
g.lock.Unlock()
@@ -69,7 +79,7 @@ func (g *sharedGroup) createCall(key string) (c *call, done bool) {
return c, false
}
func (g *sharedGroup) makeCall(c *call, key string, fn func() (interface{}, error)) {
func (g *flightGroup) makeCall(c *call, key string, fn func() (interface{}, error)) {
defer func() {
g.lock.Lock()
delete(g.calls, key)

View File

@@ -10,7 +10,7 @@ import (
)
func TestExclusiveCallDo(t *testing.T) {
g := NewSharedCalls()
g := NewSingleFlight()
v, err := g.Do("key", func() (interface{}, error) {
return "bar", nil
})
@@ -23,7 +23,7 @@ func TestExclusiveCallDo(t *testing.T) {
}
func TestExclusiveCallDoErr(t *testing.T) {
g := NewSharedCalls()
g := NewSingleFlight()
someErr := errors.New("some error")
v, err := g.Do("key", func() (interface{}, error) {
return nil, someErr
@@ -37,7 +37,7 @@ func TestExclusiveCallDoErr(t *testing.T) {
}
func TestExclusiveCallDoDupSuppress(t *testing.T) {
g := NewSharedCalls()
g := NewSingleFlight()
c := make(chan string)
var calls int32
fn := func() (interface{}, error) {
@@ -69,7 +69,7 @@ func TestExclusiveCallDoDupSuppress(t *testing.T) {
}
func TestExclusiveCallDoDiffDupSuppress(t *testing.T) {
g := NewSharedCalls()
g := NewSingleFlight()
broadcast := make(chan struct{})
var calls int32
tests := []string{"e", "a", "e", "a", "b", "c", "b", "a", "c", "d", "b", "c", "d"}
@@ -102,7 +102,7 @@ func TestExclusiveCallDoDiffDupSuppress(t *testing.T) {
}
func TestExclusiveCallDoExDupSuppress(t *testing.T) {
g := NewSharedCalls()
g := NewSingleFlight()
c := make(chan string)
var calls int32
fn := func() (interface{}, error) {

View File

@@ -1,11 +1,14 @@
package syncx
import (
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/lang"
)
func TestTryLock(t *testing.T) {
@@ -30,8 +33,6 @@ func TestSpinLockRace(t *testing.T) {
var wait sync.WaitGroup
wait.Add(1)
go func() {
lock.Lock()
lock.Unlock()
wait.Done()
}()
time.Sleep(time.Millisecond * 100)
@@ -39,3 +40,31 @@ func TestSpinLockRace(t *testing.T) {
wait.Wait()
assert.True(t, lock.TryLock())
}
func TestSpinLock_TryLock(t *testing.T) {
var lock SpinLock
var count int32
var wait sync.WaitGroup
wait.Add(2)
sig := make(chan lang.PlaceholderType)
go func() {
lock.TryLock()
sig <- lang.Placeholder
atomic.AddInt32(&count, 1)
runtime.Gosched()
lock.Unlock()
wait.Done()
}()
go func() {
<-sig
lock.Lock()
atomic.AddInt32(&count, 1)
lock.Unlock()
wait.Done()
}()
wait.Wait()
assert.Equal(t, int32(2), atomic.LoadInt32(&count))
}

69
core/trace/agent.go Normal file
View File

@@ -0,0 +1,69 @@
package trace
import (
"fmt"
"sync"
"github.com/tal-tech/go-zero/core/logx"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/exporters/zipkin"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
)
const (
kindJaeger = "jaeger"
kindZipkin = "zipkin"
)
var once sync.Once
// StartAgent starts a opentelemetry agent.
func StartAgent(c Config) {
once.Do(func() {
startAgent(c)
})
}
func createExporter(c Config) (sdktrace.SpanExporter, error) {
// Just support jaeger now, more for later
switch c.Batcher {
case kindJaeger:
return jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(c.Endpoint)))
case kindZipkin:
return zipkin.New(c.Endpoint)
default:
return nil, fmt.Errorf("unknown exporter: %s", c.Batcher)
}
}
func startAgent(c Config) {
opts := []sdktrace.TracerProviderOption{
// Set the sampling rate based on the parent span to 100%
sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(c.Sampler))),
// Record information about this application in an Resource.
sdktrace.WithResource(resource.NewSchemaless(semconv.ServiceNameKey.String(c.Name))),
}
if len(c.Endpoint) > 0 {
exp, err := createExporter(c)
if err != nil {
logx.Error(err)
return
}
// Always be sure to batch in production.
opts = append(opts, sdktrace.WithBatcher(exp))
}
tp := sdktrace.NewTracerProvider(opts...)
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{}, propagation.Baggage{}))
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
logx.Errorf("[otel] error: %v", err)
}))
}

40
core/trace/attributes.go Normal file
View File

@@ -0,0 +1,40 @@
package trace
import (
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
gcodes "google.golang.org/grpc/codes"
)
const (
// GRPCStatusCodeKey is convention for numeric status code of a gRPC request.
GRPCStatusCodeKey = attribute.Key("rpc.grpc.status_code")
// RPCNameKey is the name of message transmitted or received.
RPCNameKey = attribute.Key("name")
// RPCMessageTypeKey is the type of message transmitted or received.
RPCMessageTypeKey = attribute.Key("message.type")
// RPCMessageIDKey is the identifier of message transmitted or received.
RPCMessageIDKey = attribute.Key("message.id")
// RPCMessageCompressedSizeKey is the compressed size of the message transmitted or received in bytes.
RPCMessageCompressedSizeKey = attribute.Key("message.compressed_size")
// RPCMessageUncompressedSizeKey is the uncompressed size of the message
// transmitted or received in bytes.
RPCMessageUncompressedSizeKey = attribute.Key("message.uncompressed_size")
)
// Semantic conventions for common RPC attributes.
var (
// RPCSystemGRPC is the semantic convention for gRPC as the remoting system.
RPCSystemGRPC = semconv.RPCSystemKey.String("grpc")
// RPCNameMessage is the semantic convention for a message named message.
RPCNameMessage = RPCNameKey.String("message")
// RPCMessageTypeSent is the semantic conventions for sent RPC message types.
RPCMessageTypeSent = RPCMessageTypeKey.String("SENT")
// RPCMessageTypeReceived is the semantic conventions for the received RPC message types.
RPCMessageTypeReceived = RPCMessageTypeKey.String("RECEIVED")
)
// StatusCodeAttr returns a attribute.KeyValue that represents the give c.
func StatusCodeAttr(c gcodes.Code) attribute.KeyValue {
return GRPCStatusCodeKey.Int64(int64(c))
}

View File

@@ -1,43 +0,0 @@
package trace
import (
"errors"
"net/http"
"strings"
)
// ErrInvalidCarrier indicates an error that the carrier is invalid.
var ErrInvalidCarrier = errors.New("invalid carrier")
type (
// Carrier interface wraps the Get and Set method.
Carrier interface {
Get(key string) string
Set(key, value string)
}
httpCarrier http.Header
// grpc metadata takes keys as case insensitive
grpcCarrier map[string][]string
)
func (h httpCarrier) Get(key string) string {
return http.Header(h).Get(key)
}
func (h httpCarrier) Set(key, val string) {
http.Header(h).Set(key, val)
}
func (g grpcCarrier) Get(key string) string {
if vals, ok := g[strings.ToLower(key)]; ok && len(vals) > 0 {
return vals[0]
}
return ""
}
func (g grpcCarrier) Set(key, val string) {
key = strings.ToLower(key)
g[key] = append(g[key], val)
}

View File

@@ -1,58 +0,0 @@
package trace
import (
"net/http"
"net/http/httptest"
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stringx"
)
func TestHttpCarrier(t *testing.T) {
tests := []map[string]string{
{},
{
"first": "a",
"second": "b",
},
}
for _, test := range tests {
t.Run(stringx.RandId(), func(t *testing.T) {
req := httptest.NewRequest(http.MethodGet, "http://localhost", nil)
carrier := httpCarrier(req.Header)
for k, v := range test {
carrier.Set(k, v)
}
for k, v := range test {
assert.Equal(t, v, carrier.Get(k))
}
assert.Equal(t, "", carrier.Get("none"))
})
}
}
func TestGrpcCarrier(t *testing.T) {
tests := []map[string]string{
{},
{
"first": "a",
"second": "b",
},
}
for _, test := range tests {
t.Run(stringx.RandId(), func(t *testing.T) {
m := make(map[string][]string)
carrier := grpcCarrier(m)
for k, v := range test {
carrier.Set(k, v)
}
for k, v := range test {
assert.Equal(t, v, carrier.Get(k))
}
assert.Equal(t, "", carrier.Get("none"))
})
}
}

12
core/trace/config.go Normal file
View File

@@ -0,0 +1,12 @@
package trace
// TraceName represents the tracing name.
const TraceName = "go-zero"
// A Config is a opentelemetry config.
type Config struct {
Name string `json:",optional"`
Endpoint string `json:",optional"`
Sampler float64 `json:",default=1.0"`
Batcher string `json:",default=jaeger,options=jaeger|zipkin"`
}

View File

@@ -1,6 +0,0 @@
package trace
const (
traceIdKey = "X-Trace-ID"
spanIdKey = "X-Span-ID"
)

38
core/trace/message.go Normal file
View File

@@ -0,0 +1,38 @@
package trace
import (
"context"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/proto"
)
const messageEvent = "message"
var (
// MessageSent is the type of sent messages.
MessageSent = messageType(RPCMessageTypeSent)
// MessageReceived is the type of received messages.
MessageReceived = messageType(RPCMessageTypeReceived)
)
type messageType attribute.KeyValue
// Event adds an event of the messageType to the span associated with the
// passed context with id and size (if message is a proto message).
func (m messageType) Event(ctx context.Context, id int, message interface{}) {
span := trace.SpanFromContext(ctx)
if p, ok := message.(proto.Message); ok {
span.AddEvent(messageEvent, trace.WithAttributes(
attribute.KeyValue(m),
RPCMessageIDKey.Int(id),
RPCMessageUncompressedSizeKey.Int(proto.Size(p)),
))
} else {
span.AddEvent(messageEvent, trace.WithAttributes(
attribute.KeyValue(m),
RPCMessageIDKey.Int(id),
))
}
}

View File

@@ -1,33 +0,0 @@
package trace
import (
"context"
"github.com/tal-tech/go-zero/core/trace/tracespec"
)
var emptyNoopSpan = noopSpan{}
type noopSpan struct{}
func (s noopSpan) Finish() {
}
func (s noopSpan) Follow(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) {
return ctx, emptyNoopSpan
}
func (s noopSpan) Fork(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) {
return ctx, emptyNoopSpan
}
func (s noopSpan) SpanId() string {
return ""
}
func (s noopSpan) TraceId() string {
return ""
}
func (s noopSpan) Visit(fn func(key, val string) bool) {
}

View File

@@ -1,32 +0,0 @@
package trace
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
)
func TestNoopSpan_Fork(t *testing.T) {
ctx, span := emptyNoopSpan.Fork(context.Background(), "", "")
assert.Equal(t, emptyNoopSpan, span)
assert.Equal(t, context.Background(), ctx)
}
func TestNoopSpan_Follow(t *testing.T) {
ctx, span := emptyNoopSpan.Follow(context.Background(), "", "")
assert.Equal(t, emptyNoopSpan, span)
assert.Equal(t, context.Background(), ctx)
}
func TestNoopSpan(t *testing.T) {
emptyNoopSpan.Visit(func(key, val string) bool {
assert.Fail(t, "should not go here")
return true
})
ctx, span := emptyNoopSpan.Follow(context.Background(), "", "")
assert.Equal(t, context.Background(), ctx)
assert.Equal(t, "", span.TraceId())
assert.Equal(t, "", span.SpanId())
}

View File

@@ -1,90 +0,0 @@
package trace
import (
"net/http"
"google.golang.org/grpc/metadata"
)
const (
// HttpFormat means http carrier format.
HttpFormat = iota
// GrpcFormat means grpc carrier format.
GrpcFormat
)
var (
emptyHttpPropagator httpPropagator
emptyGrpcPropagator grpcPropagator
)
type (
// Propagator interface wraps the Extract and Inject methods.
Propagator interface {
Extract(carrier interface{}) (Carrier, error)
Inject(carrier interface{}) (Carrier, error)
}
httpPropagator struct{}
grpcPropagator struct{}
)
func (h httpPropagator) Extract(carrier interface{}) (Carrier, error) {
if c, ok := carrier.(http.Header); ok {
return httpCarrier(c), nil
}
return nil, ErrInvalidCarrier
}
func (h httpPropagator) Inject(carrier interface{}) (Carrier, error) {
if c, ok := carrier.(http.Header); ok {
return httpCarrier(c), nil
}
return nil, ErrInvalidCarrier
}
func (g grpcPropagator) Extract(carrier interface{}) (Carrier, error) {
if c, ok := carrier.(metadata.MD); ok {
return grpcCarrier(c), nil
}
return nil, ErrInvalidCarrier
}
func (g grpcPropagator) Inject(carrier interface{}) (Carrier, error) {
if c, ok := carrier.(metadata.MD); ok {
return grpcCarrier(c), nil
}
return nil, ErrInvalidCarrier
}
// Extract extracts tracing information from carrier with given format.
func Extract(format, carrier interface{}) (Carrier, error) {
switch v := format.(type) {
case int:
if v == HttpFormat {
return emptyHttpPropagator.Extract(carrier)
} else if v == GrpcFormat {
return emptyGrpcPropagator.Extract(carrier)
}
}
return nil, ErrInvalidCarrier
}
// Inject injects tracing information into carrier with given format.
func Inject(format, carrier interface{}) (Carrier, error) {
switch v := format.(type) {
case int:
if v == HttpFormat {
return emptyHttpPropagator.Inject(carrier)
} else if v == GrpcFormat {
return emptyGrpcPropagator.Inject(carrier)
}
}
return nil, ErrInvalidCarrier
}

View File

@@ -1,68 +0,0 @@
package trace
import (
"net/http"
"net/http/httptest"
"testing"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/metadata"
)
func TestHttpPropagator_Extract(t *testing.T) {
req := httptest.NewRequest(http.MethodGet, "http://localhost", nil)
req.Header.Set(traceIdKey, "trace")
req.Header.Set(spanIdKey, "span")
carrier, err := Extract(HttpFormat, req.Header)
assert.Nil(t, err)
assert.Equal(t, "trace", carrier.Get(traceIdKey))
assert.Equal(t, "span", carrier.Get(spanIdKey))
_, err = Extract(HttpFormat, req)
assert.Equal(t, ErrInvalidCarrier, err)
}
func TestHttpPropagator_Inject(t *testing.T) {
req := httptest.NewRequest(http.MethodGet, "http://localhost", nil)
req.Header.Set(traceIdKey, "trace")
req.Header.Set(spanIdKey, "span")
carrier, err := Inject(HttpFormat, req.Header)
assert.Nil(t, err)
assert.Equal(t, "trace", carrier.Get(traceIdKey))
assert.Equal(t, "span", carrier.Get(spanIdKey))
_, err = Inject(HttpFormat, req)
assert.Equal(t, ErrInvalidCarrier, err)
}
func TestGrpcPropagator_Extract(t *testing.T) {
md := metadata.New(map[string]string{
traceIdKey: "trace",
spanIdKey: "span",
})
carrier, err := Extract(GrpcFormat, md)
assert.Nil(t, err)
assert.Equal(t, "trace", carrier.Get(traceIdKey))
assert.Equal(t, "span", carrier.Get(spanIdKey))
_, err = Extract(GrpcFormat, 1)
assert.Equal(t, ErrInvalidCarrier, err)
_, err = Extract(nil, 1)
assert.Equal(t, ErrInvalidCarrier, err)
}
func TestGrpcPropagator_Inject(t *testing.T) {
md := metadata.New(map[string]string{
traceIdKey: "trace",
spanIdKey: "span",
})
carrier, err := Inject(GrpcFormat, md)
assert.Nil(t, err)
assert.Equal(t, "trace", carrier.Get(traceIdKey))
assert.Equal(t, "span", carrier.Get(spanIdKey))
_, err = Inject(GrpcFormat, 1)
assert.Equal(t, ErrInvalidCarrier, err)
_, err = Inject(nil, 1)
assert.Equal(t, ErrInvalidCarrier, err)
}

View File

@@ -1,150 +0,0 @@
package trace
import (
"context"
"fmt"
"strconv"
"strings"
"time"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/timex"
"github.com/tal-tech/go-zero/core/trace/tracespec"
)
const (
initSpanId = "0"
clientFlag = "client"
serverFlag = "server"
spanSepRune = '.'
)
var spanSep = string([]byte{spanSepRune})
// A Span is a calling span that connects caller and callee.
type Span struct {
ctx spanContext
serviceName string
operationName string
startTime time.Time
flag string
children int
}
func newServerSpan(carrier Carrier, serviceName, operationName string) tracespec.Trace {
traceId := stringx.TakeWithPriority(func() string {
if carrier != nil {
return carrier.Get(traceIdKey)
}
return ""
}, stringx.RandId)
spanId := stringx.TakeWithPriority(func() string {
if carrier != nil {
return carrier.Get(spanIdKey)
}
return ""
}, func() string {
return initSpanId
})
return &Span{
ctx: spanContext{
traceId: traceId,
spanId: spanId,
},
serviceName: serviceName,
operationName: operationName,
startTime: timex.Time(),
flag: serverFlag,
}
}
// Finish finishes the calling span.
func (s *Span) Finish() {
}
// Follow follows the tracing service and operation names in context.
func (s *Span) Follow(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) {
span := &Span{
ctx: spanContext{
traceId: s.ctx.traceId,
spanId: s.followSpanId(),
},
serviceName: serviceName,
operationName: operationName,
startTime: timex.Time(),
flag: s.flag,
}
return context.WithValue(ctx, tracespec.TracingKey, span), span
}
// Fork forks the tracing service and operation names in context.
func (s *Span) Fork(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) {
span := &Span{
ctx: spanContext{
traceId: s.ctx.traceId,
spanId: s.forkSpanId(),
},
serviceName: serviceName,
operationName: operationName,
startTime: timex.Time(),
flag: clientFlag,
}
return context.WithValue(ctx, tracespec.TracingKey, span), span
}
// SpanId returns the span id.
func (s *Span) SpanId() string {
return s.ctx.SpanId()
}
// TraceId returns the trace id.
func (s *Span) TraceId() string {
return s.ctx.TraceId()
}
// Visit visits the span using fn.
func (s *Span) Visit(fn func(key, val string) bool) {
s.ctx.Visit(fn)
}
func (s *Span) forkSpanId() string {
s.children++
return fmt.Sprintf("%s.%d", s.ctx.spanId, s.children)
}
func (s *Span) followSpanId() string {
fields := strings.FieldsFunc(s.ctx.spanId, func(r rune) bool {
return r == spanSepRune
})
if len(fields) == 0 {
return s.ctx.spanId
}
last := fields[len(fields)-1]
val, err := strconv.Atoi(last)
if err != nil {
return s.ctx.spanId
}
last = strconv.Itoa(val + 1)
fields[len(fields)-1] = last
return strings.Join(fields, spanSep)
}
// StartClientSpan starts the client span with given context, service and operation names.
func StartClientSpan(ctx context.Context, serviceName, operationName string) (context.Context, tracespec.Trace) {
if span, ok := ctx.Value(tracespec.TracingKey).(*Span); ok {
return span.Fork(ctx, serviceName, operationName)
}
return ctx, emptyNoopSpan
}
// StartServerSpan starts the server span with given context, carrier, service and operation names.
func StartServerSpan(ctx context.Context, carrier Carrier, serviceName, operationName string) (
context.Context, tracespec.Trace) {
span := newServerSpan(carrier, serviceName, operationName)
return context.WithValue(ctx, tracespec.TracingKey, span), span
}

View File

@@ -1,139 +0,0 @@
package trace
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/trace/tracespec"
"google.golang.org/grpc/metadata"
)
func TestClientSpan(t *testing.T) {
span := newServerSpan(nil, "service", "operation")
ctx := context.WithValue(context.Background(), tracespec.TracingKey, span)
ctx, span = StartClientSpan(ctx, "entrance", "operation")
defer span.Finish()
assert.Equal(t, span, ctx.Value(tracespec.TracingKey))
const serviceName = "authorization"
const operationName = "verification"
ctx, childSpan := span.Fork(ctx, serviceName, operationName)
defer childSpan.Finish()
assert.Equal(t, childSpan, ctx.Value(tracespec.TracingKey))
assert.Equal(t, getSpan(span).TraceId(), getSpan(childSpan).TraceId())
assert.Equal(t, "0.1.1", getSpan(childSpan).SpanId())
assert.Equal(t, serviceName, childSpan.(*Span).serviceName)
assert.Equal(t, operationName, childSpan.(*Span).operationName)
assert.Equal(t, clientFlag, childSpan.(*Span).flag)
}
func TestClientSpan_WithoutTrace(t *testing.T) {
ctx, span := StartClientSpan(context.Background(), "entrance", "operation")
defer span.Finish()
assert.Equal(t, emptyNoopSpan, span)
assert.Equal(t, context.Background(), ctx)
}
func TestServerSpan(t *testing.T) {
ctx, span := StartServerSpan(context.Background(), nil, "service", "operation")
defer span.Finish()
assert.Equal(t, span, ctx.Value(tracespec.TracingKey))
const serviceName = "authorization"
const operationName = "verification"
ctx, childSpan := span.Fork(ctx, serviceName, operationName)
defer childSpan.Finish()
assert.Equal(t, childSpan, ctx.Value(tracespec.TracingKey))
assert.Equal(t, getSpan(span).TraceId(), getSpan(childSpan).TraceId())
assert.Equal(t, "0.1", getSpan(childSpan).SpanId())
assert.Equal(t, serviceName, childSpan.(*Span).serviceName)
assert.Equal(t, operationName, childSpan.(*Span).operationName)
assert.Equal(t, clientFlag, childSpan.(*Span).flag)
}
func TestServerSpan_WithCarrier(t *testing.T) {
md := metadata.New(map[string]string{
traceIdKey: "a",
spanIdKey: "0.1",
})
ctx, span := StartServerSpan(context.Background(), grpcCarrier(md), "service", "operation")
defer span.Finish()
assert.Equal(t, span, ctx.Value(tracespec.TracingKey))
const serviceName = "authorization"
const operationName = "verification"
ctx, childSpan := span.Fork(ctx, serviceName, operationName)
defer childSpan.Finish()
assert.Equal(t, childSpan, ctx.Value(tracespec.TracingKey))
assert.Equal(t, getSpan(span).TraceId(), getSpan(childSpan).TraceId())
assert.Equal(t, "0.1.1", getSpan(childSpan).SpanId())
assert.Equal(t, serviceName, childSpan.(*Span).serviceName)
assert.Equal(t, operationName, childSpan.(*Span).operationName)
assert.Equal(t, clientFlag, childSpan.(*Span).flag)
}
func TestSpan_Follow(t *testing.T) {
tests := []struct {
span string
expectSpan string
}{
{
"0.1",
"0.2",
},
{
"0",
"1",
},
{
"a",
"a",
},
}
for _, test := range tests {
t.Run(stringx.RandId(), func(t *testing.T) {
md := metadata.New(map[string]string{
traceIdKey: "a",
spanIdKey: test.span,
})
ctx, span := StartServerSpan(context.Background(), grpcCarrier(md),
"service", "operation")
defer span.Finish()
assert.Equal(t, span, ctx.Value(tracespec.TracingKey))
const serviceName = "authorization"
const operationName = "verification"
ctx, childSpan := span.Follow(ctx, serviceName, operationName)
defer childSpan.Finish()
assert.Equal(t, childSpan, ctx.Value(tracespec.TracingKey))
assert.Equal(t, getSpan(span).TraceId(), getSpan(childSpan).TraceId())
assert.Equal(t, test.expectSpan, getSpan(childSpan).SpanId())
assert.Equal(t, serviceName, childSpan.(*Span).serviceName)
assert.Equal(t, operationName, childSpan.(*Span).operationName)
assert.Equal(t, span.(*Span).flag, childSpan.(*Span).flag)
})
}
}
func TestSpan_Visit(t *testing.T) {
var run bool
span := newServerSpan(nil, "service", "operation")
span.Visit(func(key, val string) bool {
assert.True(t, len(key) > 0)
assert.True(t, len(val) > 0)
run = true
return true
})
assert.True(t, run)
}
func getSpan(span tracespec.Trace) tracespec.Trace {
return span.(*Span)
}

View File

@@ -1,19 +0,0 @@
package trace
type spanContext struct {
traceId string
spanId string
}
func (sc spanContext) TraceId() string {
return sc.traceId
}
func (sc spanContext) SpanId() string {
return sc.spanId
}
func (sc spanContext) Visit(fn func(key, val string) bool) {
fn(traceIdKey, sc.traceId)
fn(spanIdKey, sc.spanId)
}

56
core/trace/tracer.go Normal file
View File

@@ -0,0 +1,56 @@
package trace
import (
"context"
"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/metadata"
)
// assert that metadataSupplier implements the TextMapCarrier interface
var _ propagation.TextMapCarrier = new(metadataSupplier)
type metadataSupplier struct {
metadata *metadata.MD
}
func (s *metadataSupplier) Get(key string) string {
values := s.metadata.Get(key)
if len(values) == 0 {
return ""
}
return values[0]
}
func (s *metadataSupplier) Set(key, value string) {
s.metadata.Set(key, value)
}
func (s *metadataSupplier) Keys() []string {
out := make([]string, 0, len(*s.metadata))
for key := range *s.metadata {
out = append(out, key)
}
return out
}
// Inject injects the metadata into ctx.
func Inject(ctx context.Context, p propagation.TextMapPropagator, metadata *metadata.MD) {
p.Inject(ctx, &metadataSupplier{
metadata: metadata,
})
}
// Extract extracts the metadata from ctx.
func Extract(ctx context.Context, p propagation.TextMapPropagator, metadata *metadata.MD) (
baggage.Baggage, sdktrace.SpanContext) {
ctx = p.Extract(ctx, &metadataSupplier{
metadata: metadata,
})
return baggage.FromContext(ctx), sdktrace.SpanContextFromContext(ctx)
}

Some files were not shown because too many files have changed in this diff Show More