WaitGroup 的含义其实我们可以通过的名称去认识它,wait 就是阻塞等待,group 就是一组协程, 也就是阻塞等待一组 goroutine 的集合执行完成。主 goroutine 调用 Add() 添加以设置要等待的goroutine的数量。 然后每个 goroutines 运行并在完成后调用 Done()。 同时,等待可用于阻塞,直到所有goroutine完成。
type WaitGroup struct {
noCopy noCopy
state1 [3]uint32
}
当 noCopy 被嵌入到其他结构体时,说明该结构体是不能够被复制使用的,虽然说当我们复制时不会提示报错,编译也没有的问题。但可以使用 go vet 可以检测避免这种错误。
State1 被分为三部分,
- 操作计数器,Add(1) 加1,Done() 减1
- 阻塞的协程数
- 用于等待和唤醒的信号量
Add() 操作
func (wg *WaitGroup) Add(delta int) {
// return state1
statep, semap := wg.state()
state := atomic.AddUint64(statep, uint64(delta)<<32)
// 高地址存储计数器 低地址存储阻塞协程数
v := int32(state >> 32)
w := uint32(state)
// 正常情况下,计数器 v == 0 时,就会调用wait()退出
// 异常情况 v 小于 0,panic
if v < 0 {
panic("sync: negative WaitGroup counter")
}
// 这块是 当第一次Add() 与 wait() 并发操作时
// 当 wait() 先执行,因为刚开始计数器为0 所以先执行wait()不会阻塞
// 当 add(1) 先执行,此时计数器大于0,紧跟着后面的wait()就会阻塞住
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 计数器操作成功 return
if v > 0 || w == 0 {
return
}
// 此时 v == 0 && w > 0
// 现在不可能有并发的状态变化
// Add() 不可能和 wait() 同时发生
// 当计数器为0的时候 ,wait()数量不会增加
// 还是用最小成本检查了下waitgroup的滥用
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// v==0
// 重置等待者数量为0,一次发信号
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
Done() 操作
Done() 操作实际就是Add(-1)。
// 计数器减一
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
Wait()
// 阻塞直到计数器为0
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
// 计数器为0 return
if v == 0 {
return
}
// 阻塞的协程数加一
if atomic.CompareAndSwapUint64(statep, state, state+1) {
// 阻塞直到被唤醒
runtime_Semacquire(semap)
// 当被唤醒后 正常情况计数器应该为0
// 出现此种情况是应为又调用了Add() 操作不当
// panic
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}