好好学习,天天向上

Go, go|小白的go小抄:并发和错误

并发

并发编程是一个很大的主题,这里之提一些Go特有的重点 #### 通过通信共享 共享值通过channel进行传递。在任意给定的时间内,只有一个goroutine可以访问到该值。在设计上,不会出现数据竞争。即

不通过共享内存通信,相反,通过通信来共享内存

还可以再过些。例如,也许最好将一个mutext放在一个整型变量附近来实现引用计数。但是,作为高级方法,使用channel来控制访问,会使得编写清晰正确的程序更容易。

关于这样一个模型,想象一下:有一个在单CPU上运行的典型的单线程程序。它无需同步原语。现在,运行另一个这样的实例,它也无需同步。然后,让这两个实例进行通信。如果这样的通信是同步器,那么仍然无需其他同步。例如,Unix的管道就完美契合这个模型。虽然,Go的并发方法起源于Hoare的CSP模型,但是它也可以被看成Unix管道的一种类型安全的泛化。

goroutine

一个goroutine有一个简单的模型:它是一个与其他goroutine在相同的地址空间内并发地执行的函数。

它轻量,仅仅消耗栈空间的分配。

goroutine多路复用到多个OS线程,因此,如果一个goroutine阻塞了(例如等待I/O),那么,其他goroutine会继续运行。其设计隐藏了许多线程创建和管理的复杂的东西。

在一个方法或者函数调用前面加上前缀go,从而使得该调用在一个新的goroutine内运行。当调用完成的时候,这个goroutine会悄悄地退出。

1
go list.Sort() // 并发运行list.Sort。不等待
在goroutine调用中,一个函数字面量可以很方便:
1
2
3
4
5
func Do(message string) {
go func(){
fmt.Println(message)
}() // 注意这里的括号:必须调用这个函数!!
}
在Go中,函数字面量是闭包:这种实现确保被函数引用的变量只要有用,就是存活着的。

channel

channel是用make来分配的,结果值指向一个底层的数据结构。如果给make提供了第二个参数,那么这个参数是channel的缓存大小。默认值是0,表示一个不缓存/同步channel

1
2
3
cj := make(chan int) // 不缓存的整型channel
cj := make(chan int, 0) // 不缓存的整型channel
cs := make(chan *os.File, 100) // 指向File的缓存channel
不缓存的channel将通信(值交换)与同步结合在一起,保证两个计算(goroutine)处于已知状态。

一个channel能让启动的goroutine等待某些工作的完成。例如:

1
2
3
4
5
6
7
8
c := make(chan int) // 分配一个channel(不缓存)
// 在goroutine中开始排序。当排序完成后,给channel发信号
go func() {
list.Sort()
c <- 1 // 发送一个信号,值无所谓
}
doSomethingForAWhile()
<- c // 等待排序完成。丢弃接收到的值
channel的接收者往往在收到数据之前处于阻塞状态。而对于发送者: 1. 如果channel是不缓存的,那么,发送者会一直阻塞,直到接收者接收到所发送的值。 2. 如果channel具有缓存,那么,发送者会一直阻塞,直到发送值已经拷贝到缓存中了;如果缓存满了,那么意味着,发送者会一直等待,直到某接收者收到了一个值。

一个带缓存的channel可以被当成一个信号量使用,例如用来限制生产量。在下面这个例子中,过来的请求被传递给handlehandle会发送一个值给某个channel,处理请求,然后从这个channel接收一个值,从而为下一个消费者准备好“信号量”。这个channel的缓存容量限制了对process同步调用的次数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var sem = make(chan int, MaxOutstanding)

func handle(r *Request) {
sem <- 1 // 等待活跃队列耗尽(这样才能执行下一行代码)
process(r) // 大概也许可能会耗时较长
sem <- // 完成;下一个请求可以开始运行了。
}

// 一旦MaxOutstanding个handle正在执行process,
// 其他handle将会阻塞在往已满的channel缓存中发送数据,
// 直到MaxOutstanding个handle中的一个handle完成,然后从缓存中接收数据。

func Serve(queue chan *Request) {
for {
req := <- queue
go handle(req) // 不等待handle结束
}
}
这里有一个问题:Serve为每个过来的请求都创建一个新的goroutine。结果是,如果请求来得太快,那么程序将会消耗无限的资源。我们可以让Serve为goroutine的创建设置门槛:
1
2
3
4
5
6
7
8
9
10
11
func Serve(queue chan *Request) {
for req := range queue {
sem <- 1
go func(req *Request){
process(req)
<- sem
}(req)
// 这里,我们不能直接在goroutine中使用req(这样的话,req变量将会在所有goroutine之间共享)
// 而是将其作为一个参数传给goroutine的函数调用,从而确保对于每个goroutine而言,req都是唯一的。
}
}
好了,让我们回到编写server的一般性问题上来。比较好的管理资源的另一个方法是,启动固定数量的handle goroutine,让它们都从请求channel中读取请求:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func handle(queue chan *Request) {
for r := range queue { // 阻塞在接收请求中。
process(r)
}
}

func Serve(clientRequests chan *Request) {
// 启动处理器
// 这里,goroutine的数量(MaxOutstanding个)限制了对process的并发调用数。
for i := 0; i < MaxOutstanding; i++ {
go handle(clientRequests)
}
<- quit // 等待退出
}
#### channel的channel Go中的一个最重要的特性之一是,channel属于第一类值,它可以像其他第一类值一样,被分配和传递。对此特性的常见使用场景是实现安全并行的多路分解。

在前面一小节的例子中,handle是一个请求处理器,但是我们并没有定义它处理的请求类型。如果请求类型包含了一个用以回复的channel,那么,每个客户端就可以为其回复提供自己的路径了。我们可以将请求类型定义如下:

1
2
3
4
5
type Request struct {
args []int
f func([]int) int // f(args)
resultChan chan int // 接收结果
}
这样,我们的客户端就可以这样写了:
1
2
3
4
5
6
7
8
9
10
11
12
func sum(a []int) (s int) {
for _, v := range a {
s += v
}
return
}

request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
// 发送请求
clientRequests <- request
// 等待响应
fmt.Printf("answer: %d\n", <-request.resultChan)
而对应的服务器端实现只需要修改handle函数:
1
2
3
4
5
func handle(queue chan *Request) {
for req := range queue {
req.resultChan <- req.f(req.args)
}
}
#### 并行 如果某个任务可以分成几个可以独立执行的块,那么这个任务就可以被并行处理,当每个块完成的时候,借助channel来发送信号。

以下面为例:

1
2
3
4
5
6
7
8
9
type Vector []float64

// 将操作应用到v[i], v[i+1], .., v[n-1]
func (v Vector) DoSome(i, n int, u Vector, c chan int) {
for ; i < n; i++ {
v[i] += u.Op(v[i])
}
c <- 1 // 发送信号表明完成
}
我们在循环里独立加载每个部分,一个占一个CPU。它们可以以任意顺序完成,但是,这没关系。我们仅仅是在启动所有goroutine后,通过耗尽channel来计算完成信号。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// CPU核数
const numCPU = 4
// 这里可以使用runtime.NumCPU()来返回当前机器的硬件CPU核数,从而取得硬编码
// 还可以使用runtime.GOMAXPROCS,它报告(或设置)一个Go程序可以同时运行的用户指定核数。默认为runtime.NumCPU()

func (v Vector) DoAll(u Vector) {
c := make(chan int, numCPU)
for i := 0; i < numCPU; i++ {
go v.DoSome(i*len(v)/numCPU, (i+1)*len(v)/numCPU, u, c)
}
// 耗尽channel
for i := 0; i < numCPU; i++ {
<- c // 等待任务完成
}
// 全部完成啦
}
> 注意:Go是一个并发语言,不是一个并行语言。并不是所有的并行问题都适合Go的模型的。

错误

Go的多值返回使得返回一个详细的错误描述和正常的返回值成为可能。 一般来说,错误的类型为error,这是一个简单的内置接口:

1
2
3
type error interface {
Error() string
}
我们可以随意实现这个接口,使其变成一个更丰富的模型,这样,除了错误之外,还可以提供一些上下文信息。例如:
1
2
3
4
5
6
7
8
9
type PathError struct {
Op string
Path string
Err error
}

func (e *PathError) Error() string {
return e.Op + " " + e.Path + ": " + e.Err.Error()
}
当可行时,错误字符串应该标识其来源,例如,带一个命名产生该错误的操作或者包的前缀。

关心准确错误细节的调用者可以使用类型选择或者类型断言来查找特定的错误和准确的细节。例如:

1
2
3
4
5
6
7
file, err = os.Create(filename)
if err == nil {
return
}
if e, ok := err.(*os.PathError); ok && e.Err == syscall.ENOSPC {
// do something
}
#### panic 向调用者报告错误的常见方式是将错误作为一个额外的返回值返回。但是,如果该错误是不可修复的呢?此时程序往往会无法运行下去。

出于这个原因,内置函数panic实际上创建了一个运行时错误,这个错误将会停止程序。该函数接收一个任意类型的参数(通常是一个字符串),这个参数会在程序挂掉的时候被打印出来。这也是一种暗示某些不可能的事情发生了的方式,例如存在死循环。

对于真实的库函数来说,应该避免panic。如果错误可以被掩盖或解决,那么相比把整个程序搞挂而言,让程序继续运行会更好些。一个可能的反例是在初始化期间:如果库真的没法设置自己,那么panic是合理的。

1
2
3
4
5
6
7
8
var user = os.Getenv("USER")

func init(){
if user == "" {
panic("no value for $USER")
}
}
}
#### recover 当panic被调用的时候(包括运行时错误的隐式调用),会立即停止当前函数的执行,然后开始沿着goroutine的堆栈展开往上退,找到被defer的函数并运行。如果退到了该goroutine栈的顶层,那么,程序就挂掉了。

但是,我们可以使用内置的recover来重新获得此goroutine的控制权,然后恢复正常的执行。

recover的调用停止堆栈展开,然后返回传给panic的参数。由于在堆栈展开过长中唯一运行的代码位于被defer的函数内,所以,recover只有在被defer的函数内才有用。

recover的应用之一是在不会杀死其他正在执行的goroutine的情况下,关闭服务器内一个失败的goroutine。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func server(workChan <-chan *Work) {
for work := range workChan {
go safelyDo(work)
}
}

func safelyDo(work *Work) {
defer func() {
if err := recover(); err != nil {
log.Println("work failed:", err)
}
}()
do(work) // 因为用了recover,所以这个函数可以通过调用panic来干净地摆脱任何困境。
}
除非直接从一个被defer的函数内调用,否则recover总是返回nil。因此,被defer的代码可以正常地调用使用了panicrecover的库程序。

参考

请言小午吃个甜筒~~