Go语言源码sync.Map解读,一种高性能并发安全的字典类型

首先来看一个例子,在我们刚开始对写 GO 的代码的过程一定遇到这个错误。通过阅读 Map源码实现知道map是不支持线程安全的,所以一般并发场景下都是 加锁来解决,相对的话性能上就会有一定的损耗。

package main

import "time"

func main() {
    m := make(map[int]string)
    m[1] = `test`
    go func() {
        for {
            m[1] = `test`
        }
    }()
    go func() {
        for {
            _ = m[1]
        }
    }()
    time.Sleep(time.Second)
}
fatal error: concurrent map read and map write
goroutine 6 [running]:
runtime.throw(0x107c19c, 0x21)
.....

官方在 go1.9 的引入 sync.Map 为并发map提供了解决方案。

下面通过一个例子,来看看 sync.Map 的使用:

func main() {
    var m sync.Map
    // 新增
    m.Store(1, `add`)
    // 获取
    if value, ok := m.Load(1); ok {
        fmt.Println(value.(string))
    }
    // 遍历
    m.Range(func(key, value interface{}) bool {
        fmt.Println(key, value)
        return true
    })
    // 删除
    m.Delete(1)
}

来看看 sync.Map 结构

type Map struct { 
    mu Mutex
    // read map的 k v(entry) 是不变的,删除只是打标记,插入新key会加锁写到dirty中
    // 因此对read的读取无需加锁
    read atomic.Value // readOnly
    // 读写需要加锁
    dirty map[interface{}]*entry
     // 当load操作在 read.m 中未找到,尝试从dirty中进行 load 时(不管是否存在),misses++
    // 当misses >= dirty map len时,dirty被提升为read 并且重新分配dirty == nil
    misses int
}

type readOnly struct {
    m       map[interface{}]*entry
    amended bool // 如果是 true 的话说明 dirty 里面有些 key 在 m 是没有的。
}

type entry struct {
    // p  指向 *interface{}
    // 如果 p == nil, 说明 kv 已经被删除并且 m.dirty == nil
    // 如果 p == expunged, 说明 kv 已经删除,并且 m.dirty != nil, 这个 key 在 dirty 中未找到
    // 其他情况 p 是 interface{}地址,m.dirty != nil 的话说明 read.m[key] 和 dirty[key] 指向一个 *entry
    p unsafe.Pointer // *interface{}
}
graph TB;

    subgraph read.m
    key1[key1]
    key2[key2]
    end

    subgraph dirty
    key_a[key1]
    key_b
    end
    key1-->entry[*entry]
    key_a-->entry

当 read.m 和 dirty 中都有key1时,因为 *entry 为指针类型,所以一处修改都会受到影响 。

Load 查找

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    read, _ := m.read.Load().(readOnly) // 1
    e, ok := read.m[key]
    if !ok && read.amended {
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly) // 2
        e, ok = read.m[key]
        if !ok && read.amended {
            e, ok = m.dirty[key]
            // m.misses++
            // 当 miss 数量满足一定条件时,会触发dirty 提升为 read
            m.missLocked()
        }
        m.mu.Unlock()
    }
    if !ok {
        return nil, false
    }
    return e.load()
}

func (e *entry) load() (value interface{}, ok bool) {
    p := atomic.LoadPointer(&e.p)
    if p == nil || p == expunged {
        return nil, false
    }
    return *(*interface{})(p), true
}

获取的话这块比较简单,先是在未加锁的情况下从 read 中查找,找到立即返回 ,反之加锁再从 dirty 中查找,并且 miss 数递增,当 miss 数量满足一定条件时,也就是miss > len(dirty),会触发dirty 提升为 read。不太容易理解可能的就是 1 2 中为什么两次从 read 获取数据?这里其实就是为了避免加锁时,dirty 提升为 read。

Store 新增

func (m *Map) Store(key, value interface{}) {
    read, _ := m.read.Load().(readOnly)
    // 如果 read 有的话,直接尝试修改 read.m[key] = value
    if e, ok := read.m[key]; ok && e.tryStore(&value) {
        return
    }

    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
        if e.unexpungeLocked() {
            // 如果 read 有这个 key,并且 p == expunged, 说明 m.dirty != nil 并且dirty里面没有这个 key 
            // 1. 设置 p = nil
             // 2. dirty 中新增 key
            // 3. 让dirty[key] 和 read.m[key] 指向同一个 *entry
            m.dirty[key] = e
        }
         // e.p = &value
        e.storeLocked(&value)
    } else if e, ok := m.dirty[key]; ok {
         // read 中没有,dirty中有,直接修改
        e.storeLocked(&value)
    } else {
        if !read.amended {
            // 新增的key 在dirty 中有, 但是read中没有,标记下
            m.dirtyLocked()
            m.read.Store(readOnly{m: read.m, amended: true})
        }
        m.dirty[key] = newEntry(value)
    }
    m.mu.Unlock()
}

func (m *Map) dirtyLocked() {
    if m.dirty != nil {
        return
    }
    // m.dirty == nil的话,将read里面不是expunged kv 复制到dirty
    read, _ := m.read.Load().(readOnly)
    m.dirty = make(map[interface{}]*entry, len(read.m))
    for k, e := range read.m {
        if !e.tryExpungeLocked() {
            m.dirty[k] = e
        }
    }
}

func (e *entry) tryStore(i *interface{}) bool {
    for {
        p := atomic.LoadPointer(&e.p)
        if p == expunged {
            return false
        }
        if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
            return true
        }
    }
}

Delete 删除元素

func (m *Map) Delete(key interface{}) {
    m.LoadAndDelete(key)
}

func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended {
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            e, ok = m.dirty[key]
            m.missLocked()
        }
        m.mu.Unlock()
    }
    if ok {
        return e.delete()
    }
    return nil, false
}

// 删除的话,不是直接删除kv,而是让 e.p == nil
func (e *entry) delete() (value interface{}, ok bool) {
    for {
        p := atomic.LoadPointer(&e.p)
        if p == nil || p == expunged {
            return nil, false
        }
        if atomic.CompareAndSwapPointer(&e.p, p, nil) {
            return *(*interface{})(p), true
        }
    }
}

Range 遍历读取

func (m *Map) Range(f func(key, value interface{}) bool) {
    // 如果所有的 key 都在read 中是不用加锁的
    read, _ := m.read.Load().(readOnly)
    if read.amended {
        // m.dirty包含一些不在read.m中的kv,遍历的时候就会触发触发dirty 提升为 read
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        if read.amended {
            read = readOnly{m: m.dirty}
            m.read.Store(read)
            m.dirty = nil
            m.misses = 0
        }
        m.mu.Unlock()
    }

    for k, e := range read.m {
        v, ok := e.load()
        if !ok {
            continue
        }
        if !f(k, v) {
            break
        }
    }
}

其他还有一些像 LoadAndDelete LoadOrStore 等方法,就是上述代码的整合操作,这里就不做解释了。

通过阅读学习源码和官方文档可以了解 sync.Map 的设计特性,整体设计上其实并不复杂。里面一些好的设计思路值得我们在平时编码过程中借鉴应用的。

  • 以空间换效率,通过read和dirty两个map来提高读取效率
  • 优先从read中读取(无锁),否则再从dirty map中读取(加锁)
  • 动态调整,当misses次数过多时,将dirty提升为read
  • 延迟删除,删除只是为value打一个标记,在dirty map提升时才执行真正的删除

下面是map操作非常详细的图解,图片搭配代码更好理解代码,map图解

发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注