Go编程之sync包

Admin 2022-05-23 18:58:27 GoLang

在并发编程中同步通常说的锁的主要作用是保证多个线程或者 goroutine在访问同一片内存时不会出现混乱的问题。Go语言的sync包提供了常见的并发编程原语,主要有 Mutex、RWMutex、WaitGroup、Once、Pool、Map 和 Cond 等。我们来看下如何使用这些原语。

sync.Mutex

Mutex是sync包中使用比较广泛的原语。它允许在共享资源上互斥访问(不能同时访问):

package main

import (
    "fmt"
    "sync"
    "time"
)

// SafeCounter 的并发使用是安全的。
type SafeCounter struct {
    v   map[string]int
    mux sync.Mutex
}

// Inc 增加给定 key 的计数器的值。
func (c *SafeCounter) Inc(key string) {
    c.mux.Lock()
    // Lock 之后同一时刻只有一个 goroutine 能访问 c.v
    c.v[key]++
    c.mux.Unlock()
}

// Value 返回给定 key 的计数器的当前值。
func (c *SafeCounter) Value(key string) int {
    c.mux.Lock()
    // Lock 之后同一时刻只有一个 goroutine 能访问 c.v
    defer c.mux.Unlock()
    return c.v[key]
}

func main() {
    c := SafeCounter{v: make(map[string]int)}
    for i := 0; i < 1000; i++ {
        go c.Inc("somekey")
    }

    time.Sleep(time.Second)
    fmt.Println(c.Value("somekey"))
}

sync.RWMutex

sync.RWMutex是一个读写互斥锁,它提供了我们上面的刚刚看到的sync.Mutex的Lock和UnLock方法(因为这两个结构都实现了sync.Locker接口)。但是,它还允许使用RLock和RUnlock方法进行并发读取:

package main

import (
    "sync"
    "fmt"
    "strconv"
)

var r sync.RWMutex

type m map[string]string

func (c *m) Get(k string) string {
    defer r.RUnlock()
    r.RLock()

    if v, ok := (*c)[k]; ok {
        return v
    }
    return "Unknown"
}

func (c *m) Set(k, v string)  {
    defer r.Unlock()
    r.Lock()
    (*c)[k] = v
}

func New() *m {
    return &m{}
}

func main() {
    s := New()

    for i:=0;i<10;i++ {
        s.Set(strconv.Itoa(i), strconv.Itoa(i))
    }
    
    fmt.Println(s)
}

sync.RWMutex允许至少一个读锁或一个写锁存在,而sync.Mutex允许一个读锁或一个写锁存在。

sync.RWMutex读锁的速度比锁定/解锁sync.Mutex更快,另一方面,在sync.RWMutex上调用Lock()/ Unlock()是比较慢的。

因此,只有在频繁读取和不频繁写入的场景里,才应该使用sync.RWMutex。

sync.WaitGroup

sync.WaitGroup 用于goroutine等待另一个goroutine执行完成。

sync.WaitGroup 拥有一个内部计数器。当计数器等于0时,则Wait()方法会立即返回。否则它将阻塞执行Wait()方法的goroutine直到计数器等于0时为止。

要增加计数器使用Add(int)方法。要减少它可以使用Done()(将计数器减1),也可以传递负数给Add方法把计数器减少指定大小,Done()方法底层就是通过Add(-1)实现的。

package main

import (
    "sync"
    "fmt"
)

func main()  {
    var wg sync.WaitGroup

    for i:=1; i<10; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()

            fmt.Println("sync go is i=", i)
        }(i)
    }
    wg.Wait()
}

每次创建goroutine时,都会使用wg.Add(1)来增加wg的内部计数器。也可以在for循环之前调用wg.Add(10)。

与此同时,每个goroutine完成时,都会使用wg.Done()减少wg的内部计数器。

main 会在goroutine都执行完wg.Done()将计数器变为0后退出。

sync.Pool

sync.Pool是一个并发池,负责安全地保存和复用临时一组对象,减少内存分配,降低 GC 压力。它有两个方法:

  1. Get() interface{} 用来从并发池中取出元素。

  2. Put(interface{}) 将一个对象加入并发池。

package main

import (
    "sync"
    "fmt"
)

type Student struct {
    Name string
    Age  int
}

func main()  {
    var s sync.Pool

    k := new(Student)
    k.Name = "ywfuns"
    k.Age = 18

    s.Put(k)

    p := s.Get().(*Student)
    fmt.Println(p)
}

Get()方法会从并发池中随机取出对象,无法保证以固定的顺序获取并发池中存储的对象。

pool := &sync.Pool{
  New: func() interface{} {
    return new(Student)
  },
}

这样每次调用Get()时,将返回由在pool.New中指定的函数创建的对象(本例中为指针)。

一般有两种情况下使用:

  1. 当我们须重用共享的和长期存在的对象(如:数据库连接)时。

  2. 用于优化内存分配。

sync.Once

sync.Once 是 Go 标准库提供的使函数只执行一次的实现,常应用于单例模式,例如初始化配置、保持数据库连接等。作用与 init 函数类似,但有区别。

init 函数是当所在的 package 首次被加载时执行,若未被使用,则既浪费了内存,又延长了程序加载时间。

sync.Once 可以在代码的任意位置初始化和调用,因此可以延迟到使用时再执行,并发场景下是线程安全的。

在多数情况下,sync.Once 被用于控制变量的初始化,这个变量的读写满足如下三个条件:

  1. 当且仅当第一次访问某个变量时,进行初始化(写);

  2. 变量初始化过程中,所有读都被阻塞,直到初始化完成;

  3. 变量仅初始化一次,初始化完成后驻留在内存里。

ReadConfig 需要读取环境变量,并转换为对应的配置。环境变量在程序执行前已经确定,执行过程中不会发生改变。ReadConfig 可能会被多个协程并发调用,为了提升性能(减少执行时间和内存占用),使用 sync.Once 是一个比较好的方式。

package main

import (
    "sync"
    "os"
    "strconv"
    "log"
    "time"
)

type Config struct {
    Server string
    Port   int64
}

var (
    once   sync.Once
    config *Config
)

func ReadConfig() *Config {
    once.Do(func() {
        var err error
        config = &Config{Server: os.Getenv("SERVER_URL")}
        config.Port, err = strconv.ParseInt(os.Getenv("PORT"), 10, 0)
        if err != nil {
            config.Port = 8080
        }
        log.Println("init config")
    })
    return config
}

func main() {
    for i := 0; i < 10; i++ {
        go func() {
            _ = ReadConfig()
        }()
    }
    time.Sleep(time.Second)
}

sync.Cond 

sync.Cond 条件变量用来协调想要访问共享资源的 goroutine,当共享资源的状态发生变化的时候,它可以用来通知被互斥锁阻塞的 goroutine。

sync.Cond 基于互斥锁/读写锁,互斥锁 sync.Mutex 通常用来保护临界区和共享资源,条件变量 sync.Cond 用来协调想要访问共享资源的 goroutine。

sync.Cond 经常用在多个 goroutine 等待,一个 goroutine 通知(事件发生)的场景。如果是一个通知,一个等待,使用互斥锁或 channel 就能搞定了。

package main

import (
    "sync"
    "log"
    "time"
)

var done = false

func read(name string, c *sync.Cond) {
    c.L.Lock()
    for !done {
        c.Wait()
    }
    log.Println(name, "starts reading")
    c.L.Unlock()
}

func write(name string, c *sync.Cond) {
    log.Println(name, "starts writing")
    time.Sleep(time.Second)
    c.L.Lock()
    done = true
    c.L.Unlock()
    log.Println(name, "wakes all")
    c.Broadcast()
}

func main() {
    cond := sync.NewCond(&sync.Mutex{})

    go read("reader1", cond)
    go read("reader2", cond)
    go read("reader3", cond)
    write("writer", cond)

    time.Sleep(time.Second * 3)
}
  • done 即互斥锁需要保护的条件变量。

  • read() 调用 Wait() 等待通知,直到 done 为 true。

  • write() 接收数据,接收完成后,将 done 置为 true,调用 Broadcast() 通知所有等待的协程。

  • write() 中的暂停了 1s,一方面是模拟耗时,另一方面是确保前面的 3 个 read 协程都执行到 Wait(),处于等待状态。main 函数最后暂停了 3s,确保所有操作执行完毕。

writer 接收数据花费了 1s,同步通知所有等待的协程。

相关文章
最新推荐