Go语言源码sync.WaitGroup解读,优雅的协程并发同步原语

WaitGroup 的含义其实我们可以通过的名称去认识它,wait 就是阻塞等待,group 就是一组协程, 也就是阻塞等待一组 goroutine 的集合执行完成。主 goroutine 调用 Add() 添加以设置要等待的goroutine的数量。 然后每个 goroutines 运行并在完成后调用 Done()。 同时,等待可用于阻塞,直到所有goroutine完成。

type WaitGroup struct {
    noCopy noCopy
    state1 [3]uint32
}

当 noCopy 被嵌入到其他结构体时,说明该结构体是不能够被复制使用的,虽然说当我们复制时不会提示报错,编译也没有的问题。但可以使用 go vet 可以检测避免这种错误。

State1 被分为三部分,

  1. 操作计数器,Add(1) 加1,Done() 减1
  2. 阻塞的协程数
  3. 用于等待和唤醒的信号量

Add() 操作

func (wg *WaitGroup) Add(delta int) {
  // return state1
    statep, semap := wg.state()

    state := atomic.AddUint64(statep, uint64(delta)<<32)
  // 高地址存储计数器 低地址存储阻塞协程数
    v := int32(state >> 32)
    w := uint32(state)

  // 正常情况下,计数器 v == 0 时,就会调用wait()退出
  // 异常情况 v 小于 0,panic
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
  // 这块是 当第一次Add() 与 wait() 并发操作时
  // 当 wait() 先执行,因为刚开始计数器为0 所以先执行wait()不会阻塞 
  // 当 add(1) 先执行,此时计数器大于0,紧跟着后面的wait()就会阻塞住
    if w != 0 && delta > 0 && v == int32(delta) {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
  // 计数器操作成功 return
    if v > 0 || w == 0 {
        return
    }
  // 此时 v == 0 && w > 0
    // 现在不可能有并发的状态变化
  // Add() 不可能和 wait() 同时发生
  // 当计数器为0的时候 ,wait()数量不会增加
    // 还是用最小成本检查了下waitgroup的滥用
    if *statep != state {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    // v==0
  // 重置等待者数量为0,一次发信号
    *statep = 0
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false, 0)
    }
}

Done() 操作

Done() 操作实际就是Add(-1)。

// 计数器减一
func (wg *WaitGroup) Done() {
    wg.Add(-1)
}

Wait()

// 阻塞直到计数器为0
func (wg *WaitGroup) Wait() {
    statep, semap := wg.state()

    for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)
        w := uint32(state)
    // 计数器为0 return
        if v == 0 {
            return
        }
        // 阻塞的协程数加一
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
      // 阻塞直到被唤醒
            runtime_Semacquire(semap)
      // 当被唤醒后 正常情况计数器应该为0
      // 出现此种情况是应为又调用了Add() 操作不当
      // panic
            if *statep != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            return
        }
    }
}

发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注