5.ゴルーチン - iruma-tea/go_programming GitHub Wiki
並行処理のプログラムを簡単に書けることもGoの特徴の一つです。
このLessonでは、Goの並列処理で重要なゴルーチンについて、基本的な例について説明。
並行処理のプログラムについて学ぶ。
ゴルーチンは、Goにおいて並行処理を行うための軽量のスレッドです。
並行処理のプログラムは、他の言語ではマルチプロセスやマルチスレッド、イベント駆動などとも呼ばれる。
Goでは並行処理を暗黙的に実行してくれるので、他言語のように深い知識を必要とせず、並行処理のプログラムが書ける特徴がある。
ゴルーチンの作り方、sync.WaitGroupを使った並行処理の制御も合わせて説明。
- goroutineで並行処理を行うコードを書いていきましよう。string型の引数sを持つ。normal関数を作る。
- normal関数をコピーして関数名をgoroutineにして同じ処理の関数を作成する。
- 呼び出す関数の前にgoをつけると、並行処理実行される。
- このように、goを前に書くだけで並行処理が実行できるのがゴルーチンです。
func normal(s string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}
func goroutine(s string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}
func main() {
go goroutine("world")
normal("hello")
}
// 実行結果
// world
// hello
// hello
// world
// world
// hello
// hello
// world
// hello
プログラムが途中で終了することを避けるには、sync.WaitGroupを使う。
- main関数で「var wg sync.WaiteGroup」と宣言し、「wg.Add(1)」と書いて並行処理が1つあるということを伝える。
- goroutine関数の引数に「wg *sync.WaiteGroup」と書いて並行処理と書いてwgのアドレスを渡す。
- goroutine関数の最後に「wg.Done()」と書く
- main関数内に「wg.Wait()」とかくと、「wg.Done()」が実行されるまで待機する。
- 実行するとmain関数でtime.Sleepをしなくても、「hello」と「world」がすべて表示される。
func goroutine(s string, wg *sync.WaitGroup) {
for i := 0; i < 5; i++ {
// time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
wg.Done()
}
func normal(s string) {
for i := 0; i < 5; i++ {
// time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}
func main() {
var wg sync.WaitGroup
wg.Add(1)
go goroutine("world"), &wg)
normal("hello")
wg.Wait()
}
// 実行結果
// hello
// world
// world
// hello
// world
// hello
// hello
// world
// world
// hello
WaitGroupの注意点
- 「wg.Done()」コメントアウトして実行すると途中で止まってエラーとなる。
- main関数の「wg.Add(1)」で並行処理を1つ待っているのに、「wg.Done()」が呼ばれないというエラーとなる。
- 「wg.Add(1)」 をも一つ書き加え実行すると、「wg.Add(1)」を二つ待っている状態で、「wg.Done()」を一つしか呼んでいないのでエラーとなる。
- 「wg.Add(1)」 をしたら、「wg.Done()」を忘れずに実行する必要がある「wg.Done()」の実行を忘れないように、できればdeferを使って、先に書いておく。
func goroutine(s string, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 5; i++ {
fmt.Println(s)
}
}
func main() {
var wg sync.WaitGroup
wg.Add(1)
go goroutine("world", &wg)
normal("hello")
wg.Wait()
}
並行で実行したゴルーチンと値の受け渡しをしたい場合、関数やメソッドのようにreturnで値を返すという方法でやりとりすることができない
ゴルーチンの間では、代わりにChannel(チャネル)を使ってデータを受け渡す。
ゴルーチン間のデータのやり取りに用いるChannelについて説明。
なお、main関数もゴルーチンによって動いている。main関数を動かすゴルーチンをメインゴルーチンという。
-
チャネルを使ったデータのやり取り
- main関数でint型のスライスを作る
-
make関数でチャネルを作る
- 「c := make(chan int)」とすると、int型のチャネル(chan)が変数cに代入される。
- 作ったチャネルをゴルーチンの引数に渡す。
- チャネルでゴルーチンから値を受け取って変数xに格納するため、「**<-演算子」を使って「x := <-c」とかく
func goroutine1(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
c <- sum
}
func main() {
s := []int{1, 2, 3, 4, 5}
c := make(chan int)
go goroutine1(s, c)
x := <-c
fmt.Println(x)
}
// 実行結果
// 15
-
複数のゴルーチンとのチャネル
- 追加で関数を作成し、main関数で受信するデータを1つ増やしてみる
- 追加する関数は分かりやすいように、goroutine1関数と同じにし、関数名だけgroutine2と書き換える。
- goroutine1関数からは「x := <-c」、goroutine2関数からは「y := <-c」でそれぞれデータを受信するため、チャネルはキューのような先入先出(FIFO)のデータ構造を持つとわかりやすい
func goroutine1(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
c <- sum
}
func goroutine2(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
c <- sum
}
func main() {
s := []int{1, 2, 3, 4, 5}
c := make(chan int)
go goroutine1(s, c)
go goroutine2(s, c)
x := <-c
fmt.Println(x)
y := <-c
fmt.Println(y)
}
// 実行結果
// 15
// 15
-
同じ関数のゴルーチンを複数回呼び出す際のチャネル
- goroutine1関数のゴルーチンを複数実行し、それぞれの値をチャネルで受け取ってみましょう
- このようなコードは、1つの処理を何回か実行したいときなどに使う。
func goroutine1(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
c <- sum
}
func main() {
s := []int{1, 2, 3, 4, 5}
c := make(chan int)
go goroutine1(s, c)
go goroutine1(s, c)
x := <-c
fmt.Println(x)
y := <-c
fmt.Println(y)
}
// 実行結果
// 15
// 15
-
unbuffered channel(バッファなしチャネル)
- バッファ(チャネルに入る値の数)を指定せずに作ったチャネル
-
buffered channel(バッファありチャネル)
- バッファを指定して作ったチャネル
func main() {
ch := make(chan int, 2)
ch <- 100
fmt.Println(len(ch))
ch <- 200
fmt.Println(len(ch))
}
// 実行結果
// 1
// 2
- バッファが2つのチャネルに、3つめの値を入れるとエラーが発生する。
func main() {
ch := make(chan int, 2)
ch <- 100
fmt.Println(len(ch))
ch <- 200
fmt.Println(len(ch))
ch <- 300
fmt.Println(len(ch))
}
// 実行結果
// 1
// 2
// fatal error: all goroutines are asleep - deadlock!
//
// goroutine 1 [chan send]:
// main.main()
- エラーを回避するために、「ch <- 300」でチャネルを追加する前に、「x := <-ch」で受信して値を取り出す。
func main() {
ch := make(chan int, 2)
ch <- 100
fmt.Println(len(ch))
ch <- 200
fmt.Println(len(ch))
x := <-ch
fmt.Println(x)
fmt.Println(len(ch))
ch <-300
fmt.Println(len(ch))
}
// 実行結果
// 1
// 2
// 100
// 1
// 2
- goroutine1関数におけるチャネルへの送信処理「c <- sum」をfor文の中に書き、ループごとにチャネルに値を送信する。
- main関数ではgo goroutine1より、「for i := range c {fmt.Println(i)}」と書く
- ただしrangeはチャネルから値が送られてくるのを待ち続けるので、チャネルをこれ以上値を送信しないことを伝えるため、closeする必要がある。
func goroutine1(s []int, c chan int) {
sum := 0
for _, v := range s {
sum + v
c <- sum
}
close(c)
}
func main() {
s := []int{1, 2, 3, 4, 5}
c := make(chan int)
go goroutine1(s, c)
for i := range c {
fmt.Println(c)
}
}
// 実行結果
// 1
// 3
// 6
// 10
// 15
- バッファありチャネルからrangeで値を取り出す
func main() {
ch := make(chan int, 2)
ch <- 100
fmt.Println(len(ch))
ch <- 200
fmtPrintln(len(ch))
close(ch)
for c := range ch {
fmt.Println(c)
}
}
// 実行結果
// 1
// 2
// 100
// 200
プログラムにおいて、値を送信する関数などの処理Producer、受信する処理をConsumerと呼ぶことがある。
Goで複数のゴルーチンにProducerとConsumerの役割を持たせた場合の並行処理のプログラムについて説明。
ProducerとConsumerという2つの役割を持つゴルーチンを作る。
main関数からProducerのゴルーチンをいくつか並行で走らせ実行し、結果をチャネルに入れる。チャネルからConsumerのゴルーチンに渡してデータの処理を行う。
例えば、いろいろなサーバーからログの解析結果をProducer側で取得し、Consumer側に渡してログの処理や保存するようなアプリケーションなどのイメージです。
-
Producerの処理
- チャネルを通して値をconsumer関数のゴルーチンに送るProducer関数
- 今回は単純に引数の値にiに2を掛けるという処理をしてチャネルに送信する。
-
Consumerの処理
- producer関数のゴルーチンからチャネルをで受け取った値を処理するconsumer関数を作成する。
- チャネルの値をrangeで取り出して処理をする。ここでは、文字列の「process」にiに1000を掛けた値を表示する。
- 処理が終わったら、producer関数から渡された値の処理が終わったということを伝えるために、「wg.Done()」を実行する
-
main関数の処理
- sync.WaitGroupを宣言する。
- チャネルを「ch := make(chan int)」作る。
- producer関数をfor関数をfor文で10回呼び出す
- ループ中、consumerに値を渡すことをすべて渡せた確認するため、wg.Add(1)を書くを書く
- その後、ゴルーチンを呼び出してproducer関数を実行する。
- consumer関数の引数にチャネルとWaitGroupを渡して実行する
- その後、consumer関数がproducer関数から送られてくる値をすべて受け取るまで「wg.Wait()」で待つ。
- 最後に、「close(ch)」でチャネルを閉じる。
func producer(ch chan int, i int) {
ch <- i * 2
}
func consumer(ch chan int, wg *sync.WaitGroup) {
for i := range ch {
fmt.Println("process", i*1000)
wg.Done()
}
}
func main() {
var wg sync.WaitGroup
ch := make(chan int)
// Producer
for i := 0; i < 10; i++ {
wg.Add(1)
go producer(ch , i)
}
// Consumer
go consumer(ch, &wg)
wg.Wait()
close(ch)
}
並行処理のプログラムの作りは様々であるが、その中の一つpipleline(パイプライン)というものがある。
Goでは複数のチャネルを用意し、順番に処理していくような並行処理をpipelineという。
- main関数からゴルーチンを1つ立ち上げる
- 最初に立ち上げたゴルーチンが処理をする。
- 処理した結果をチャネルに渡して、次のゴルーチンで処理する。 これを繰り返して、最終結果をmain関数の渡す。
func producer(first chan int) {
defer close(first)
for i := 0; i < 10; i++ {
first <- i
}
}
func multi2(first chan int, second chan int) {
defer close(second)
for i := range first {
second <- i * 2
}
}
func multi4(second chan int, third chan int) {
defer close(third)
for i := range second {
third <- i * 4
}
}
func main() {
first := make(chan int)
second := make(chan int)
third := make(chan int)
go producer(first)
go muluti2(first, second)
go muluti4(second, third)
for result := range third {
fmt.Println(result)
}
}
// 実行結果
// 0
// 8
// 16
// 24
// 32
// 40
// 48
// 56
// 64
// 72
multi2の関数では、firstチャネルから受信して、secondチャネルに送信している、この時の引数を書くさいに、受信するチャネルの左側に、
送信するチャネルの右側に「<-」を書くことでチャネルの受信/送信を明示的に指定することができる。
func multi2(first <- chan int, second chan <- int) {
}
複数のチャネルを使って複数のゴルーチンとやりとりするとき、受信したチャネルによって処理を分岐したい場合がある。
その場合、selectを使うことで、チャネル毎の処理を書くことができる。
複数のゴルーチンがそれそれ別のチャネルでデータを受信する。
例えば、複数のゴルーチンからネットワークのパケットを受信するようなイメージ
このとき、それぞれの処理をブロッキングしないように、selectを使う。
func goroutine1(ch chan string) {
for {
ch <- "package from 1"
time.Sleep(1 * time.Second)
}
}
func goroutine2(ch chan string) {
for {
ch <- "package from 2"
time.Sleep(1 * time.Second)
}
}
func main() {
c1 := make(chan string)
c2 := make(chan string)
go goroutine1(c1)
go goroutine2(c2)
for {
select {
case msg1 := <-c1:
fmt.Println(msg1)
case msg2 := <-c2:
fmt.Println(msg2)
}
}
}
// 実行結果
// packet from 2
// packet from 1
// packet from 1
// packet from 2
// packet from 2
// packet from 1
// packet from 1
// packet from 2
// packet from 2
// packet from 1
// packet from 1
// packet from 2
// packet from 1
// packet from 2
// packet from 2
// packet from 1
selectではチャネルに応じてそれぞれの処理を作りました。
どのチャネルでもないときの処理はdefaultを使って書くことができる。
selectを使ったdefaultの使用例、for文とselect文を使うときにラベルを使って途中で処理を抜ける
方法について記載する。
selectでdefaultを使うと、どのチャネルでもないときに実行したい処理を書ける。
-
time.Tick
- 設定した時間ごとに周期的にチャネルへと値を送信する。
-
time.After
- 設定した時間が経過したタイミングで値を送信する。
import (
"fmt"
"time"
)
func main() {
tick := time.Tick(100 * time.Millisecond)
boom := time.After(500 * time.Millisecond)
for {
select {
case <-tick:
fmt.Println("tick.")
case <-boom:
fmt.Println("BOOM!")
return
default:
fmt.Println(" .")
time.Sleep(50 * time.Millisecond)
}
}
}
// 実行結果
// .
// .
// tick.
// .
// .
// tick.
// .
// .
// tick.
// .
// .
// tick.
// .
// .
// tick.
// BOOM!
例えば、for文の外に、「fmt.Println('##########')」と書いて実行しても表示は変わらない。
これは、returnの時点で処理が終了してしまう為である。
-
returnの代わりにbreak文を用いる
- selectの中でreturnの代わりにbreakを書いた場合、for文から抜けださず無限ループになってしますので注意。
- breakは最も内側のfor文、switch文、select文を抜ける
-
Boolean型の変数を判定に使う
- よくないプログラムの例とされるが、Boolean型の変数を使ってfor文を途中で抜けることができる。
-
ラベルを使用してfoorループを抜ける
- Goではラベルを使うことでループを抜けることができる。
import (
"fmt"
"time"
)
func main() {
tick := time.Tick(100 * time.Millisecond)
boom := time.After(500 * time.Millisecond)
OuterLoop:
for {
select {
case <-tick:
fmt.Println("tick.")
case <-boom:
fmt.Println("BOOM!")
break OuterLoop
default:
fmt.Println(" .")
time.Sleep(50 * time.Millisecond)
}
}
fmt.Println("##########")
}
// 実行結果
// .
// .
// tick
// .
// .
// tick
// .
// .
// tick
// .
// .
// tick
// .
// .
// BOOM!
// ##############
複数のゴルーチン間でのデータのやりとりにはチャネルを使ってきました。
マップなどの値の異なるゴルーチンから読み込んだり書き込んだりする場合、場合によっては同じタイミグで
読み書きされてしまうためエラーが起きる。そのため、同時に読み書きが起こらないようにプログラムを調整する
必要がある。ここではん、sync.Mutexを使って異なるゴルーチンから値を読み書きするための方法についてする。
-
sync.Mutex
- 2つのゴルーチンから、1つのマップを読み込んだり、書き換えたりしようとすると問題が起きるため、sync.Mutexを使う。
- 下記のサンプルの通りに、LockしてからUnlockするまでの間は、
- 1つのゴルーチンが値を書き込んでいるときは、もう一つのゴルーチンは書き込むことができない
- 値を読み込むときも同様にLockとUnlockをする。
package main
import (
"fmt"
"sync"
"time"
)
type Counter struct {
v map[string]int
mux sync.Mutex
}
func (c *Counter) Inc(key string) {
c.mux.Lock()
defer c.mux.Unlock()
c.v[key]++
}
func (c *Counter) Value(key string) int {
c.mux.Lock()
defer c.mux.Unlock()
return c.m[key]
}
func main() {
c := Counter{v: make(map[string]int)}
go func() {
for i := 0; i < 10; i++ {
c.Inc("Key")
}
}()
go func() {
for i := 0; i < 10; i++ {
c.Inc("Key")
}
}()
time.Sleep(1 * time.Second)
fmt.Println(c.v, c.Value("Key"))
}
// 実行結果
// map[Key:10 key:10] 10