Go语言中的并发是什么

69次阅读
没有评论

共计 7575 个字符,预计需要花费 19 分钟才能阅读完成。

这篇文章主要介绍“Go 语言中的并发是什么”,在日常操作中,相信很多人在 Go 语言中的并发是什么问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Go 语言中的并发是什么”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!

Go 语言之并发

Go 语言直接支持内置支持并发。当一个函数创建为 goroutine 时,Go 会将其视为一个独立的工作单元。这个单元会被调度到可用的逻辑处理器上执行。

Go 语言运行时的调度器是一个复杂的软件, 这个调度器在操作系统之上。操作系统的线程与语言运行时的逻辑处理器绑定, 并在逻辑处理器上运行 goroutine。

Go 语言的并发同步逻辑来自一个叫做通信顺讯进程 (CSP) 的范型。CSP 是一种消息传递模型, 通过在 goroutine 之间传递数据来传递消息,而不是通过对数据进行加锁来实现同步访问。这种数据的类型叫做通道(channel)。

并发与并行

在操作系统中, 一个应用程序就可以看作一个进程, 而每个进程至少包含一个线程。每个进程的初始线程被称为主线程。

操作系统会在物理处理器 (CPU) 上调度线程来运行, 而 Go 语言会在逻辑处理器来调度 goroutine 来运行。1.5 版本之上,Go 语言的运行时默认会为每个可用的物理处理器分配一个逻辑处理器。1.5 之前, 默认给整个应用程序只分配一个逻辑处理器。

如下图, 在运行时把 goroutine 调度到逻辑处理器上运行,逻辑处理器绑定到唯一的操作系统线程。

当 goroutine 执行了一个阻塞的系统调用 (就是一个非纯 CPU 的任务) 时, 调度器会将这个线程与处理器分离, 并创建一个新线程来运行这个处理器上提供的服务。

语言运行默认限制每个程序最多创建 10000 个线程。

注意并发≠并行!并行需要至少 2 个逻辑处理器。

goroutine

以并发的形式分别显示大写和小写的英文字母

package main

import (

fmt

runtime

sync

func main() {

// 分配一个逻辑处理器给调度器使用

runtime.GOMAXPROCS(1)

// wg 用来等待程序完成

var wg sync.WaitGroup

// 计数器加 2,表示要等待两个 goroutine

wg.Add(2)

fmt.Println(Start!)

// 声明一个匿名函数, 并创建一个 goroutime

go func() {

// 通知 main 函数工作已经完成

defer wg.Done()

21: // 显示字母表 3 次

22: for count:=0; count count++ {

23: for char:= a char a +26;char++ {

24: fmt.Printf(%c , char)

25: }

26: }

27: }()

28: // 同上

29: go func() {

30: // 通知 main 函数工作已经完成

31: defer wg.Done()

32: // 显示字母表 3 次

33: for count:=0; count count++ {

34: for char:= A char A +26;char++ {

35: fmt.Printf(%c , char)

36: }

37: }

38: }()

39: // 等待 goroutine 结束

40: fmt.Println(Waiting!)

41: wg.Wait()

42: fmt.Println(\nFinish!)

43: }

运行结果后,可以看到先输出的是所有的大写字母, 最后才是小写字母。是因为第一个 goroutine 完成所有显示需要花时间太短了,以至于在调度器切换到第二个 goroutine 之前,就完成了所有任务。

调度器为了防止某个 goroutine 长时间占用逻辑处理器,会停止当前正运行的 goroutine,运行其他可运行的 goroutine 运行的机会。

创建两个相同的长时间才能完成其工作的 goroutine 就可以看到,比如说显示 5000 以内的素数值。

代码结构如下

1: go printPrime(A)

2: go printPrime(B)

3:

4: func printPrime(prefix string) {

5: …

6: }

结果类似

1: B:2

2: B:3

3: …

4: B:4591

5: A:3

6: A:5

7: …

8: A:4561

9: A:4567

10: B:4603

11: B:4621

12: …

13: // Completed B

14: A:4457

15: …

16: // Completed A

如何修改逻辑处理器的数量

1: runtime.GOMAXPROCS(runtime.NUMCPU())

稍微改动下上面的代码,结果就会大不同

1: package main

2:

3: import (

4: fmt

5: runtime

6: sync

7:

8:

9: func main() {

10: // 分配两个逻辑处理器给调度器使用

11: runtime.GOMAXPROCS(2)

12: // wg 用来等待程序完成

13: var wg sync.WaitGroup

14: // 计数器加 2,表示要等待两个 goroutine

15: wg.Add(2)

16: fmt.Println(Start!)

17: // 声明一个匿名函数, 并创建一个 goroutime

18: go func() {

19: // 通知 main 函数工作已经完成

20: defer wg.Done()

21: // 显示字母表 3 次

22: for count:=0; count count++ {

23: for char:= a char a +26;char++ {

24: fmt.Printf(%c , char)

25: }

26: }

27: }()

28: // 同上

29: go func() {

30: // 通知 main 函数工作已经完成

31: defer wg.Done()

32: // 显示字母表 3 次

33: for count:=0; count count++ {

34: for char:= A char A +26;char++ {

35: fmt.Printf(%c , char)

36: }

37: }

38: }()

39: // 等待 goroutine 结束

40: fmt.Println(Waiting!)

41: wg.Wait()

42: fmt.Println(\nFinish!)

43: }

结果类似下面的(根据 CPU 单核的性能结果可能结果稍微不一样)

1: Start!

2: Waiting!

3: a b c d e f g h i j k l m n o A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g

4: h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z

5: a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s

6: t u v w x y z M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X

7: Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q

8: R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z

9: Finish!

可以发现,goroutine 是并行运行的。

只有在有多个逻辑处理器且可以同时让每个 goroutine 运行在一个可用的物理处理器上的时候,goroutine 才会并行运行。

竞争状态

如果两个或者多个 goroutine 在没有互相同步的情况下,访问某个共享的资源,并且试图同时读和写这个资源, 就处于相互竞争的状态。

在竞争状态,每个 goroutine 都会覆盖另一个 goroutine 的工作。这种覆盖发生在 goroutine 发生切换的时候。

每个 goroutien 都会创造自己的共享变量副本。当切换到领另一个 goroutine 时,如果这个变量的值在上一个 goroutine 发生改变,这个 goroutine 再次运行时,虽然变量的值改变了,但是由于这个 goroutine 没有更新自己的那个副本的值,而是继续使用,并且将其存回变量的值,从而覆盖上一个 goroutine 的工作。

go build –race 用来竞争检测器标志来编译程序

锁住共享资源原子函数

原子函数能够以底层的枷锁机制来同步访问整型变量和指针。省略部分代码如下:

1: var counter int64

2: go incCounter(1)

3: go incCounter(2)

4: func incCounter(id int) {

5: for count:=0;count count++{

6: // 安全地对 counter 加 1

7: atomic.AddInt64(counter, 1)

8: // 当前 goroutine 从线程退出,并放回队列

9: runtime.Gosched()

10: }

11: }

使用 atmoi 包的 AddInt64 函数。这些 goroutine 都会自动根据所引用的变量做同步处理。

另外两个原子函数是 LoadInt64 和 StoreInt64。用法如下:

1: // shutdown 是通知正在执行的 goroutine 停止工作的标志

2: var shutdown int64

3: var wg sync.WaitGroup

4: // 该停止工作了, 安全地设置 shutdown 标志

5: atomic.StoreInt64(shutdown, 1)

6: // 等待 goroutine 结束

7: wg.Wait()

8: // 检测是否停止工作,如果 shutdown== 1 那么 goroutine 就会终止

9: if atomic.LoadInt64(shutdown) == 1 {

10: break

11: }

12:

互斥锁

另一种同步访问共享资源的方式是互斥锁。主要代码如下:

1: var (

2: // counter 是所有 goroutine 都要增加其值的变量

3: counter int

4: wg sync.WaitGroup

5: // mutex 用来定义一段代码临界区

6: mutex sync.Mutex

7: )

8: func main…

9: // 业务代码

10: func incCounter(id int) {

11: defer wg.Done()

12: for i:=0;i i++ {

13: // 同一时期只允许一个 goroutine 进入

14: mutex.Lock()

15: // 大括号并不是必须的

16: {

17: // 捕获 counter 的值

18: value := counter

19: // 当前 goroutine 从线程退出, 并返回到队列

20: runtime.Gosched()

21: // 增加本地 value 变量的值

22: value++

23: // 将该值保存回 counter

24: counter = value

25: }

26: // 释放锁, 允许其他正在等待的 goroutine

27: mutex.Unlock()

28: }

29: }

通道

通道在 goroutine 之间架起了一个管道, 并提供了确保同步交换数据的机制。声明通道时,需要指定将要被共享的数据的类型。

可以通过通道共享内置类型,命名类型,结构类型和引用类型的值或者指针。

go 语言需要使用 make 来创建一个通道,chan 是关键字:

1: // 无缓冲的整型通道

2: unbuffered := make(chan int)

3: // 有缓冲的字符串通道

4: buffered := make(chan string, 10)

向通道发送值

1: buffered := make(chan string, 10)

2: // 通通道发送一个字符串

3: buffered – Gopher

4: // 从通道接收一个字符串

5: value := -buffered

无缓冲的通道是指在接收前没有能力保存任何值的通道。发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。如果没有准备好, 通道会导致 goroutine 阻塞等待。所以无缓冲通道保证了 goroutine 之间同一时间进行数据交换。

1: // 四个 goroutine 间的接力比赛

2: package main

3:

4: import (

5: fmt

6: sync

7: time

8: )

9

10: var wg sync.WaitGroup

11:

12: func main()  {

13: // 创建一个无缓冲的通道

14: baton := make(chan int)

15: wg.Add(1)

16: // 第一步跑步者持有接力棒

17: go Runner(baton)

18: // 开始比赛

19: baon – 1

20: // 等待比赛结束

21: wg.Wait()

22: }

23:

24: // Ruuner 模拟接力比赛中的一位跑步者

25: func Runner(baton chan int) {

26: var newRunner int

27: // 等待接力棒

28: runner := -baton

29: // 开始跑步

30: fmt.Printf(运动员 %d 带着 Baton 跑 \n , runner)

31: // 创建下一步跑步者

32: if runner != 4{

33: newRunner = runner + 1

34: fmt.Printf(运动员 %d 上线 \n , newRunner)

35: go Runner(baton)

36: }

37: // 围绕跑到跑

38: time.Sleep(100 * time.Millisecond)

39: // 比赛结束了吗?

40: if runner == 4{

41: fmt.Printf(运动员 %d 完成, 比赛结束 \n , runner)

42: wg.Done()

43: return

44: }

45: // 将接力棒交给下一位跑步者

46: fmt.Printf(运动员 %d 与运动员 %d 交换 \n , runner, newRunner)

47: baton – newRunner

48: }

结果:

1: 运动员 1 带着 Baton 跑

2: 运动员 2 上线

3: 运动员 1 与运动员 2 交换

4: 运动员 2 带着 Baton 跑

5: 运动员 3 上线

6: 运动员 2 与运动员 3 交换

7: 运动员 3 带着 Baton 跑

8: 运动员 4 上线

9: 运动员 3 与运动员 4 交换

10: 运动员 4 带着 Baton 跑

11: 运动员 4 完成, 比赛结束

有缓冲的通道则能在接收前能存储一个或者多个值的通道。这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。只有在通道没有可用缓冲区或者没有要接收的值时, 发送或者接收才会阻塞。

1: package main

2:

3: import (

4: fmt

5: math/rand

6: sync

7: time

8: )

9:

10: const (

11: // goroutine 的数量

12: numberGoroutines = 4

13: // 工作的数量

14: taskLoad = 10

15: )

16:

17: var wg sync.WaitGroup

18:

19: // 初始化随机数种子

20: func init() {

21: rand.Seed(time.Now().Unix())

22: }

23: func main() {

24: // 创建一个有缓冲的通道来管理工作

25: tasks := make(chan string, taskLoad)

26: wg.Add(numberGoroutines)

27: // 增加一组要完成的工作

28: for post:=1;post taskLoad;post++ {

29: tasks – fmt.Sprintf(Task:%d , post)

30: }

31: // 启动 goroutine 来处理工作

32: for i:=1;i numberGoroutines+1;i++ {

33: go worker(tasks, i)

34: }

35: // 有工作处理完时关闭通道

36: closetasks)

37:

38: wg.Wait()

39: fmt.Printf(all finished!)

40:

41: }

42:

43: func worker(tasks chan string, worker_id int) {

44: defer wg.Done()

45:

46: for {

47: // 等待分配工作

48: task, ok := -tasks

49: if !ok {

50: // 通道变空

51: fmt.Printf(Worker%d shut down\n , worker_id)

52: return

53: }

54: // 开始工作

55: fmt.Printf(Worker%d start %s\n , worker_id, task)

56:

57: // 随机等待一段时间

58: sleep := rand.Int63n(100)

59: time.Sleep(time.Duration(sleep)*time.Millisecond)

60: // 显示完成了工作

61: fmt.Printf(Worker%d Completed %s\n , worker_id, task)

62: }

63:

输出结果:

1: Worker4 start Task:1

2: Worker1 start Task:2

3: Worker2 start Task:3

4: Worker3 start Task:4

5: Worker3 Completed Task:4

6: Worker3 start Task:5

7: Worker4 Completed Task:1

8: Worker4 start Task:6

9: Worker2 Completed Task:3

10: Worker2 start Task:7

11: Worker3 Completed Task:5

12: Worker3 start Task:8

13: Worker Completed Task:7

14: Worker2 start Task:9

15: Worker3 Completed Task:8

16: Worker3 shut down

17: Worker4 Completed Task:6

18: Worker4 shut down

19: Worker1 Completed Task:2

20: Worker1 shut down

21: Worker2 Completed Task:9

22: Worker2 shut down

23: all inished!

由于程序和 Go 语言的调度器有随机的成分,结果每次都会不一样。不过总流程不会大变。

当通道关闭后,goroutine 依旧从通道里的缓冲区获取数据,但是不能再向通道里发送数据。从一个已经关闭且没有数据的通道里获取数据,总会立刻返回,兵返回一个通道类型的零值。

到此,关于“Go 语言中的并发是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-01发表,共计7575字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)