有没感觉 Go 的 sync 包不够用?有没遇到类型没有 sync/atomic 支持? 我们一起看看 go-zero 的 syncx 包对标准库的一些增值补充。 https://github.com/tal-tech/go-zero/tree/master/core/s…
name
作用
AtomicBool
bool类型 原子类
AtomicDuration
Duration有关 原子类
AtomicFloat64
float64类型 原子类
Barrier
栏栅【将加锁解锁包装】
Cond
条件变量
DoneChan
优雅通知关闭
ImmutableResource
创建后不会修改的资源
Limit
控制请求数
LockedCalls
确保方法的串行调用
ManagedResource
资源管理
Once
提供
once func
OnceGuard
一次性使用的资源管理
Pool
pool,简单的池
RefResource
引用计数的资源
ResourceManager
资源管理器
SharedCalls
类似
singflight
的功能
SpinLock
自旋锁:自旋+CAS
TimeoutLimit
Limit + timeout 控制
下面开始对以上库组件做分别介绍。
一、Go并发库组件
1.1 atomic
因为没有 泛型 支持,所以才会出现多种类型的原子类支持。以下采用
float64
作为例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| func (f *AtomicFloat64) Add(val float64) float64 { for { old := f.Load() nv := old + val if f.CompareAndSwap(old, nv) { return nv }
func (f *AtomicFloat64) CompareAndSwap(old, val float64) bool { return atomic.CompareAndSwapUint64((*uint64)(f), math.Float64bits(old), math.Float64bits(val)) }
func (f *AtomicFloat64) Load() float64 { return math.Float64frombits(atomic.LoadUint64((*uint64)(f))) }
func (f *AtomicFloat64) Set(val float64) { atomic.StoreUint64((*uint64)(f), math.Float64bits(val)) }
``` `Add(val)` :如果 `CAS`
|
失败,不断for循环重试,获取 old val,并set old+val; CompareAndSwap(old, new) :调用底层
; Load() :调用
,然后转换 Set(val) :调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| `atomic.StoreUint64` 至于其他类型,开发者想自己扩展自己想要的类型,可以依照上述,基本上调用原始 操作,然后转换为需要的类型,比如:遇到 `bool` 可以借助 `0, 1` 来分辨对应的 `false, true` 。 #### 1.2 Barrier 这里 `Barrier` 只是将业务函数操作封装,作为闭包传入,内部将 `lock` 操作的加锁解锁自行解决了【防止开发者加锁了忘记解锁】 ```text func (b *Barrier) Guard(fn func()) { b.lock.Lock() defer b.lock.Unlock() // 自己的业务逻辑 fn() }
|
1.3 Cond/Limit/TimeoutLimit
这个数据结构和
Limit
一起组成了
TimeoutLimit
,这里将这3个一起讲:
1 2 3 4
| func NewTimeoutLimit(n int) TimeoutLimit { return TimeoutLimit{ limit: NewLimit(n), cond: NewCond(),
|
}
1 2 3
| func NewLimit(n int) Limit { return Limit{ pool: make(chan lang.PlaceholderType, n),
|
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| `limit` 这里是有缓冲的 `channel` ; `cond` 是无缓冲的; 所以这里结合名字来理解:因为 是限制某一种资源的使用,所以需要预先在资源池中放入预置数量的资源; `Cond` 类似阀门,需要两边都准备好,才能进行数据交换,所以使用无缓冲,同步控制。 这里我们看看 `stores/mongo` 中关于 `session` 的管理,来理解 资源控制: func (cs *concurrentSession) takeSession(opts ...Option) (*mgo.Session, error) { // 选项参数注入 ... // 看 limit 中是否还能取出资源 if err := cs.limit.Borrow(o.timeout); err != nil { return nil, err } else { return cs.Copy(), nil }
func (l TimeoutLimit) Borrow(timeout time.Duration) error { // 1. 如果还有 limit 中还有资源,取出一个,返回 if l.TryBorrow() { return nil } // 2. 如果 limit 中资源已经用完了 var ok bool for { // 只有 cond 可以取出一个【无缓存,也只有 cond <- 此条才能通过】 timeout, ok = l.cond.WaitWithTimeout(timeout) // 尝试取出一个【上面 cond 通过时,就有一个资源返回了】 // 看 `Return()` if ok && l.TryBorrow() { } // 超时控制 if timeout <= 0 { return ErrTimeout }
func (l TimeoutLimit) Return() error { // 返回去一个资源 if err := l.limit.Return(); err != nil { return err } // 同步通知另一个需要资源的协程【实现了阀门,两方交换】 l.cond.Signal() }
|
1.4 资源管理
同文件夹中还有
ResourceManager
,从名字上类似,这里将两个组件放在一起讲解。
先从结构上:
1 2 3
| type ManagedResource struct { resource interface{}
|
lock sync.RWMutex
1 2 3 4
| generate func() interface{} equals func(a, b interface{}) bool
|
}
1 2 3
| type ResourceManager struct { // 资源:这里看得出来是 I/O, resources map[string]io.Closer
|
sharedCalls SharedCalls
lock sync.RWMutex
}
`然后来看获取资源的方法签名:```sql
1
| func (manager *ResourceManager) GetResource(key, create func() (io.Closer, error)) (io.Closer, error)
|
1 2 3 4
| func (mr *ManagedResource) Take() interface{}
func (mr *ManagedResource) MarkBroken(resource interface{})
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| 使用 `SharedCalls` 做防重复请求,并将资源缓存在内部的 `sourMap` ;另外传入的 `CREATE func` 和 `IO` 操作有关,常见用在网络资源的缓存; `ManagedResource` 缓存资源没有 `map` 而是单一的 `interface` ,说明只有一份,但是它提供了 `Take()` 和传入 `generate()` 说明可以让开发者自行更新 `resource` ; 所以在用途上: :用在网络资源的管理。如:数据库连接管理; :用在一些变化资源,可以做资源前后对比,达到更新资源。如: `token` 管理和验证 #### 1.5 RefResource 这个就和 `GC` 中引用计数类似: `Use() -> ref++` ```java Clean() -> ref--; if ref == 0 -> ref clean
|
1 2 3 4 5 6
| func (r *RefResource) Use() error { // 互斥访问 r.lock.Lock() defer r.lock.Unlock() // 清除标记 if r.cleaned {
|
return ErrUseOfCleaned
}
r.ref++
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| #### 1.6 SharedCalls 一句话形容:使用SharedCalls可以使得同时多个请求只需要发起一次拿结果的调用,其他请求"坐享其成",这种设计有效减少了资源服务的并发压力,可以有效防止缓存击穿。 这个组件被反复应用在其他组件中,上面说的 。 类似当需要高频并发访问一个资源时,就可以使用 缓存。 // 当多个请求同时使用Do方法请求资源时 func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) { // 先申请加锁 g.lock.Lock()
// 根据key,获取对应的call结果,并用变量c保存 if c, ok := g.calls[key]; ok { // 拿到call以后,释放锁,此处call可能还没有实际数据,只是一个空的内存占位 g.lock.Unlock() // 调用wg.Wait,判断是否有其他goroutine正在申请资源,如果阻塞,说明有其他goroutine正在获取资源 c.wg.Wait() // 当wg.Wait不再阻塞,表示资源获取已经结束,可以直接返回结果 return c.val, c.err }
// 没有拿到结果,则调用makeCall方法去获取资源,注意此处仍然是锁住的,可以保证只有一个goroutine可以调用makecall c := g.makeCall(key, fn) // 返回调用结果 }
|
1.7 总结
不重复造轮子,一直是 go-zero 设计主旨之一;也同时将平时业务沉淀到组件中,这才是框架和组件的意义。
1.8 项目地址
https://github.com/tal-tech/go-zero
https://gitee.com/kevwan/go-zero
欢迎使用 go-zero 并 star 支持我们!
本文标题: 更简的并发代码更
发布时间: 2019年05月05日 00:00
最后更新: 2025年12月30日 08:54
原始链接: https://haoxiang.eu.org/38ebac1b/
版权声明: 本文著作权归作者所有,均采用CC BY-NC-SA 4.0许可协议,转载请注明出处!