0%

Golang Worker Pool練習-1

優化Worker Pool的奇技淫巧

圖片來源https://itnext.io/explain-to-me-go-concurrency-worker-pool-pattern-like-im-five-e5f1be71

題目說明

設計一個隨機生成亂數,計算亂數每一位數字總和(e.g. 69 => 6+9=15)。並請開啟一個Goroutine生成亂數,24個Goroutine計算亂數每一位數字總和。Bonus 五秒後關閉所有Goroutine。

詳見題目來源

優化結果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package main

import (
"fmt"
"time"
"math/rand"
"context"
"sync"
"runtime"
)

type Job struct {
Num int64
}

type ResJob struct {
JobPtr *Job
Sum int64
}

func Generate(ctx context.Context, jobChan chan<- *Job) {
for {
select {
case <-ctx.Done():
fmt.Println("Genereate done")
return
default:
num := rand.Int63()
job := &Job{Num: num}
jobChan <- job
}
}
}

func Calculate(ctx context.Context, jobChan <-chan *Job, resChan chan<- *ResJob) {
for {
select {
case <-ctx.Done():
fmt.Println("Calculate done")
return
default:
job := <-jobChan
n := job.Num
sum := int64(0)
for n > 0 {
sum += n % 10
n = n / 10
}

res := &ResJob{
JobPtr: job,
Sum: sum,
}
resChan <- res
}
}
}

func main() {
defer fmt.Println("NumGoroutine: ", runtime.NumGoroutine())

duration := time.Now().Add(time.Second * 5)
ctx, cancel := context.WithDeadline(context.Background(), duration)
defer cancel()

var jobChan = make(chan *Job, 100)
var resChan = make(chan *ResJob, 100)
var wg sync.WaitGroup

go Generate(ctx, jobChan)

numCalculators := 24
wg.Add(numCalculators)
for i := 0; i < numCalculators; i++ {
go func() {
defer wg.Done()
Calculate(ctx, jobChan, resChan)
}()
}

go func() {
wg.Wait()
close(resChan)
}()

for {
select {
case <-ctx.Done():
fmt.Println("Context done")
for {
_, ok := <-resChan
if !ok {
fmt.Println("ResChan closed")
break
}
}

return
case job := <-resChan:
fmt.Printf("%v's the result is: %v\n", job.JobPtr.Num, job.Sum)
}
}
}

優化思路

  1. 使用context上下文控制goroutine狀態
    題目bonus思路規定5秒後關閉。
1
2
3
duration := time.Now().Add(time.Second * 5)
ctx, cancel := context.WithDeadline(context.Background(), duration)
defer cancel()
  1. 使用sync.WaitGroup確保所有goroutine退出前完成所有工作
  • 加入close(resChan)可以優化以下項目:
    • 節省資源:關閉通道可以釋放相關資源。當不再需要使用通道時,關閉它可以讓垃圾回收機制回收相關資源。
    • 通知接收者:確保不會再有resJob被送入resChan。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
numCalculators := 24
wg.Add(numCalculators)

for i := 0; i < numCalculators; i++ {
go func() {
defer wg.Done()
Calculate(ctx, jobChan, resChan)
}()
}

go func() {
wg.Wait()
close(resChan)
}()
  1. 接收resChan加入select機制判斷
    如若使用原先for job := range resChan 會造成邊關閉goroutine同時還在透由channel獲取資料,有概率會deadlock。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
for {
select {
case <-ctx.Done():
fmt.Println("Context done")
for {
_, ok := <-resChan
if !ok {
fmt.Println("ResChan closed")
break
}
}

return
case job := <-resChan:
fmt.Printf("%v's the result is: %v\n", job.JobPtr.Num, job.Sum)
}
}
  1. 兩個goroutine func加入select及context
    新增select判讀context是否傳遞關閉訊息並跳出迴圈。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
select {
case <-ctx.Done():
fmt.Println("Calculate done")
return
default:
job := <-jobChan
n := job.num
sum := int64(0)
for n > 0 {
sum += n % 10
n = n / 10
}

res := &ResJob{
JobPtr: job,
Sum: sum,
}
resChan <- res
}

心得

身為一位Gopher Goroutine及Channel是必須非常熟悉的項目,透由練習優化Worker Pool可以加深自己對Golang的認識。參考的題目是沒有設定停止機制,無限循環生成亂數及計算每位數字總和;這邊優化方向為增加停止機制並使所有Goroutine能正常關閉。可以讓剛入門GO語言的朋友參考。

  • 版權聲明: 本網誌所有文章除特別聲明外,均採用 BY-NC-SA 許可協議。轉載請註明出處!