再谈Go的常用并发模型

脱离业务领域谈设计模型都是扯淡,本文旨在抽象几种常用模型,希望在设计业务模型时提供参考

前言

转眼已经快写了三年的Go了,因为Go的本身设计的简洁,在平常基本用不到很多设计模式的思想,更别谈并发模型了,看过很多代码,基本都是channel/condition 乱飞,没有文档或者注释辅助的情况下,review/refator 代码简直是噩梦。 为了不让自己成为自己讨厌的人,我决定自己整理下Go常用的并发模式,也正好听Go Time的播客的时候,听JBD提到2016 GopherCon上的「Visualize Go Routine」的topic,又去拜读了相关文章,觉得是时候沉淀下这块知识了。

常用并发模型

引子: Timer

package main

import "time"

func timer(d time.Duration) <-chan int {
    c := make(chan int)
    go func() {
        time.Sleep(d)
        c <- 1
    }()
    return c
}

func main() {
    // 开启N个定时器,但是要注意这N个定时器并不是并发执行的。
    for i := 0; i < N; i++ {
        c := timer(1 * time.Second)
        <-c
    }
}

这是一个很简单的并发模型的引子,目的是构造一个定时器,或者构造一个延时任务

(为什么这边不用timer.Ticker ? 因为作为一个引子,这个模型会引导我们的思想尽量往并发模型上靠拢。)

Ping Pong

让数据在Go Routine间反复横跳 !

package main

import (
	"fmt"
	"time"
)

type Ball struct {
	hits int
}

func main() {

	table := make(chan *Ball)
	go player(1,table)
	go player(2,table)

	// 比赛开始
	table <- &Ball{}
	// 开始一场1s的比赛吧!
	time.Sleep(1 * time.Second)
	// 比赛结束
	fmt.Printf("total hits: %d\n", (<-table).hits)
}

func player(pnumber int,table chan *Ball) {
	for {
        ball := <-table
        fmt.Printf("player %d start to hit the ball",pnumber)
		ball.hits++
		// ping or pong
		time.Sleep(100 * time.Millisecond)
		table <- ball
	}
}

在上面代码实现中,我们假设有两个玩家,每场比赛1s,玩家击球时间100ms。

这个是有名的乒乓模型。在这个模型中,球(数据)会在N个玩家(Go Routine所持有的channel)中传递,直到到达指定时间,球(数据)被回收。

这个模式还有一个tips可以用来扩展:因为Go Runtime维护了一个channel的Receiver Queue(队列,具有FIFO的特性),所以导致总是最后一个Go Routine所在的channel可以先得到数据,并且确保了每次都可以固定顺序传递,所以不会导致同一个channel执行多次的情况。也即:按照代码的定义顺序,我们有chan recv1,chan recv2,chan recv3,那么receiver拿到数据的顺序一定是3->2->1->3.

Fan-In

从多个数据源收集数据并把这些数据整合到同一个channel,也就是All for one 模式 (果然这么叫好奇怪

package main

import (
    "fmt"
    "time"
)

func producer(ch chan int, d time.Duration) {
    var i int
    for {
        ch <- i
        i++
        time.Sleep(d)
    }
}

func reader(out chan int) {
    for x := range out {
        fmt.Println(x)
    }
}

func main() {
    // producer的集合channel
    ch := make(chan int)
    // reader channel
    out := make(chan int)
    go producer(ch, 100*time.Millisecond)
    go producer(ch, 250*time.Millisecond)
    go reader(out)
    for i := range ch {
        out <- i
    }
}

Fan-Out

从一个channel中读取数据,分配到多个channel里面进行处理,也就是one for all 模式。

也可以说是 workerpool 模式。

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(tasksCh <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        task, ok := <-tasksCh
        if !ok {
            return
        }
        d := time.Duration(task) * time.Millisecond
        time.Sleep(d)
        fmt.Println("processing task", task)
    }
}

func pool(wg *sync.WaitGroup, workers, tasks int) {
    tasksCh := make(chan int)

    for i := 0; i < workers; i++ {
        // N个worker
        go worker(tasksCh, wg)
    }

    for i := 0; i < tasks; i++ {
        // 把task放入task chan
        tasksCh <- i
    }

    close(tasksCh)
}

func main() {
    var wg sync.WaitGroup
    wg.Add(36)
    go pool(&wg, 36, 50)
    wg.Wait()
}

当然,这种方式还可以扩展成为 subworker 模式,由worker的channel再把task 分发出去。(当然还能是sub subworker 啥的

Daisy Chain

refer: https://talks.golang.org/2012/concurrency.slide#39

//  Go Routine 一号
func f(left, right chan int) {
    left <- 1 + <-right
}

func main() {
    const n = 10000
    leftmost := make(chan int)
    // right 是`leftmost`的一个引用,他们指向同一个内存地址
    right := leftmost
    // left 也是`leftmost`的一个引用
    left := leftmost
    for i := 0; i < n; i++ {
        // 为什么在这里要重新定义`right` ?
        // 如果这里不重新定义`right`(把`right`指向新的地址),在「Go Routine 二号」这里就会直接往这个channel塞1了
        // 然后因为right和leftmost都指向同一个地址,最后就直接输出1退出了,甚至都没「Go Routine 一号」什么事了
        // 这里还需要注意,Go Routine的调用栈是FILO(先入后出)的
        right = make(chan int)
        // 调用「Go Routine 一号」,目的是使`<-left`增1
        // 这里特别需要注意index == 0 的情况 ,此时,`left`是指向`leftmost`,也只有此时,`left`是指向`leftmost`的,
        // 这也保证了,「Go Routine 一号」最后一次的出栈可以让值正确地赋到`leftmost`上.
        go f(left, right)
        // 把`right`赋值给`left`,这里非常关键,这里除了代码表面上的保留`right`之前的状态,还覆盖了`left`的地址,
        // 让其之后不再指向`leftmost`的地址,而是指向`right`的地址,以形成关键的`daisy chain`,
        // daisy chain(展开): left(应该是leftmost) <-1 + ... + 1 + 1 + (1 + <- right .)
        left = right
    }

    //  Go Routine 二号
    // 这个 匿名Go Routine 必须放在for循环之后,确保传入这个go routine的是最后一个`right`, 原因可以结合上面的展开daisy chain 看,那么如果把这个匿名 go routine放在循环之前会发生什么呢?
    go func(c chan int) { c <- 1 }(right)
    fmt.Println(<-leftmost)
}

这段代码很有意思,也非常巧妙,在SOF上也有相关讨论, 任何一个地方动一下,可能最后的结果都可能不一样。

使用场景: Daisy Chain的模式,在编写链式workerflow时显得非常高效,假设一个job需要被多个handler顺序处理,那么这种模式在有非常多(成千上万)个handler时,会有奇效。

这里有人也用Daisy Chain实现了一个类似workpool的东西,对这种模式实际应用场景感兴趣的同学也可以参考下。

总结

并发模式不是银弹,如果串行的模式能花更少的代价造出符合当前场景的模型,那么, 何尝又不是不可呢?

先就酱,这又是一个长期坑, To Be Continued…