Go编程之sync包
在并发编程中同步通常说的锁的主要作用是保证多个线程或者 goroutine在访问同一片内存时不会出现混乱的问题。Go语言的sync包提供了常见的并发编程原语,主要有 Mutex、RWMutex、WaitGroup、Once、Pool、Map 和 Cond 等。我们来看下如何使用这些原语。
sync.Mutex
Mutex是sync包中使用比较广泛的原语。它允许在共享资源上互斥访问(不能同时访问):
package main import ( "fmt" "sync" "time" ) // SafeCounter 的并发使用是安全的。 type SafeCounter struct { v map[string]int mux sync.Mutex } // Inc 增加给定 key 的计数器的值。 func (c *SafeCounter) Inc(key string) { c.mux.Lock() // Lock 之后同一时刻只有一个 goroutine 能访问 c.v c.v[key]++ c.mux.Unlock() } // Value 返回给定 key 的计数器的当前值。 func (c *SafeCounter) Value(key string) int { c.mux.Lock() // Lock 之后同一时刻只有一个 goroutine 能访问 c.v defer c.mux.Unlock() return c.v[key] } func main() { c := SafeCounter{v: make(map[string]int)} for i := 0; i < 1000; i++ { go c.Inc("somekey") } time.Sleep(time.Second) fmt.Println(c.Value("somekey")) }
sync.RWMutex
sync.RWMutex是一个读写互斥锁,它提供了我们上面的刚刚看到的sync.Mutex的Lock和UnLock方法(因为这两个结构都实现了sync.Locker接口)。但是,它还允许使用RLock和RUnlock方法进行并发读取:
package main import ( "sync" "fmt" "strconv" ) var r sync.RWMutex type m map[string]string func (c *m) Get(k string) string { defer r.RUnlock() r.RLock() if v, ok := (*c)[k]; ok { return v } return "Unknown" } func (c *m) Set(k, v string) { defer r.Unlock() r.Lock() (*c)[k] = v } func New() *m { return &m{} } func main() { s := New() for i:=0;i<10;i++ { s.Set(strconv.Itoa(i), strconv.Itoa(i)) } fmt.Println(s) }
sync.RWMutex允许至少一个读锁或一个写锁存在,而sync.Mutex允许一个读锁或一个写锁存在。
sync.RWMutex读锁的速度比锁定/解锁sync.Mutex更快,另一方面,在sync.RWMutex上调用Lock()/ Unlock()是比较慢的。
因此,只有在频繁读取和不频繁写入的场景里,才应该使用sync.RWMutex。
sync.WaitGroup
sync.WaitGroup 用于goroutine等待另一个goroutine执行完成。
sync.WaitGroup 拥有一个内部计数器。当计数器等于0时,则Wait()方法会立即返回。否则它将阻塞执行Wait()方法的goroutine直到计数器等于0时为止。
要增加计数器使用Add(int)方法。要减少它可以使用Done()(将计数器减1),也可以传递负数给Add方法把计数器减少指定大小,Done()方法底层就是通过Add(-1)实现的。
package main import ( "sync" "fmt" ) func main() { var wg sync.WaitGroup for i:=1; i<10; i++ { wg.Add(1) go func(i int) { defer wg.Done() fmt.Println("sync go is i=", i) }(i) } wg.Wait() }
每次创建goroutine时,都会使用wg.Add(1)来增加wg的内部计数器。也可以在for循环之前调用wg.Add(10)。
与此同时,每个goroutine完成时,都会使用wg.Done()减少wg的内部计数器。
main 会在goroutine都执行完wg.Done()将计数器变为0后退出。
sync.Pool
sync.Pool是一个并发池,负责安全地保存和复用临时一组对象,减少内存分配,降低 GC 压力。它有两个方法:
Get() interface{} 用来从并发池中取出元素。
Put(interface{}) 将一个对象加入并发池。
package main import ( "sync" "fmt" ) type Student struct { Name string Age int } func main() { var s sync.Pool k := new(Student) k.Name = "ywfuns" k.Age = 18 s.Put(k) p := s.Get().(*Student) fmt.Println(p) }
Get()方法会从并发池中随机取出对象,无法保证以固定的顺序获取并发池中存储的对象。
pool := &sync.Pool{ New: func() interface{} { return new(Student) }, }
这样每次调用Get()时,将返回由在pool.New中指定的函数创建的对象(本例中为指针)。
一般有两种情况下使用:
当我们须重用共享的和长期存在的对象(如:数据库连接)时。
用于优化内存分配。
sync.Once
sync.Once 是 Go 标准库提供的使函数只执行一次的实现,常应用于单例模式,例如初始化配置、保持数据库连接等。作用与 init 函数类似,但有区别。
init 函数是当所在的 package 首次被加载时执行,若未被使用,则既浪费了内存,又延长了程序加载时间。
sync.Once 可以在代码的任意位置初始化和调用,因此可以延迟到使用时再执行,并发场景下是线程安全的。
在多数情况下,sync.Once 被用于控制变量的初始化,这个变量的读写满足如下三个条件:
当且仅当第一次访问某个变量时,进行初始化(写);
变量初始化过程中,所有读都被阻塞,直到初始化完成;
变量仅初始化一次,初始化完成后驻留在内存里。
ReadConfig 需要读取环境变量,并转换为对应的配置。环境变量在程序执行前已经确定,执行过程中不会发生改变。ReadConfig 可能会被多个协程并发调用,为了提升性能(减少执行时间和内存占用),使用 sync.Once 是一个比较好的方式。
package main import ( "sync" "os" "strconv" "log" "time" ) type Config struct { Server string Port int64 } var ( once sync.Once config *Config ) func ReadConfig() *Config { once.Do(func() { var err error config = &Config{Server: os.Getenv("SERVER_URL")} config.Port, err = strconv.ParseInt(os.Getenv("PORT"), 10, 0) if err != nil { config.Port = 8080 } log.Println("init config") }) return config } func main() { for i := 0; i < 10; i++ { go func() { _ = ReadConfig() }() } time.Sleep(time.Second) }
sync.Cond
sync.Cond 条件变量用来协调想要访问共享资源的 goroutine,当共享资源的状态发生变化的时候,它可以用来通知被互斥锁阻塞的 goroutine。
sync.Cond 基于互斥锁/读写锁,互斥锁 sync.Mutex 通常用来保护临界区和共享资源,条件变量 sync.Cond 用来协调想要访问共享资源的 goroutine。
sync.Cond 经常用在多个 goroutine 等待,一个 goroutine 通知(事件发生)的场景。如果是一个通知,一个等待,使用互斥锁或 channel 就能搞定了。
package main import ( "sync" "log" "time" ) var done = false func read(name string, c *sync.Cond) { c.L.Lock() for !done { c.Wait() } log.Println(name, "starts reading") c.L.Unlock() } func write(name string, c *sync.Cond) { log.Println(name, "starts writing") time.Sleep(time.Second) c.L.Lock() done = true c.L.Unlock() log.Println(name, "wakes all") c.Broadcast() } func main() { cond := sync.NewCond(&sync.Mutex{}) go read("reader1", cond) go read("reader2", cond) go read("reader3", cond) write("writer", cond) time.Sleep(time.Second * 3) }
done 即互斥锁需要保护的条件变量。
read() 调用 Wait() 等待通知,直到 done 为 true。
write() 接收数据,接收完成后,将 done 置为 true,调用 Broadcast() 通知所有等待的协程。
write() 中的暂停了 1s,一方面是模拟耗时,另一方面是确保前面的 3 个 read 协程都执行到 Wait(),处于等待状态。main 函数最后暂停了 3s,确保所有操作执行完毕。
writer 接收数据花费了 1s,同步通知所有等待的协程。