首先来看一个例子,在我们刚开始对写 GO
的代码的过程一定遇到这个错误。通过阅读 Map源码实现知道map是不支持线程安全的,所以一般并发场景下都是 加锁
来解决,相对的话性能上就会有一定的损耗。
package main
import "time"
func main() {
m := make(map[int]string)
m[1] = `test`
go func() {
for {
m[1] = `test`
}
}()
go func() {
for {
_ = m[1]
}
}()
time.Sleep(time.Second)
}
fatal error: concurrent map read and map write
goroutine 6 [running]:
runtime.throw(0x107c19c, 0x21)
.....
官方在 go1.9
的引入 sync.Map 为并发map提供了解决方案。
下面通过一个例子,来看看 sync.Map 的使用:
func main() {
var m sync.Map
// 新增
m.Store(1, `add`)
// 获取
if value, ok := m.Load(1); ok {
fmt.Println(value.(string))
}
// 遍历
m.Range(func(key, value interface{}) bool {
fmt.Println(key, value)
return true
})
// 删除
m.Delete(1)
}
来看看 sync.Map 结构
type Map struct {
mu Mutex
// read map的 k v(entry) 是不变的,删除只是打标记,插入新key会加锁写到dirty中
// 因此对read的读取无需加锁
read atomic.Value // readOnly
// 读写需要加锁
dirty map[interface{}]*entry
// 当load操作在 read.m 中未找到,尝试从dirty中进行 load 时(不管是否存在),misses++
// 当misses >= dirty map len时,dirty被提升为read 并且重新分配dirty == nil
misses int
}
type readOnly struct {
m map[interface{}]*entry
amended bool // 如果是 true 的话说明 dirty 里面有些 key 在 m 是没有的。
}
type entry struct {
// p 指向 *interface{}
// 如果 p == nil, 说明 kv 已经被删除并且 m.dirty == nil
// 如果 p == expunged, 说明 kv 已经删除,并且 m.dirty != nil, 这个 key 在 dirty 中未找到
// 其他情况 p 是 interface{}地址,m.dirty != nil 的话说明 read.m[key] 和 dirty[key] 指向一个 *entry
p unsafe.Pointer // *interface{}
}
graph TB;
subgraph read.m
key1[key1]
key2[key2]
end
subgraph dirty
key_a[key1]
key_b
end
key1-->entry[*entry]
key_a-->entry
当 read.m 和 dirty 中都有key1时,因为 *entry
为指针类型,所以一处修改都会受到影响 。
Load 查找
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly) // 1
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly) // 2
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
// m.misses++
// 当 miss 数量满足一定条件时,会触发dirty 提升为 read
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}
func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*interface{})(p), true
}
获取的话这块比较简单,先是在未加锁的情况下从 read
中查找,找到立即返回 ,反之加锁再从 dirty
中查找,并且 miss 数递增,当 miss 数量满足一定条件时,也就是miss > len(dirty),会触发dirty 提升为 read。不太容易理解可能的就是 1
2
中为什么两次从 read
获取数据?这里其实就是为了避免加锁时,dirty 提升为 read。
Store 新增
func (m *Map) Store(key, value interface{}) {
read, _ := m.read.Load().(readOnly)
// 如果 read 有的话,直接尝试修改 read.m[key] = value
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
// 如果 read 有这个 key,并且 p == expunged, 说明 m.dirty != nil 并且dirty里面没有这个 key
// 1. 设置 p = nil
// 2. dirty 中新增 key
// 3. 让dirty[key] 和 read.m[key] 指向同一个 *entry
m.dirty[key] = e
}
// e.p = &value
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
// read 中没有,dirty中有,直接修改
e.storeLocked(&value)
} else {
if !read.amended {
// 新增的key 在dirty 中有, 但是read中没有,标记下
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
// m.dirty == nil的话,将read里面不是expunged kv 复制到dirty
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
func (e *entry) tryStore(i *interface{}) bool {
for {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
}
}
Delete 删除元素
func (m *Map) Delete(key interface{}) {
m.LoadAndDelete(key)
}
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
m.missLocked()
}
m.mu.Unlock()
}
if ok {
return e.delete()
}
return nil, false
}
// 删除的话,不是直接删除kv,而是让 e.p == nil
func (e *entry) delete() (value interface{}, ok bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return *(*interface{})(p), true
}
}
}
Range 遍历读取
func (m *Map) Range(f func(key, value interface{}) bool) {
// 如果所有的 key 都在read 中是不用加锁的
read, _ := m.read.Load().(readOnly)
if read.amended {
// m.dirty包含一些不在read.m中的kv,遍历的时候就会触发触发dirty 提升为 read
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
其他还有一些像 LoadAndDelete
LoadOrStore
等方法,就是上述代码的整合操作,这里就不做解释了。
通过阅读学习源码和官方文档可以了解 sync.Map 的设计特性,整体设计上其实并不复杂。里面一些好的设计思路值得我们在平时编码过程中借鉴应用的。
- 以空间换效率,通过read和dirty两个map来提高读取效率
- 优先从read中读取(无锁),否则再从dirty map中读取(加锁)
- 动态调整,当misses次数过多时,将dirty提升为read
- 延迟删除,删除只是为value打一个标记,在dirty map提升时才执行真正的删除
下面是map操作非常详细的图解,图片搭配代码更好理解代码,map图解