本文译自Rob PikeGo语言PPT教程 – "The Go Programming Language Part3(updated June 2011)"。由于该教程的最新更新时间早于Go 1版本发布,因此该PPT中的一些内容与Go 1语言规范略有差异,到时我会在相应的地方做上注解。

第三部分大纲

  • 并发与通信
    • Goroutines
    • 通道(Channel)
    • 并发相关话题

并发与通信:Goroutines

Goroutines

术语:

对于"并发运行的事物"已经有了好多术语 – 进程、线程、协程(coroutine)、POSIX线程、NPTL线程、轻量级进程…,但这些事物都或多或少有不同。并且Go中的并发与哪种都不甚相同。

因此我们介绍一个新术语:goroutine。

定义

一个Goroutine是一个与其他goroutines运行在同一地址空间的Go函数或方法。一个运行的程序由一个或更多个goroutine组成。

它与线程、协程、进程等不同。它是一个goroutine。

注意:Concurrency与Parallelism是不同的概念。如果你不了解它们的不同,查查相关资料吧。

关于并发的问题有许多。我们后续会提及。现在就假设它能按其对外所宣称的那样正常工作吧。

启动一个Goroutine

调用一个函数或方法,然后说go:

func IsReady(what string, minutes int64) {
    time.Sleep(minutes * 60*1e9) // Unit is nanosecs.
    fmt.Println(what, "is ready")
}
go IsReady("tea", 6)
go IsReady("coffee", 2)
fmt.Println("I'm waiting…")

打印:

I'm waiting… (立即)
coffee is ready (2分钟后)
tea is ready (6分钟后)

一些简单的事实

goroutine的使用代价很低。

当从最外层函数返回,或执行到结尾处时,goroutine退出。

goroutines可以并行地在不同CPU上执行,共享内存。

你无需担心栈大小。

在gccgo中,至少目前goroutines就是pthreads。在6g中,goroutines采用基于线程的多路复用技术,因此它们的代价更低廉。

无论是上面哪个实现,栈都很小(几KB),可以根据需要增长。因此goroutines使用很少的内存。你可以创建很多goroutines,它们还可以动态拥有很大的栈。

程序员无需考虑栈大小相关话题。在Go中,这种考虑甚至不应该出现。

调度

Goroutine多路复用系统线程。当一个goroutine执行了一个阻塞的系统调用时,其他goroutine不会不阻塞。

计划后续实现CPU绑定的goroutines,不过目前用6g如果你想要用户层级别的并行,你必须设置环境变量GOMAXPROCS或调用runtime.GOMAXPROCS(n)。

GOMAXPROCS告诉运行时调度器有多少个用户空间goroutine即将同时执行,理想情况下在不同的CPU核上。

*gccgo总是为每个goroutine单独分配一个线程执行。

并发与通信:Channels

Go中的Channel

除非两个goroutine可以通信,否则它们无法协作。

Go中有一个名为channel的类型,提供通信和同步能力。

Go中还提供一些特殊的基于channel的控制结构,使得编写并发程序更加容易。

Channel类型

该类型最简单形式:
    chan elementType

通过这个类型的值,你可以发送和接收elementType类型的元素。

Channel是引用类型,这意味着如果你将一个chan变量赋值给另外一个,则这两个变量访问的是相同的channel。同样,这也意味着可以用make分配一个channel:

    var c = make(chan int)

通信操作符:<-

箭头指示数据流向。

作为一个二元操作符,<-将值从右侧发送到左侧的channel中:

c := make(chan int)
c <- 1 // 向c发送1

作为前缀一元操作符,<- 从一个channel中接收数据:

v = <-c // 从c中接收数据,赋值给v
<-c // 接收数据,丢弃
i := <-c // 接收值,用于初始化i

语义

默认情况下,通信是同步的。(我们后续将讨论异步通信)。这意味着:

1) A在一个channel上的发送操作会阻塞,直到该channel上有一个接收者就绪。
2) 在一个channel上到的接收操作会阻塞,直到该channel上有一个发送者就绪。

因此通信是同步的一种形式:两个通过channel交换数据的goroutine在通信的时刻同步。

让我们泵一些数据吧

func pump(ch chan int) {
    for i := 0; ; i++ { ch <- i }
}
ch1 := make(chan int)
go pump(ch1) // pump挂起; 我们运行
fmt.Println(<-ch1) // 打印 0

现在我们启动一个循环接收者:

func suck(ch chan int) {
    for { fmt.Println(<-ch) }
}
go suck(ch1) // 大量数字出现

你仍可以溜进去,抓取一个值:

fmt.Println(<-ch1) // 输出:3141159

返回channel的函数

在前面的例子中,pump像一个生成器,喷涌出值。但在分配channel等方面做了很多工作。让我们将其打包到一个返回channel的函数中:

func pump() chan int {
    ch := make(chan int)
    go func() {
        for i := 0; ; i++ { ch <- i }
    }()
    return ch
}
stream := pump()
fmt.Println(<-stream)// 打印 0

"返回channel的函数"是Go中的一个重要的惯用法。

到处都是返回channel的函数

我这里不再重复那些你可以从其他地方找到的知名例子。这里有些可以了解一下:

1) prime sieve: 在语言规范以及教程中。

2) Doug McIlroy的Power系列论文:http://plan9.bell-labs.com/who/rsc/thread/squint.pdf

这个程序的一个Go版本在测试套件中:http://golang.org/test/chan/powser1.go

Range和Channel

for循环的range子句接收channel作为一个操作数,在这种情况下,for循环迭代处理从channel接收到的值。我们来重写pump函数;这里是suck的重写,让它也启动一个goroutine:

func suck(ch chan int) {
    go func() {
        for v := range ch { fmt.Println(v) }
    }()
}
suck(pump()) // 现在不再阻塞

关闭一个Channel

range是如何知道何时channel上的数据传输结束了呢?发送者调用一个内置函数close:

    close(ch)

接收者使用"comma ok"测试发送者是否关闭了channel:

    val, ok:= <- ch

当结果为(value, true),说明依然有数据;一旦channel关闭,数据流干,结果将会是(zero, false)。

在一个Channel上使用Range

在一个channel上使用range,诸如:

for value := range <-ch {
    use(value)
}

等价于:

for {
    value, ok := <-ch
    if !ok {
        break
    }
    use(value)
}

Close

关键点:

只有发送者可以调用close。
只有接收者可以询问是否channel被关闭了。
只有在获取值的同时询问(避免竞争)

只有在有必要通知接收者不会再有数据的时候才调用close。

大多数情况下,不需要用close;它与关闭一个文件没有可比性。

不管怎样,channel是可以垃圾回收的。

Channel的方向性

一个channel变量的最简单形式是一个非缓冲(同步的)值,该值可以用于进行发送和接收。

一个channel类型可以被指定为只发或只收:

var recvOnly <-chan int
var sendOnly chan<- int

Channel的方向性(2)

所有Channel创建时都是双向的,但我们可以将它们赋值给带方向性的channel变量。从类型安全性角度考虑,对于函数内的实例非常有用:

func sink(ch <-chan int) {
    for { <-ch }
}
func source(ch chan<- int) {
    for { ch <- 1 }
}
c := make(chan int)//双向的
go source(c)
go sink(c)

同步的Channel

同步的Channel是非缓冲的。发送动作不会完成,直到一个接收者接收这个值。

c := make(chan int)
go func() {
    time.Sleep(60*1e9)
    x := <-c
    fmt.Println("received", x)
}()

fmt.Println("sending", 10)
c <- 10
fmt.Println("sent", 10)

输出:

sending 10 (立即发生)
sent 10 (60秒后,这两行出现)
received 10

异步的Channel

通过告知make缓冲中元素的数量,我们可以创建一个带缓冲的、异步的channel。

c := make(chan int, 50)
go func() {
    time.Sleep(60*1e9)
    x := <-c
    fmt.Println("received", x)
}()
fmt.Println("sending", 10)
c <- 10
fmt.Println("sent", 10)

输出:

sending 10 (立刻发生)
sent 10(现在)
received 10 (60秒后)

缓冲不是类型的一部分

注意缓冲的大小甚至其自身都不是channel类型的一部分,只是值的一部分。因此下面的代码虽危险,但合法:

buf = make(chan int, 1)
unbuf = make(chan int)
buf = unbuf
unbuf = buf

缓冲是一个值的属性,而不是类型的。

Select

select是Go中的一个控制结构,类似于用于通信的switch语句。每个case必须是一个通信操作,要么是send要么是receive。

ci, cs := make(chan int), make(chan string)
select {
    case v := <-ci:
        fmt.Printf("received %d from ci\n", v)
    case v := <-cs:
        fmt.Printf("received %s from cs\n", v)
}

Select随机执行一个可运行的case。如果没有case可运行,它将阻塞,直到有case可运行。一个默认的子句应该总是可运行的。

Select语义

快速一览:

- 每个case都必须是一个通信(可能是:=)
- 所有channel表达式都会被求值
- 所有被发送的表达式都会被求值
- 如果任意某个通信可以进行,它就执行;其他被忽略。
- 如果有多个case都可以运行,Select会随机公平地选出一个执行。其他不会执行。
- 否则:
    – 如果有default子句,则执行该语句。
    – 如果没有default字句,select将阻塞,直到某个通信可以运行;Go不会重新对channel或值进行求值。

随机bit生成器

幼稚但很有说明性的例子:

c := make(chan int)
go func() {
    for {
        fmt.Println(<-c)
    }
}()

for {
    select {
        case c <- 0: //没有语句,没有fallthrough
        case c <- 1:
    }
}

测试可通信性

一个通信是否可以进行,而不阻塞?一个带default字句的select可以告诉我们:

select {
    case v := <-ch:
        fmt.Println("received", v)
    default:
        fmt.Println("ch not ready for receive")
}

如果没有其他case可以运行,那default子句将被执行,因此这对于非阻塞接收是一个惯用法;非阻塞发送显然也可以这么做。

超时

一个通信可以在一个给定的时间内成功完成么?time包包含了after函数:

func After(ns int64) <-chan int64

在指定时间段之后,它向返回的channel中传递一个值(当前时间)。

在select中使用它以实现超时:

select {
case v := <-ch:
    fmt.Println("received", v)
case <-time.After(30*1e9):
    fmt.Println("timed out after 30 seconds")
}

多路复用(multiplexing)

channel是原生值,这意味着他们也能通过channel发送。这个属性使得编写一个服务类多路复用器变得十分容易,因为客户端在提交请求时可一并提供用于回复应答的channel。

chanOfChans := make(chan chan int)

或者更典型的如:

type Reply struct { … }
type Request struct {
    arg1, arg2 someType
    replyc chan *Reply
}

多路复用服务器

type request struct {
    a, b int
    replyc chan int
}

type binOp func(a, b int) int
func run(op binOp, req *request) {
    req.replyc <- op(req.a, req.b)
}

func server(op binOp, service <-chan *request) {
    for {
        req := <-service // 请求到达这里
        go run(op, req) // 不等op
    }
}

启动服务器

使用"返回channel的函数"惯用法来为一个新服务器创建一个channel:

func startServer(op binOp) chan<- *request {
    service := make(chan *request)
    go server(op, req)
    return service
}

adderChan := startServer(
    func(a, b int) int { return a + b }
)

客户端

在教程中有个例子更为详尽,但这里是一个变体:

func (r *request) String() string {
    return fmt.Sprintf("%d+%d=%d",
    r.a, r.b, <-r.replyc)
}
req1 := &request{7, 8, make(chan int)}
req2 := &request{17, 18, make(chan int)}

请求已经就绪,发送它们:

adderChan <- req1
adderChan <- req2

可以以任何顺序获得结果;r.replyc多路分解:

fmt.Println(req2, req1)

停掉

在多路复用的例子中,服务将永远运行下去。要将其干净地停掉,可通过一个channel发送信号。下面这个server具有相同的功能,但多了一个quit channel:

func server(op binOp, service <-chan *request,
            quit <-chan bool) {
    for {
        select {
            case req := <-service:
                go run(op, req) // don't wait for it
            case <-quit:
                return
        }
    }
}

启动服务器

其余代码都相似,只是多了个channel:

func startServer(op binOp) (service chan<- *request,
        quit chan<- bool) {
    service = make(chan *request)
    quit = make(chan bool)
    go server(op, service, quit)
    return service, quit
}

adderChan, quitChan := startServer(
    func(a, b int) int { return a + b }
)

停掉:客户端

只有当准备停掉服务端的时候,客户端才会受到影响:

req1 := &request{7, 8, make(chan int)}
req2 := &request{17, 18, make(chan int)}
adderChan <- req1
adderChan <- req2
fmt.Println(req2, req1)

所有都完成后,向服务器发送信号,让其退出:

quitChan <- true

package main
import ("flag"; "fmt")
var nGoroutine = flag.Int("n", 100000, "how many")
func f(left, right chan int) { left <- 1 + <-right }
func main() {
    flag.Parse()
    leftmost := make(chan int)
    var left, right chan int = nil, leftmost

    for i := 0; i < *nGoroutine; i++ {
        left, right = right, make(chan int)
        go f(left, right)
    }

    right <- 0 // bang!

    x := <-leftmost // 等待完成
    fmt.Println(x)    // 100000
}

例子:Channel作为缓存

var freeList = make(chan *Buffer, 100)
var serverChan = make(chan *Buffer)

func server() {
    for {
        b := <-serverChan // 等待做work
        process(b) // 在缓存中处理请求
        select {
            case freeList <- b: // 如果有空间,重用缓存
            default:             // 否则,丢弃它
        }
    }
}

func client() {
    for {
        var b *Buffer
        select {
            case b = <-freeList:            // 如果就绪,抓取一个
            default: b = new(Buffer) // 否则,分配一个
        }
        load(b)// 读取下一个请求放入b中
        serverChan <- b // 将请求发给server.
    }
}

并发

并发相关话题

许多并发方面,当然,Go一直在尽力做好它们。诸如Channel发送和接收是原子的。select语句也是缜密定义和实现的等。

但goroutine在共享内存中运行,通信网络可能死锁,多线程调试器糟糕透顶等等。

接下来做什么?

Go给予你原生的

不要用你在使用C或C++或甚至是Java时的方式去编程。

channel给予你同步和通信的能力,并且使得它们很强大,但也可以很容易知道你是否可以很好的使用它们。

规则是:

    不要通过共享内存通信,相反,通过通信共享内存。

特有的通信行为保证了同步!

模型

例如,使用一个channel发送数据到一个专职服务goroutine。如果同一时刻只有一个goroutine拥有指向数据的指针,那谈不上什么并发。

这是我们极力推荐的服务端编程模型,至少是对旧的"每个客户端一个线程"的泛化。它自从20世纪80年代就开始使用了,它工作的很好。

 

内存模型

那关于同步和共享内存的令人生厌的细节在:

http://golang.org/doc/go_mem.html

但如果你遵循我们的方法,你很少需要理解那些内容。

© 2012, bigwhite. 版权所有.

Related posts:

  1. Go程序设计语言(二)
  2. Go程序设计语言(一)
  3. 也谈Go语言编程 – Hello,Go!
  4. 开始学Go
  5. C单元测试之使用cmockery