Go并发编程之Goroutine和Channel(3)

Admin 2022-05-18 09:31:14 GoLang

一、前言概述

我们都知道软件运行的最小单位是进程,当一个应用程序启动时操作系统为其创建了一个进程,代码运行的最小单位是线程,我们写的代码片段在程序跑起来后一定是在一个线程中运行的,而这个线程是属于这个进程创建的。

我们常接触到的并发模型是多线程并发模型,而 Go 语言中的并发模型是 CSP 并发模型,简单介绍这两种并发模型:

1、多线程并发模型

多线程并发模型是在一个应用程序中同时存在多个执行流,这多个执行流通过内存共享、信号量、锁等方式进行通信,CPU 在多个线程间进行上下文切换,从而达到并发执行,提高 CPU 利用率,其本质是内核态线程和用户态线程是一对一的关系。

2、CSP并发模型

CSP 并发模型是将程序的执行和通信划分开来(Process 和 Channel),Process 代表了执行任务的一个单元,Channel 用来在多个单元之间进行数据交互共享,Process 内部之间没有并发问题,所有由通信带来的并发问题都被压缩在 Channel 中,使得聚合在一起得到了约束、同步、竞争聚焦在 Channel 上,Go 就是基于这种并发模型的。Go 在线程的基础上实现了一套并发模型(MPG),线程之上虚拟出了协程的概念,一个协程代表一个 Process, 但在操作系统级别调度的基本单位依然是线程,只是 Go 自己实现了一个调度器,用来管理协程的调度,M(Machine)代表一个内核线程,P(Process)代表一个调度器,G (Goroutine) 代表一个协程,其本质是内核线程和用户态线程成了多对多的关系。

二、Goroutine和Channel

Goroutine 单纯地将函数并发执行意义并不大,函数与函数间需要交换数据才能体现并发执行函数的意义。

虽然可以使用共享内存进行数据交换,但是共享内存在不同的Goroutine中容易发生竞态问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。

Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。

如果说Goroutine是Go程序并发的执行体,Channel就是它们之间的连接。Channel是可以让一个Goroutine发送特定值到另一个Goroutine的通信机制。

Go语言中的通道(Channel)是一种特殊的类型。通道是一个传送带或者队列,总是遵循先入先出(First In First Out, FIFO)的原则,保证收发数据的顺序。每一个通道都是一个具体类型的管道,也就是声明Channel的时候需要为其指定元素类型。

package main

import "fmt"

func main()  {
    // 通道是引用类型,空值是nil
    // 声明通道后需要使用make函数初始化后才能使用
    var ch chan int

    // make(chan 元素类型, [缓冲大小]),缓冲大小是可选的
    ch = make(chan int)

    fmt.Printf("v:%v type:%T\n", ch, ch)
}

三、Channel操作

Channel有发送(send)、接收(receive)和关闭(close)三种操作,发送和接收都使用 <-(操作符)。

发送:将一个值发送到通道中,如:ch <- 10

接收:从一个通道中接收值,如:

a := <- ch 从ch中接收值,并赋值给a

<- ch 从ch中接收值,忽略结果

a,ok := <- ch  带状态接收值(如果ch关闭ok返回false)

关闭:关闭通道close(ch)

  1. 注意:

    1. 只有在通知接收方Goroutine所有的数据都发送完毕的时候,才需要关闭通道。

    2. 通道是可被垃圾回收机制回收的,与关闭文件不一样文件操作结束后必须要关闭的,但通道不是必须关闭的。

  2. 关闭后的通道有以下特点:

    1. 对一个关闭的通道再发送值会导致panic。

    2. 对一个关闭的通道进行接收值,会一直获取值直到通道为空。

    3. 对一个关闭的并且没有值得通道执行接收操作,会得到对应类型的零值。

    4. 关闭一个已经关闭的通道会导致panic。

四、无缓冲Channel

无缓冲Channel称为阻塞通道,无缓冲通道必须在发送数据的同时有人接收值,否则会阻塞在那里,直到报错(只发送值不接收值的时候会出现deadlock错误,即发送和接收动作是同时发生的)。

package main

import "fmt"

func main(){
    var wg sync.WaitGroup
    
    wg.Add(1)
    go func(){
        defer wg.Done()
        fmt.Println("协程")
    }()
    fmt.Println("主协程")
    
    // 等待新的协程运行完毕,程序在退出
    wg.Wait()
}
package main

import "fmt"

func main()  {
    ch := make(chan int)

    go func(c chan int) {
        for {
            // 读取
            i := <- ch
            fmt.Println("read ch i is:", i)
        }
    }(ch)

    // 写入
    ch <- 10

    fmt.Printf("v:%v type:%T\n", ch, ch)
}

无缓冲通道上的发送操作会阻塞,直到有一个Goroutine在该通道上执行接收操作,这时才能发送成功,两个Goroutine将继续执行。

如果接收操作限制性,接收方的Goroutine将会阻塞,直到另一个Goroutine在该通道上发送一个值。

使用无缓冲通道进行通信,将会导致发送和接收的Goroutine同步化。因此无缓冲通道也被称为同步通道。

五、有缓冲的Channel

使用make函数初始化通道的时候为其制定通道的容量,只要通道的容量大于零,就是有缓冲的通道,通道的容量表示通道中能存放的元素的数量(PS:缓冲 channel 类似一个队列,只有队列满了才可能发送阻塞)。

可以使用len()获取通道内元素的数量,使用cap函数获取通道的容量。

package main

import (
    "fmt"
)

func main() {
    ch := make(chan int, 1) 

    ch <- 10

    fmt.Println("len(ch):", len(ch))
    fmt.Println("cap(ch)", cap(ch))
}

六、关闭Channel

当向通道中发送完数据时,可以使用close函数关闭通道。

当通道被关闭后,再往通道发送值会引发panic,从通道里接收值一直都是类型的空值。

如何判断一个通道是否被关闭了?

方法一:

    i, ok := <- ch 通道关闭后ok=false

方法二:

    for range遍历通道,通道被关闭时就会退出for range

package main

import "fmt"

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)

    go func() {
        for i:=0; i<10; i++ {
            ch2 <- i
        }
        close(ch1)
    }()

    
    go func() {
        for{
            i, ok := <- ch1
            if !ok {
                break
            }
            ch2 <- i*i
        }
        close(ch2)
    }()


    // 通道关闭后退出for range循环
    for j := range ch2 {
        fmt.Println(j)
    }
}

七、单向通道

有时候我们会将通道作为参数在多个函数间传递,很多时候我们在不同的函数中使用通道都会对其进行限制,如限制通道在函数中只能发送或只能接收。

chan <- int:一个只写单向通道(只能写入int类型值到通道),不能执行接收操作。

<- chan int:一个只读单向通道(只能从通道读取int类型值),不能执行发送操作。

在函数传参及任何赋值操作中,可以将双向通道转换为单向通道,但反过来是不可以的。

package main

import "fmt"

func add(out chan <- int)  {
    for i:=0; i<10; i++ {
        out <- i
    }
    close(out)
}

func squarer(out chan <- int, in <- chan int)  {
    for i:= range in {
        out <- i*i
    }
    close(out)
}

func print(in <- chan int)  {
    for i:= range in {
        fmt.Println(i)
    }
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)

    go add(ch1)
    go squarer(ch2, ch1)

    print(ch2)
}

八、异常总结

Channelnil非空没满
接收阻塞
接收值阻塞接收值接收值
发送阻塞发送值发送值阻塞发送值
关闭panic返回零值返回零值返回零值返回零值


相关文章
最新推荐