有没感觉 Go 的 sync 包不够用?有没遇到类型没有 sync/atomic 支持? 我们一起看看 go-zero 的 syncx 包对标准库的一些增值补充。 https://github.com/tal-tech/go-zero/tree/master/core/s

name
作用

AtomicBool
bool类型 原子类

AtomicDuration
Duration有关 原子类

AtomicFloat64
float64类型 原子类

Barrier
栏栅【将加锁解锁包装】

Cond
条件变量

DoneChan
优雅通知关闭

ImmutableResource
创建后不会修改的资源

Limit
控制请求数

LockedCalls
确保方法的串行调用

ManagedResource
资源管理

Once
提供

1
once func

OnceGuard
一次性使用的资源管理

Pool
pool,简单的池

RefResource
引用计数的资源

ResourceManager
资源管理器

SharedCalls
类似

1
singflight

的功能

SpinLock
自旋锁:自旋+CAS

TimeoutLimit
Limit + timeout 控制

下面开始对以上库组件做分别介绍。

atomic

因为没有 泛型 支持,所以才会出现多种类型的原子类支持。以下采用

1
float64

作为例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
  func (f *AtomicFloat64) Add(val float64) float64 {
for {
old := f.Load()
nv := old + val
if f.CompareAndSwap(old, nv) {
return nv
}
}
}

func (f *AtomicFloat64) CompareAndSwap(old, val float64) bool {
return atomic.CompareAndSwapUint64((*uint64)(f), math.Float64bits(old), math.Float64bits(val))
}

func (f *AtomicFloat64) Load() float64 {
return math.Float64frombits(atomic.LoadUint64((*uint64)(f)))
}

func (f *AtomicFloat64) Set(val float64) {
atomic.StoreUint64((*uint64)(f), math.Float64bits(val))
}

1
Add(val)

:如果

1
CAS

失败,不断for循环重试,获取 old val,并set old+val;

1
CompareAndSwap(old, new)

:调用底层

1
atomic


1
CAS

1
Load()

:调用

1
atomic.LoadUint64

,然后转换

1
Set(val)

:调用

1
atomic.StoreUint64

至于其他类型,开发者想自己扩展自己想要的类型,可以依照上述,基本上调用原始

1
atomic

操作,然后转换为需要的类型,比如:遇到
1
bool

可以借助
1
0, 1

来分辨对应的
1
false, true

Barrier

这里

1
Barrier

只是将业务函数操作封装,作为闭包传入,内部将
1
lock

操作的加锁解锁自行解决了【防止开发者加锁了忘记解锁】

1
2
3
4
5
6
7
  func (b *Barrier) Guard(fn func()) {
b.lock.Lock()
defer b.lock.Unlock()
// 自己的业务逻辑
fn()
}

Cond/Limit/TimeoutLimit

这个数据结构和

1
Limit

一起组成了
1
TimeoutLimit

,这里将这3个一起讲:

1
2
3
4
5
6
7
8
9
10
11
12
13
  func NewTimeoutLimit(n int) TimeoutLimit {
return TimeoutLimit{
limit: NewLimit(n),
cond: NewCond(),
}
}

func NewLimit(n int) Limit {
return Limit{
pool: make(chan lang.PlaceholderType, n),
}
}

1
limit

这里是有缓冲的

1
channel

1
cond

是无缓冲的;

所以这里结合名字来理解:因为

1
Limit

是限制某一种资源的使用,所以需要预先在资源池中放入预置数量的资源;
1
Cond

类似阀门,需要两边都准备好,才能进行数据交换,所以使用无缓冲,同步控制。
这里我们看看
1
stores/mongo

中关于
1
session

的管理,来理解 资源控制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
  func (cs *concurrentSession) takeSession(opts ...Option) (*mgo.Session, error) {
// 选项参数注入
...
// 看 limit 中是否还能取出资源
if err := cs.limit.Borrow(o.timeout); err != nil {
return nil, err
} else {
return cs.Copy(), nil
}
}

func (l TimeoutLimit) Borrow(timeout time.Duration) error {
// 1. 如果还有 limit 中还有资源,取出一个,返回
if l.TryBorrow() {
return nil
}
// 2. 如果 limit 中资源已经用完了
var ok bool
for {
// 只有 cond 可以取出一个【无缓存,也只有 cond <- 此条才能通过】
timeout, ok = l.cond.WaitWithTimeout(timeout)
// 尝试取出一个【上面 cond 通过时,就有一个资源返回了】
// 看 `Return()`
if ok && l.TryBorrow() {
return nil
}
// 超时控制
if timeout <= 0 {
return ErrTimeout
}
}
}

func (l TimeoutLimit) Return() error {
// 返回去一个资源
if err := l.limit.Return(); err != nil {
return err
}
// 同步通知另一个需要资源的协程【实现了阀门,两方交换】
l.cond.Signal()
return nil
}

资源管理

同文件夹中还有

1
ResourceManager

,从名字上类似,这里将两个组件放在一起讲解。
先从结构上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  type ManagedResource struct {
// 资��
resource interface{}
lock sync.RWMutex
// 生成资源的逻辑,由开发者自己控制
generate func() interface{}
// 对比资源
equals func(a, b interface{}) bool
}

type ResourceManager struct {
// 资源:这里看得出来是 I/O,
resources map[string]io.Closer
sharedCalls SharedCalls
// 对资源map互斥访问
lock sync.RWMutex
}

然后来看获取资源的方法签名:

1
2
3
4
5
6
7
  func (manager *ResourceManager) GetResource(key, create func() (io.Closer, error)) (io.Closer, error)

// 获取一个资源(有就直接获取,没有生成一个)
func (mr *ManagedResource) Take() interface{}
// 判断这个资源是否不符合传入的判断要求,不符合则重置
func (mr *ManagedResource) MarkBroken(resource interface{})

1
ResourceManager

使用

1
SharedCalls

做防重复请求,并将资源缓存在内部的
1
sourMap

;另外传入的
1
create func


1
IO

操作有关,常见用在网络资源的缓存;

1
ManagedResource

缓存资源没有

1
map

而是单一的
1
interface

,说明只有一份,但是它提供了
1
Take()

和传入
1
generate()

说明可以让开发者自行更新
1
resource

所以在用途上:

1
ResourceManager

:用在网络资源的管理。如:数据库连接管理;

1
ManagedResource

:用在一些变化资源,可以做资源前后对比,达到更新资源。如:

1
token

管理和验证

RefResource

这个就和

1
GC

中引用计数类似:

1
Use() -> ref++
1
Clean() -> ref--; if ref == 0 -> ref clean
1
2
3
4
5
6
7
8
9
10
11
12
13
  func (r *RefResource) Use() error {
// 互斥访问
r.lock.Lock()
defer r.lock.Unlock()
// 清除标记
if r.cleaned {
return ErrUseOfCleaned
}
// 引用 +1
r.ref++
return nil
}

SharedCalls

一句话形容:使用SharedCalls可以使得同时多个请求只需要发起一次拿结果的调用,其他请求”坐享其成”,这种设计有效减少了资源服务的并发压力,可以有效防止缓存击穿。
这个组件被反复应用在其他组件中,上面说的

1
ResourceManager


类似当需要高频并发访问一个资源时,就可以使用
1
SharedCalls

缓存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
  // 当多个请求同时使用Do方法请求资源时
func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
// 先申请加锁
g.lock.Lock()

// 根据key,获取对应的call结果,并用变量c保存
if c, ok := g.calls[key]; ok {
// 拿到call以后,释放锁,此处call可能还没有实际数据,只是一个空的内存占位
g.lock.Unlock()
// 调用wg.Wait,判断是否有其他goroutine正在申请资源,如果阻塞,说明有其他goroutine正在获取资源
c.wg.Wait()
// 当wg.Wait不再阻塞,表示资源获取已经结束,可以直接返回结果
return c.val, c.err
}

// 没有拿到结果,则调用makeCall方法去获取资源,注意此处仍然是锁住的,可以保证只有一个goroutine可以调用makecall
c := g.makeCall(key, fn)
// 返回调用结果
return c.val, c.err
}

总结

不重复造轮子,一直是

1
go-zero

设计主旨之一;也同时将平时业务沉淀到组件中,这才是框架和组件的意义。
关于
1
go-zero

更多的设计和实现文章,可以持续关注我们。欢迎大家去关注和使用。

项目地址

https://github.com/tal-tech/go-zero
https://gitee.com/kevwan/go-zero
欢迎使用 go-zero 并 star 支持我们!

微信交流群

关注『微服务实践』公众号并回复 进群 获取社区群二维码。

本文标题: 更简的并发代码,更强的并发控制

本文作者: OSChina

发布时间: 2021年04月15日 09:46

最后更新: 2025年04月03日 11:07

原始链接: https://haoxiang.eu.org/cce88833/

版权声明: 本文著作权归作者所有,均采用CC BY-NC-SA 4.0许可协议,转载请注明出处!

× 喜欢就赞赏一下呗!
打赏二维码