标签 Channel 下的文章

Golang Channel用法简编

在进入正式内容前,我这里先顺便转发一则消息,那就是Golang 1.3.2已经正式发布了。国内的golangtc已经镜像了golang.org的安装包下载页面,国内go程序员与爱好者们可以到"Golang中 国",即golangtc.com去下载go 1.3.2版本。

Go这门语言也许你还不甚了解,甚至是完全不知道,这也有情可原,毕竟Go在TIOBE编程语言排行榜上位列30开外。但近期使用Golang 实现的一杀手级应用 Docker你却不该不知道。docker目前火得是一塌糊涂啊。你去国内外各大技术站点用眼轻瞥一下,如 果没有涉及到“docker”字样新闻的站点建 议你以后就不要再去访问了^_^。Docker是啥、怎么用以及基础实践可以参加国内一位仁兄的经验之作:《 Docker – 从入门到实践》。

据我了解,目前国内试水Go语言开发后台系统的大公司与初创公司日益增多,比如七牛、京东、小米,盛大,金山,东软,搜狗等,在这里我们可以看到一些公司的Go语言应用列表,并且目前这个列表似乎依旧在丰富中。国内Go语言的推广与布道也再稳步推进中,不过目前来看多以Go入 门与基础为主题,Go idioms、tips或Best Practice的Share并不多见,想必国内的先行者、布道师们还在韬光养晦,积攒经验,等到时机来临再厚积薄发。另外国内似乎还没有一个针对Go的 布道平台,比如Golang技术大会之类的的平台。

在国外,虽然Go也刚刚起步,但在Golang share的广度和深度方面显然更进一步。Go的国际会议目前还不多,除了Golang老东家Google在自己的各种大会上留给Golang展示自己的 机会外,由 Gopher Academy 发起的GopherCon 会议也于今年第一次举行,并放出诸多高质量资料,在这里可以下载。欧洲的Go语言大会.dotgo也即将开幕,估计后续这两个大会将撑起Golang技术分享 的旗帜。

言归正传,这里要写的东西并非原创,自己的Go仅仅算是入门级别,工程经验、Best Practice等还谈不上有多少,因此这里主要是针对GopherCon2014上的“舶来品”的学习心得。来自CloudFlare的工程师John Graham-Cumming谈了关于 Channel的实践经验,这里针对其分享的内容,记录一些学习体会和理解,并结合一些外延知识,也可以算是一种学习笔记吧,仅供参考。

一、Golang并发基础理论

Golang在并发设计方面参考了C.A.R Hoare的CSP,即Communicating Sequential Processes并发模型理论。但就像John Graham-Cumming所说的那样,多数Golang程序员或爱好者仅仅停留在“知道”这一层次,理解CSP理论的并不多,毕竟多数程序员是搞工程 的。不过要想系统学习CSP的人可以从这里下载到CSP论文的最新版本。

维基百科中概要罗列了CSP模型与另外一种并发模型Actor模型的区别:

Actor模型广义上讲与CSP模型很相似。但两种模型就提供的原语而言,又有一些根本上的不同之处:
    – CSP模型处理过程是匿名的,而Actor模型中的Actor则具有身份标识。
    – CSP模型的消息传递在收发消息进程间包含了一个交会点,即发送方只能在接收方准备好接收消息时才能发送消息。相反,actor模型中的消息传递是异步 的,即消息的发送和接收无需在同一时间进行,发送方可以在接收方准备好接收消息前将消息发送出去。这两种方案可以认为是彼此对偶的。在某种意义下,基于交 会点的系统可以通过构造带缓冲的通信的方式来模拟异步消息系统。而异步系统可以通过构造带消息/应答协议的方式来同步发送方和接收方来模拟交会点似的通信 方式。
    – CSP使用显式的Channel用于消息传递,而Actor模型则将消息发送给命名的目的Actor。这两种方法可以被认为是对偶的。某种意义下,进程可 以从一个实际上拥有身份标识的channel接收消息,而通过将actors构造成类Channel的行为模式也可以打破actors之间的名字耦合。

二、Go Channel基本操作语法

Go Channel的基本操作语法如下:

c := make(chan bool) //创建一个无缓冲的bool型Channel

c <- x        //向一个Channel发送一个值
<- c          //从一个Channel中接收一个值
x = <- c      //从Channel c接收一个值并将其存储到x中
x, ok = <- c  //从Channel接收一个值,如果channel关闭了或没有数据,那么ok将被置为false

不带缓冲的Channel兼具通信和同步两种特性,颇受青睐。

三、Channel用作信号(Signal)的场景

1、等待一个事件(Event)

等待一个事件,有时候通过close一个Channel就足够了。例如:

//testwaitevent1.go
package main

import "fmt"

func main() {
        fmt.Println("Begin doing something!")
        c := make(chan bool)
        go func() {
                fmt.Println("Doing something…")
                close(c)
        }()
        <-c
        fmt.Println("Done!")
}

这里main goroutine通过"<-c"来等待sub goroutine中的“完成事件”,sub goroutine通过close channel促发这一事件。当然也可以通过向Channel写入一个bool值的方式来作为事件通知。main goroutine在channel c上没有任何数据可读的情况下会阻塞等待。

关于输出结果:

根据《Go memory model》中关于close channel与recv from channel的order的定义:The closing of a channel happens before a receive that returns a zero value because the channel is closed.

我们可以很容易判断出上面程序的输出结果:

Begin doing something!
Doing something…
Done!

如果将close(c)换成c<-true,则根据《Go memory model》中的定义:A receive from an unbuffered channel happens before the send on that channel completes.
"<-c"要先于"c<-true"完成,但也不影响日志的输出顺序,输出结果仍为上面三行。

2、协同多个Goroutines

同上,close channel还可以用于协同多个Goroutines,比如下面这个例子,我们创建了100个Worker Goroutine,这些Goroutine在被创建出来后都阻塞在"<-start"上,直到我们在main goroutine中给出开工的信号:"close(start)",这些goroutines才开始真正的并发运行起来。

//testwaitevent2.go
package main

import "fmt"

func worker(start chan bool, index int) {
        <-start
        fmt.Println("This is Worker:", index)
}

func main() {
        start := make(chan bool)
        for i := 1; i <= 100; i++ {
                go worker(start, i)
        }
        close(start)
        select {} //deadlock we expected
}

3、Select

【select的基本操作】
select是Go语言特有的操作,使用select我们可以同时在多个channel上进行发送/接收操作。下面是select的基本操作。

select {
case x := <- somechan:
    // … 使用x进行一些操作

case y, ok := <- someOtherchan:
    // … 使用y进行一些操作,
    //
检查ok值判断someOtherchan是否已经关闭

case outputChan <- z:
    // … z值被成功发送到Channel上时

default:
    // … 上面case均无法通信时,执行此分支
}

【惯用法:for/select】

我们在使用select时很少只是对其进行一次evaluation,我们常常将其与for {}结合在一起使用,并选择适当时机从for{}中退出。

for {
        select {
        case x := <- somechan:
            // … 使用x进行一些操作

        case y, ok := <- someOtherchan:
            // … 使用y进行一些操作,
            // 检查ok值判断someOtherchan是否已经关闭

        case outputChan <- z:
            // … z值被成功发送到Channel上时

        default:
            // … 上面case均无法通信时,执行此分支
        }
}

【终结workers】

下面是一个常见的终结sub worker goroutines的方法,每个worker goroutine通过select监视一个die channel来及时获取main goroutine的退出通知。

//testterminateworker1.go
package main

import (
    "fmt"
    "time"
)

func worker(die chan bool, index int) {
    fmt.Println("Begin: This is Worker:", index)
    for {
        select {
        //case xx:
            //做事的分支
        case <-die:
            fmt.Println("Done: This is Worker:", index)
            return
        }
    }
}

func main() {
    die := make(chan bool)

    for i := 1; i <= 100; i++ {
        go worker(die, i)
    }

    time.Sleep(time.Second * 5)
    close(die)
    select {}
//deadlock we expected
}

【终结验证】

有时候终结一个worker后,main goroutine想确认worker routine是否真正退出了,可采用下面这种方法:

//testterminateworker2.go
package main

import (
    "fmt"
    //"time"
)

func worker(die chan bool) {
    fmt.Println("Begin: This is Worker")
    for {
        select {
        //case xx:
        //做事的分支
        case <-die:
            fmt.Println("Done: This is Worker")
            die <- true
            return
        }
    }
}

func main() {
    die := make(chan bool)

    go worker(die)

    die <- true
    <-die
    fmt.Println("Worker goroutine has been terminated")
}

【关闭的Channel永远不会阻塞】

下面演示在一个已经关闭了的channel上读写的结果:

//testoperateonclosedchannel.go
package main

import "fmt"

func main() {
        cb := make(chan bool)
        close(cb)
        x := <-cb
        fmt.Printf("%#v\n", x)

        x, ok := <-cb
        fmt.Printf("%#v %#v\n", x, ok)

        ci := make(chan int)
        close(ci)
        y := <-ci
        fmt.Printf("%#v\n", y)

        cb <- true
}

$go run testoperateonclosedchannel.go
false
false false
0
panic: runtime error: send on closed channel

可以看到在一个已经close的unbuffered channel上执行读操作,回返回channel对应类型的零值,比如bool型channel返回false,int型channel返回0。但向close的channel写则会触发panic。不过无论读写都不会导致阻塞。

【关闭带缓存的channel】

将unbuffered channel换成buffered channel会怎样?我们看下面例子:

//testclosedbufferedchannel.go
package main

import "fmt"

func main() {
        c := make(chan int, 3)
        c <- 15
        c <- 34
        c <- 65
        close(c)
        fmt.Printf("%d\n", <-c)
        fmt.Printf("%d\n", <-c)
        fmt.Printf("%d\n", <-c)
        fmt.Printf("%d\n", <-c)

        c <- 1
}

$go run testclosedbufferedchannel.go
15
34
65
0
panic: runtime error: send on closed channel

可以看出带缓冲的channel略有不同。尽管已经close了,但我们依旧可以从中读出关闭前写入的3个值。第四次读取时,则会返回该channel类型的零值。向这类channel写入操作也会触发panic。

【range】

Golang中的range常常和channel并肩作战,它被用来从channel中读取所有值。下面是一个简单的实例:

//testrange.go
package main

import "fmt"

func generator(strings chan string) {
        strings <- "Five hour's New York jet lag"
        strings <- "and Cayce Pollard wakes in Camden Town"
        strings <- "to the dire and ever-decreasing circles"
        strings <- "of disrupted circadian rhythm."
        close(strings)
}

func main() {
        strings := make(chan string)
        go generator(strings)
        for s := range strings {
                fmt.Printf("%s\n", s)
        }
        fmt.Printf("\n")
}

四、隐藏状态

下面通过一个例子来演示一下channel如何用来隐藏状态:

1、例子:唯一的ID服务

//testuniqueid.go
package main

import "fmt"

func newUniqueIDService() <-chan string {
        id := make(chan string)
        go func() {
                var counter int64 = 0
                for {
                        id <- fmt.Sprintf("%x", counter)
                        counter += 1
                }
        }()
        return id
}
func main() {
        id := newUniqueIDService()
        for i := 0; i < 10; i++ {
                fmt.Println(<-id)
        }
}

$ go run testuniqueid.go
0
1
2
3
4
5
6
7
8
9

newUniqueIDService通过一个channel与main goroutine关联,main goroutine无需知道uniqueid实现的细节以及当前状态,只需通过channel获得最新id即可。

五、默认情况

我想这里John Graham-Cumming主要是想告诉我们select的default分支的实践用法。

1、select  for non-blocking receive

idle:= make(chan []byte, 5) //用一个带缓冲的channel构造一个简单的队列

select {
case b = <-idle:
 //尝试从idle队列中读取
    …
default:  //队列空,分配一个新的buffer
        makes += 1
        b = make([]byte, size)
}

2、select for non-blocking send

idle:= make(chan []byte, 5) //用一个带缓冲的channel构造一个简单的队列

select {
case idle <- b: //尝试向队列中插入一个buffer
        //…
default: //队列满?

}

六、Nil Channels

1、nil channels阻塞

对一个没有初始化的channel进行读写操作都将发生阻塞,例子如下:

package main

func main() {
        var c chan int
        <-c
}

$go run testnilchannel.go
fatal error: all goroutines are asleep – deadlock!

package main

func main() {
        var c chan int
        c <- 1
}

$go run testnilchannel.go
fatal error: all goroutines are asleep – deadlock!

2、nil channel在select中很有用

看下面这个例子:

//testnilchannel_bad.go
package main

import "fmt"
import "time"

func main() {
        var c1, c2 chan int = make(chan int), make(chan int)
        go func() {
                time.Sleep(time.Second * 5)
                c1 <- 5
                close(c1)
        }()

        go func() {
                time.Sleep(time.Second * 7)
                c2 <- 7
                close(c2)
        }()

        for {
                select {
                case x := <-c1:
                        fmt.Println(x)
                case x := <-c2:
                        fmt.Println(x)
                }
        }
        fmt.Println("over")
}

我们原本期望程序交替输出5和7两个数字,但实际的输出结果却是:

5
0
0
0
… … 0死循环

再仔细分析代码,原来select每次按case顺序evaluate:
    – 前5s,select一直阻塞;
    – 第5s,c1返回一个5后被close了,“case x := <-c1”这个分支返回,select输出5,并重新select
    – 下一轮select又从“case x := <-c1”这个分支开始evaluate,由于c1被close,按照前面的知识,close的channel不会阻塞,我们会读出这个 channel对应类型的零值,这里就是0;select再次输出0;这时即便c2有值返回,程序也不会走到c2这个分支
    – 依次类推,程序无限循环的输出0

我们利用nil channel来改进这个程序,以实现我们的意图,代码如下:

//testnilchannel.go
package main

import "fmt"
import "time"

func main() {
        var c1, c2 chan int = make(chan int), make(chan int)
        go func() {
                time.Sleep(time.Second * 5)
                c1 <- 5
                close(c1)
        }()

        go func() {
                time.Sleep(time.Second * 7)
                c2 <- 7
                close(c2)
        }()

        for {
                select {
                case x, ok := <-c1:
                        if !ok {
                                c1 = nil
                        } else {
                                fmt.Println(x)
                        }
                case x, ok := <-c2:
                        if !ok {
                                c2 = nil
                        } else {
                                fmt.Println(x)
                        }
                }
                if c1 == nil && c2 == nil {
                        break
                }
        }
        fmt.Println("over")
}

$go run testnilchannel.go
5
7
over

可以看出:通过将已经关闭的channel置为nil,下次select将会阻塞在该channel上,使得select继续下面的分支evaluation。

七、Timers

1、超时机制Timeout

带超时机制的select是常规的tip,下面是示例代码,实现30s的超时select:

func worker(start chan bool) {
        timeout := time.After(30 * time.Second)
        for {
                select {
                     // … do some stuff
                case <- timeout:
                    return
                }
        }
}

2、心跳HeartBeart

与timeout实现类似,下面是一个简单的心跳select实现:

func worker(start chan bool) {
        heartbeat := time.Tick(30 * time.Second)
        for {
                select {
                     // … do some stuff
                case <- heartbeat:
                    //… do heartbeat stuff
                }
        }
}

Go程序设计语言(三)

本文译自Rob PikeGo语言PPT教程 – "The Go Programming Language Part3(updated June 2011)"。由于该教程的最新更新时间早于Go 1版本发布,因此该PPT中的一些内容与Go 1语言规范略有差异,到时我会在相应的地方做上注解。

第三部分大纲

  • 并发与通信
    • Goroutines
    • 通道(Channel)
    • 并发相关话题

并发与通信:Goroutines

Goroutines

术语:

对于"并发运行的事物"已经有了好多术语 – 进程、线程、协程(coroutine)、POSIX线程、NPTL线程、轻量级进程…,但这些事物都或多或少有不同。并且Go中的并发与哪种都不甚相同。

因此我们介绍一个新术语:goroutine。

定义

一个Goroutine是一个与其他goroutines运行在同一地址空间的Go函数或方法。一个运行的程序由一个或更多个goroutine组成。

它与线程、协程、进程等不同。它是一个goroutine。

注意:Concurrency与Parallelism是不同的概念。如果你不了解它们的不同,查查相关资料吧。

关于并发的问题有许多。我们后续会提及。现在就假设它能按其对外所宣称的那样正常工作吧。

启动一个Goroutine

调用一个函数或方法,然后说go:

func IsReady(what string, minutes int64) {
    time.Sleep(minutes * 60*1e9) // Unit is nanosecs.
    fmt.Println(what, "is ready")
}
go IsReady("tea", 6)
go IsReady("coffee", 2)
fmt.Println("I'm waiting…")

打印:

I'm waiting… (立即)
coffee is ready (2分钟后)
tea is ready (6分钟后)

一些简单的事实

goroutine的使用代价很低。

当从最外层函数返回,或执行到结尾处时,goroutine退出。

goroutines可以并行地在不同CPU上执行,共享内存。

你无需担心栈大小。

在gccgo中,至少目前goroutines就是pthreads。在6g中,goroutines采用基于线程的多路复用技术,因此它们的代价更低廉。

无论是上面哪个实现,栈都很小(几KB),可以根据需要增长。因此goroutines使用很少的内存。你可以创建很多goroutines,它们还可以动态拥有很大的栈。

程序员无需考虑栈大小相关话题。在Go中,这种考虑甚至不应该出现。

调度

Goroutine多路复用系统线程。当一个goroutine执行了一个阻塞的系统调用时,其他goroutine不会不阻塞。

计划后续实现CPU绑定的goroutines,不过目前用6g如果你想要用户层级别的并行,你必须设置环境变量GOMAXPROCS或调用runtime.GOMAXPROCS(n)。

GOMAXPROCS告诉运行时调度器有多少个用户空间goroutine即将同时执行,理想情况下在不同的CPU核上。

*gccgo总是为每个goroutine单独分配一个线程执行。

并发与通信:Channels

Go中的Channel

除非两个goroutine可以通信,否则它们无法协作。

Go中有一个名为channel的类型,提供通信和同步能力。

Go中还提供一些特殊的基于channel的控制结构,使得编写并发程序更加容易。

Channel类型

该类型最简单形式:
    chan elementType

通过这个类型的值,你可以发送和接收elementType类型的元素。

Channel是引用类型,这意味着如果你将一个chan变量赋值给另外一个,则这两个变量访问的是相同的channel。同样,这也意味着可以用make分配一个channel:

    var c = make(chan int)

通信操作符:<-

箭头指示数据流向。

作为一个二元操作符,<-将值从右侧发送到左侧的channel中:

c := make(chan int)
c <- 1 // 向c发送1

作为前缀一元操作符,<- 从一个channel中接收数据:

v = <-c // 从c中接收数据,赋值给v
<-c // 接收数据,丢弃
i := <-c // 接收值,用于初始化i

语义

默认情况下,通信是同步的。(我们后续将讨论异步通信)。这意味着:

1) A在一个channel上的发送操作会阻塞,直到该channel上有一个接收者就绪。
2) 在一个channel上到的接收操作会阻塞,直到该channel上有一个发送者就绪。

因此通信是同步的一种形式:两个通过channel交换数据的goroutine在通信的时刻同步。

让我们泵一些数据吧

func pump(ch chan int) {
    for i := 0; ; i++ { ch <- i }
}
ch1 := make(chan int)
go pump(ch1) // pump挂起; 我们运行
fmt.Println(<-ch1) // 打印 0

现在我们启动一个循环接收者:

func suck(ch chan int) {
    for { fmt.Println(<-ch) }
}
go suck(ch1) // 大量数字出现

你仍可以溜进去,抓取一个值:

fmt.Println(<-ch1) // 输出:3141159

返回channel的函数

在前面的例子中,pump像一个生成器,喷涌出值。但在分配channel等方面做了很多工作。让我们将其打包到一个返回channel的函数中:

func pump() chan int {
    ch := make(chan int)
    go func() {
        for i := 0; ; i++ { ch <- i }
    }()
    return ch
}
stream := pump()
fmt.Println(<-stream)// 打印 0

"返回channel的函数"是Go中的一个重要的惯用法。

到处都是返回channel的函数

我这里不再重复那些你可以从其他地方找到的知名例子。这里有些可以了解一下:

1) prime sieve: 在语言规范以及教程中。

2) Doug McIlroy的Power系列论文:http://plan9.bell-labs.com/who/rsc/thread/squint.pdf

这个程序的一个Go版本在测试套件中:http://golang.org/test/chan/powser1.go

Range和Channel

for循环的range子句接收channel作为一个操作数,在这种情况下,for循环迭代处理从channel接收到的值。我们来重写pump函数;这里是suck的重写,让它也启动一个goroutine:

func suck(ch chan int) {
    go func() {
        for v := range ch { fmt.Println(v) }
    }()
}
suck(pump()) // 现在不再阻塞

关闭一个Channel

range是如何知道何时channel上的数据传输结束了呢?发送者调用一个内置函数close:

    close(ch)

接收者使用"comma ok"测试发送者是否关闭了channel:

    val, ok:= <- ch

当结果为(value, true),说明依然有数据;一旦channel关闭,数据流干,结果将会是(zero, false)。

在一个Channel上使用Range

在一个channel上使用range,诸如:

for value := range <-ch {
    use(value)
}

等价于:

for {
    value, ok := <-ch
    if !ok {
        break
    }
    use(value)
}

Close

关键点:

只有发送者可以调用close。
只有接收者可以询问是否channel被关闭了。
只有在获取值的同时询问(避免竞争)

只有在有必要通知接收者不会再有数据的时候才调用close。

大多数情况下,不需要用close;它与关闭一个文件没有可比性。

不管怎样,channel是可以垃圾回收的。

Channel的方向性

一个channel变量的最简单形式是一个非缓冲(同步的)值,该值可以用于进行发送和接收。

一个channel类型可以被指定为只发或只收:

var recvOnly <-chan int
var sendOnly chan<- int

Channel的方向性(2)

所有Channel创建时都是双向的,但我们可以将它们赋值给带方向性的channel变量。从类型安全性角度考虑,对于函数内的实例非常有用:

func sink(ch <-chan int) {
    for { <-ch }
}
func source(ch chan<- int) {
    for { ch <- 1 }
}
c := make(chan int)//双向的
go source(c)
go sink(c)

同步的Channel

同步的Channel是非缓冲的。发送动作不会完成,直到一个接收者接收这个值。

c := make(chan int)
go func() {
    time.Sleep(60*1e9)
    x := <-c
    fmt.Println("received", x)
}()

fmt.Println("sending", 10)
c <- 10
fmt.Println("sent", 10)

输出:

sending 10 (立即发生)
sent 10 (60秒后,这两行出现)
received 10

异步的Channel

通过告知make缓冲中元素的数量,我们可以创建一个带缓冲的、异步的channel。

c := make(chan int, 50)
go func() {
    time.Sleep(60*1e9)
    x := <-c
    fmt.Println("received", x)
}()
fmt.Println("sending", 10)
c <- 10
fmt.Println("sent", 10)

输出:

sending 10 (立刻发生)
sent 10(现在)
received 10 (60秒后)

缓冲不是类型的一部分

注意缓冲的大小甚至其自身都不是channel类型的一部分,只是值的一部分。因此下面的代码虽危险,但合法:

buf = make(chan int, 1)
unbuf = make(chan int)
buf = unbuf
unbuf = buf

缓冲是一个值的属性,而不是类型的。

Select

select是Go中的一个控制结构,类似于用于通信的switch语句。每个case必须是一个通信操作,要么是send要么是receive。

ci, cs := make(chan int), make(chan string)
select {
    case v := <-ci:
        fmt.Printf("received %d from ci\n", v)
    case v := <-cs:
        fmt.Printf("received %s from cs\n", v)
}

Select随机执行一个可运行的case。如果没有case可运行,它将阻塞,直到有case可运行。一个默认的子句应该总是可运行的。

Select语义

快速一览:

- 每个case都必须是一个通信(可能是:=)
- 所有channel表达式都会被求值
- 所有被发送的表达式都会被求值
- 如果任意某个通信可以进行,它就执行;其他被忽略。
- 如果有多个case都可以运行,Select会随机公平地选出一个执行。其他不会执行。
- 否则:
    – 如果有default子句,则执行该语句。
    – 如果没有default字句,select将阻塞,直到某个通信可以运行;Go不会重新对channel或值进行求值。

随机bit生成器

幼稚但很有说明性的例子:

c := make(chan int)
go func() {
    for {
        fmt.Println(<-c)
    }
}()

for {
    select {
        case c <- 0: //没有语句,没有fallthrough
        case c <- 1:
    }
}

测试可通信性

一个通信是否可以进行,而不阻塞?一个带default字句的select可以告诉我们:

select {
    case v := <-ch:
        fmt.Println("received", v)
    default:
        fmt.Println("ch not ready for receive")
}

如果没有其他case可以运行,那default子句将被执行,因此这对于非阻塞接收是一个惯用法;非阻塞发送显然也可以这么做。

超时

一个通信可以在一个给定的时间内成功完成么?time包包含了after函数:

func After(ns int64) <-chan int64

在指定时间段之后,它向返回的channel中传递一个值(当前时间)。

在select中使用它以实现超时:

select {
case v := <-ch:
    fmt.Println("received", v)
case <-time.After(30*1e9):
    fmt.Println("timed out after 30 seconds")
}

多路复用(multiplexing)

channel是原生值,这意味着他们也能通过channel发送。这个属性使得编写一个服务类多路复用器变得十分容易,因为客户端在提交请求时可一并提供用于回复应答的channel。

chanOfChans := make(chan chan int)

或者更典型的如:

type Reply struct { … }
type Request struct {
    arg1, arg2 someType
    replyc chan *Reply
}

多路复用服务器

type request struct {
    a, b int
    replyc chan int
}

type binOp func(a, b int) int
func run(op binOp, req *request) {
    req.replyc <- op(req.a, req.b)
}

func server(op binOp, service <-chan *request) {
    for {
        req := <-service // 请求到达这里
        go run(op, req) // 不等op
    }
}

启动服务器

使用"返回channel的函数"惯用法来为一个新服务器创建一个channel:

func startServer(op binOp) chan<- *request {
    service := make(chan *request)
    go server(op, req)
    return service
}

adderChan := startServer(
    func(a, b int) int { return a + b }
)

客户端

在教程中有个例子更为详尽,但这里是一个变体:

func (r *request) String() string {
    return fmt.Sprintf("%d+%d=%d",
    r.a, r.b, <-r.replyc)
}
req1 := &request{7, 8, make(chan int)}
req2 := &request{17, 18, make(chan int)}

请求已经就绪,发送它们:

adderChan <- req1
adderChan <- req2

可以以任何顺序获得结果;r.replyc多路分解:

fmt.Println(req2, req1)

停掉

在多路复用的例子中,服务将永远运行下去。要将其干净地停掉,可通过一个channel发送信号。下面这个server具有相同的功能,但多了一个quit channel:

func server(op binOp, service <-chan *request,
            quit <-chan bool) {
    for {
        select {
            case req := <-service:
                go run(op, req) // don't wait for it
            case <-quit:
                return
        }
    }
}

启动服务器

其余代码都相似,只是多了个channel:

func startServer(op binOp) (service chan<- *request,
        quit chan<- bool) {
    service = make(chan *request)
    quit = make(chan bool)
    go server(op, service, quit)
    return service, quit
}

adderChan, quitChan := startServer(
    func(a, b int) int { return a + b }
)

停掉:客户端

只有当准备停掉服务端的时候,客户端才会受到影响:

req1 := &request{7, 8, make(chan int)}
req2 := &request{17, 18, make(chan int)}
adderChan <- req1
adderChan <- req2
fmt.Println(req2, req1)

所有都完成后,向服务器发送信号,让其退出:

quitChan <- true

package main
import ("flag"; "fmt")
var nGoroutine = flag.Int("n", 100000, "how many")
func f(left, right chan int) { left <- 1 + <-right }
func main() {
    flag.Parse()
    leftmost := make(chan int)
    var left, right chan int = nil, leftmost

    for i := 0; i < *nGoroutine; i++ {
        left, right = right, make(chan int)
        go f(left, right)
    }

    right <- 0 // bang!

    x := <-leftmost // 等待完成
    fmt.Println(x)    // 100000
}

例子:Channel作为缓存

var freeList = make(chan *Buffer, 100)
var serverChan = make(chan *Buffer)

func server() {
    for {
        b := <-serverChan // 等待做work
        process(b) // 在缓存中处理请求
        select {
            case freeList <- b: // 如果有空间,重用缓存
            default:             // 否则,丢弃它
        }
    }
}

func client() {
    for {
        var b *Buffer
        select {
            case b = <-freeList:            // 如果就绪,抓取一个
            default: b = new(Buffer) // 否则,分配一个
        }
        load(b)// 读取下一个请求放入b中
        serverChan <- b // 将请求发给server.
    }
}

并发

并发相关话题

许多并发方面,当然,Go一直在尽力做好它们。诸如Channel发送和接收是原子的。select语句也是缜密定义和实现的等。

但goroutine在共享内存中运行,通信网络可能死锁,多线程调试器糟糕透顶等等。

接下来做什么?

Go给予你原生的

不要用你在使用C或C++或甚至是Java时的方式去编程。

channel给予你同步和通信的能力,并且使得它们很强大,但也可以很容易知道你是否可以很好的使用它们。

规则是:

    不要通过共享内存通信,相反,通过通信共享内存。

特有的通信行为保证了同步!

模型

例如,使用一个channel发送数据到一个专职服务goroutine。如果同一时刻只有一个goroutine拥有指向数据的指针,那谈不上什么并发。

这是我们极力推荐的服务端编程模型,至少是对旧的"每个客户端一个线程"的泛化。它自从20世纪80年代就开始使用了,它工作的很好。

 

内存模型

那关于同步和共享内存的令人生厌的细节在:

http://golang.org/doc/go_mem.html

但如果你遵循我们的方法,你很少需要理解那些内容。

如发现本站页面被黑,比如:挂载广告、挖矿等恶意代码,请朋友们及时联系我。十分感谢! Go语言第一课 Go语言精进之路1 Go语言精进之路2 Go语言编程指南
商务合作请联系bigwhite.cn AT aliyun.com

欢迎使用邮件订阅我的博客

输入邮箱订阅本站,只要有新文章发布,就会第一时间发送邮件通知你哦!

这里是 Tony Bai的个人Blog,欢迎访问、订阅和留言! 订阅Feed请点击上面图片

如果您觉得这里的文章对您有帮助,请扫描上方二维码进行捐赠 ,加油后的Tony Bai将会为您呈现更多精彩的文章,谢谢!

如果您希望通过微信捐赠,请用微信客户端扫描下方赞赏码:

如果您希望通过比特币或以太币捐赠,可以扫描下方二维码:

比特币:

以太币:

如果您喜欢通过微信浏览本站内容,可以扫描下方二维码,订阅本站官方微信订阅号“iamtonybai”;点击二维码,可直达本人官方微博主页^_^:
本站Powered by Digital Ocean VPS。
选择Digital Ocean VPS主机,即可获得10美元现金充值,可 免费使用两个月哟! 著名主机提供商Linode 10$优惠码:linode10,在 这里注册即可免费获 得。阿里云推荐码: 1WFZ0V立享9折!


View Tony Bai's profile on LinkedIn
DigitalOcean Referral Badge

文章

评论

  • 正在加载...

分类

标签

归档



View My Stats