在并发编程中同步通常说的锁的主要作用是保证多个线程或者 goroutine在访问同一片内存时不会出现混乱的问题。Go语言的sync包提供了常见的并发编程原语,主要有 Mutex、RWMutex、WaitGroup、Once、Pool、Map 和 Cond 等。我们来看下如何使用这些原语。
sync.Mutex
Mutex是sync包中使用比较广泛的原语。它允许在共享资源上互斥访问(不能同时访问):
package main
import (
"fmt"
"sync"
"time"
)
// SafeCounter 的并发使用是安全的。
type SafeCounter struct {
v map[string]int
mux sync.Mutex
}
// Inc 增加给定 key 的计数器的值。
func (c *SafeCounter) Inc(key string) {
c.mux.Lock()
// Lock 之后同一时刻只有一个 goroutine 能访问 c.v
c.v[key]++
c.mux.Unlock()
}
// Value 返回给定 key 的计数器的当前值。
func (c *SafeCounter) Value(key string) int {
c.mux.Lock()
// Lock 之后同一时刻只有一个 goroutine 能访问 c.v
defer c.mux.Unlock()
return c.v[key]
}
func main() {
c := SafeCounter{v: make(map[string]int)}
for i := 0; i < 1000; i++ {
go c.Inc("somekey")
}
time.Sleep(time.Second)
fmt.Println(c.Value("somekey"))
}
sync.RWMutex
sync.RWMutex是一个读写互斥锁,它提供了我们上面的刚刚看到的sync.Mutex的Lock和UnLock方法(因为这两个结构都实现了sync.Locker接口)。但是,它还允许使用RLock和RUnlock方法进行并发读取:
package main
import (
"sync"
"fmt"
"strconv"
)
var r sync.RWMutex
type m map[string]string
func (c *m) Get(k string) string {
defer r.RUnlock()
r.RLock()
if v, ok := (*c)[k]; ok {
return v
}
return "Unknown"
}
func (c *m) Set(k, v string) {
defer r.Unlock()
r.Lock()
(*c)[k] = v
}
func New() *m {
return &m{}
}
func main() {
s := New()
for i:=0;i<10;i++ {
s.Set(strconv.Itoa(i), strconv.Itoa(i))
}
fmt.Println(s)
}
sync.RWMutex允许至少一个读锁或一个写锁存在,而sync.Mutex允许一个读锁或一个写锁存在。
sync.RWMutex读锁的速度比锁定/解锁sync.Mutex更快,另一方面,在sync.RWMutex上调用Lock()/ Unlock()是比较慢的。
因此,只有在频繁读取和不频繁写入的场景里,才应该使用sync.RWMutex。
sync.WaitGroup
sync.WaitGroup 用于goroutine等待另一个goroutine执行完成。
sync.WaitGroup 拥有一个内部计数器。当计数器等于0时,则Wait()方法会立即返回。否则它将阻塞执行Wait()方法的goroutine直到计数器等于0时为止。
要增加计数器使用Add(int)方法。要减少它可以使用Done()(将计数器减1),也可以传递负数给Add方法把计数器减少指定大小,Done()方法底层就是通过Add(-1)实现的。
package main
import (
"sync"
"fmt"
)
func main() {
var wg sync.WaitGroup
for i:=1; i<10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
fmt.Println("sync go is i=", i)
}(i)
}
wg.Wait()
}
每次创建goroutine时,都会使用wg.Add(1)来增加wg的内部计数器。也可以在for循环之前调用wg.Add(10)。
与此同时,每个goroutine完成时,都会使用wg.Done()减少wg的内部计数器。
main 会在goroutine都执行完wg.Done()将计数器变为0后退出。
sync.Pool
sync.Pool是一个并发池,负责安全地保存和复用临时一组对象,减少内存分配,降低 GC 压力。它有两个方法:
Get() interface{} 用来从并发池中取出元素。
Put(interface{}) 将一个对象加入并发池。
package main
import (
"sync"
"fmt"
)
type Student struct {
Name string
Age int
}
func main() {
var s sync.Pool
k := new(Student)
k.Name = "ywfuns"
k.Age = 18
s.Put(k)
p := s.Get().(*Student)
fmt.Println(p)
}
Get()方法会从并发池中随机取出对象,无法保证以固定的顺序获取并发池中存储的对象。
pool := &sync.Pool{
New: func() interface{} {
return new(Student)
},
}
这样每次调用Get()时,将返回由在pool.New中指定的函数创建的对象(本例中为指针)。
一般有两种情况下使用:
当我们须重用共享的和长期存在的对象(如:数据库连接)时。
用于优化内存分配。
sync.Once
sync.Once 是 Go 标准库提供的使函数只执行一次的实现,常应用于单例模式,例如初始化配置、保持数据库连接等。作用与 init 函数类似,但有区别。
init 函数是当所在的 package 首次被加载时执行,若未被使用,则既浪费了内存,又延长了程序加载时间。
sync.Once 可以在代码的任意位置初始化和调用,因此可以延迟到使用时再执行,并发场景下是线程安全的。
在多数情况下,sync.Once 被用于控制变量的初始化,这个变量的读写满足如下三个条件:
当且仅当第一次访问某个变量时,进行初始化(写);
变量初始化过程中,所有读都被阻塞,直到初始化完成;
变量仅初始化一次,初始化完成后驻留在内存里。
ReadConfig 需要读取环境变量,并转换为对应的配置。环境变量在程序执行前已经确定,执行过程中不会发生改变。ReadConfig 可能会被多个协程并发调用,为了提升性能(减少执行时间和内存占用),使用 sync.Once 是一个比较好的方式。
package main
import (
"sync"
"os"
"strconv"
"log"
"time"
)
type Config struct {
Server string
Port int64
}
var (
once sync.Once
config *Config
)
func ReadConfig() *Config {
once.Do(func() {
var err error
config = &Config{Server: os.Getenv("SERVER_URL")}
config.Port, err = strconv.ParseInt(os.Getenv("PORT"), 10, 0)
if err != nil {
config.Port = 8080
}
log.Println("init config")
})
return config
}
func main() {
for i := 0; i < 10; i++ {
go func() {
_ = ReadConfig()
}()
}
time.Sleep(time.Second)
}
sync.Cond
sync.Cond 条件变量用来协调想要访问共享资源的 goroutine,当共享资源的状态发生变化的时候,它可以用来通知被互斥锁阻塞的 goroutine。
sync.Cond 基于互斥锁/读写锁,互斥锁 sync.Mutex 通常用来保护临界区和共享资源,条件变量 sync.Cond 用来协调想要访问共享资源的 goroutine。
sync.Cond 经常用在多个 goroutine 等待,一个 goroutine 通知(事件发生)的场景。如果是一个通知,一个等待,使用互斥锁或 channel 就能搞定了。
package main
import (
"sync"
"log"
"time"
)
var done = false
func read(name string, c *sync.Cond) {
c.L.Lock()
for !done {
c.Wait()
}
log.Println(name, "starts reading")
c.L.Unlock()
}
func write(name string, c *sync.Cond) {
log.Println(name, "starts writing")
time.Sleep(time.Second)
c.L.Lock()
done = true
c.L.Unlock()
log.Println(name, "wakes all")
c.Broadcast()
}
func main() {
cond := sync.NewCond(&sync.Mutex{})
go read("reader1", cond)
go read("reader2", cond)
go read("reader3", cond)
write("writer", cond)
time.Sleep(time.Second * 3)
}
done 即互斥锁需要保护的条件变量。
read() 调用 Wait() 等待通知,直到 done 为 true。
write() 接收数据,接收完成后,将 done 置为 true,调用 Broadcast() 通知所有等待的协程。
write() 中的暂停了 1s,一方面是模拟耗时,另一方面是确保前面的 3 个 read 协程都执行到 Wait(),处于等待状态。main 函数最后暂停了 3s,确保所有操作执行完毕。
writer 接收数据花费了 1s,同步通知所有等待的协程。