标签 runtime 下的文章

Go GC如何检测内存对象中是否包含指针

本文永久链接 – https://tonybai.com/2022/02/21/how-gc-detect-pointer-in-mem-obj

众所周知,Go是带垃圾回收(GC)的编程语言,开发者通常不需要考虑对内存的管理,降低了心智负担。Go程序运行的时候,GC在背后默默辛劳地为开发者“擦屁股”:把无法reach到的内存对象定期地释放掉以备后续重用。

GC只关心指针,只要被扫描到的内存对象中有指针,它就会“顺藤摸瓜”,把该内存对象所在的“关系网”摸个门儿清,而那些被孤立在这张“网”之外的内存对象就是要被“清扫”的对象。

那么GC在扫描时如何判断某个内存对象中是否有指针呢?这篇文章我们就来说说这事儿!

内存对象中有指针与无指针的差别

Gopher Academy Blog 2018年发表的一篇文章《Avoiding high GC overhead with large heaps》中作者曾用两个例子来对比了内存对象中有指针与无指针时GC的行为差别。我们摘录一下其中的这两个例子,第一个例子如下:

// demo1.go
func main() {
    a := make([]*int, 1e9) 

    for i := 0; i < 10; i++ {
        start := time.Now()
        runtime.GC()
        fmt.Printf("GC took %s\n", time.Since(start))
    }

    runtime.KeepAlive(a)
}

程序中调用runtime.KeepAlive函数用于保证在该函数调用点之前切片a不会被GC释放掉。

我们看到:demo1中声明了一个包含10亿个*int的切片变量a,然后调用runtime.GC函数手工触发GC过程,并度量每次GC的执行时间,我们看看这个程序的执行结果(virtualbox 虚拟机ubuntu 20.04/go 1.18beta2):

$ go run demo1.go
GC took 698.46522ms
GC took 325.315425ms
GC took 321.959991ms
GC took 326.775531ms
GC took 333.949713ms
GC took 332.350721ms
GC took 328.1664ms
GC took 329.905988ms
GC took 328.466344ms
GC took 330.327066ms

我们看到,每轮GC调用都相当耗时。我们再来看第二个例子:

// demo2.go
func main() {
    a := make([]int, 1e9) 

    for i := 0; i < 10; i++ {
        start := time.Now()
        runtime.GC()
        fmt.Printf("GC took %s\n", time.Since(start))
    }

    runtime.KeepAlive(a)
}

这个例子仅是将切片的元素类型由*int改为了int。我们运行一下这第二个例子:

$ go run demo2.go
GC took 3.486008ms
GC took 1.678019ms
GC took 1.726516ms
GC took 1.13208ms
GC took 1.900233ms
GC took 1.561631ms
GC took 1.899654ms
GC took 7.302686ms
GC took 131.371494ms
GC took 1.138688ms

在我们的实验环境中demo2中每轮GC的性能是demo1的300多倍!两个demo源码唯一的不同就是切片中的元素类型,demo1中的切片元素类型为int型指针。GC每次触发后都会全量扫描切片中存储的这10亿个指针,这就是demo1 GC函数执行时间很长的原因。而demo2中的切片元素类型为int,从demo2的运行结果来看,GC根本没有搭理demo2中的a,这也是demo2 GC函数执行时间较短的原因(我测试了一下:在我的环境中,即便不声明切片a,只是执行10次runtime.GC函数,该函数的平均执行时间也在1ms左右)。

通过以上GC行为差异,我们知道GC可以通过切片a的类型知晓其元素是否包含指针,进而决定是否对其进行进一步扫描。下面我们就来看看GC是如何检测到某一个内存对象中包含指针的。

运行时类型信息(rtype)

Go是静态语言,每个变量都有自己的归属的类型,当变量被在堆上分配时,堆上的内存对象也就有了自己归属的类型。Go编译器在编译阶段就为Go应用中的每种类型建立了对应的类型信息,这些信息体现在runtime._rtype结构体中,Go reflect包的rtype结构体等价于runtime._rtype:

// $GOROOT/src/reflect/type.go

// rtype is the common implementation of most values.
// It is embedded in other struct types.
//
// rtype must be kept in sync with ../runtime/type.go:/^type._type.
type rtype struct {
    size       uintptr
    ptrdata    uintptr // number of bytes in the type that can contain pointers
    hash       uint32  // hash of type; avoids computation in hash tables
    tflag      tflag   // extra type information flags
    align      uint8   // alignment of variable with this type
    fieldAlign uint8   // alignment of struct field with this type
    kind       uint8   // enumeration for C
    // function for comparing objects of this type
    // (ptr to object A, ptr to object B) -> ==?
    equal     func(unsafe.Pointer, unsafe.Pointer) bool
    gcdata    *byte   // garbage collection data
    str       nameOff // string form
    ptrToThis typeOff // type for pointer to this type, may be zero
}

在这个结构体类型中的gcdata字段是为GC服务的,我们看看它究竟是什么!怎么看呢?由于reflect.rtype类型是非导出类型,我们需要对本地的Go语言源码做一些hack,我在reflect包的type.go文件中rtype结构体的定义之前添加一行代码:

type Rtype = rtype

我们用Go 1.9版本引入的类型别名(type alias)机制将rtype导出,这样我们就可以在标准库外面使用reflect.Rtype了。

有童鞋可能会问:改了本地Go标准库源码后,Go编译器就会使用最新源码来编译我们的Go示例程序么?Go 1.18之前的版本都不会!大家可以自行试验一下,也可以通过《Go语言精进之路vol1》第16条“理解包导入”一章了解有关于Go编译器构建过程的详尽描述。

下面我们来获取一个切片的类型对应的rtype,看看其中的gcdata究竟是啥?

// demo4.go

package main

import (
    "fmt"
    "reflect"
    "unsafe"
)

type tflag uint8
type nameOff int32 // offset to a name
type typeOff int32 // offset to an *rtype

type rtype struct {
    size       uintptr
    ptrdata    uintptr // number of bytes in the type that can contain pointers
    hash       uint32  // hash of type; avoids computation in hash tables
    tflag      tflag   // extra type information flags
    align      uint8   // alignment of variable with this type
    fieldAlign uint8   // alignment of struct field with this type
    kind       uint8   // enumeration for C
    // function for comparing objects of this type
    // (ptr to object A, ptr to object B) -> ==?
    equal     func(unsafe.Pointer, unsafe.Pointer) bool
    gcdata    *byte   // garbage collection data
    str       nameOff // string form
    ptrToThis typeOff // type for pointer to this type, may be zero
}

func bar() []*int {
    t := make([]*int, 8 )
    return t
}

func main() {
    t := bar()
    v := reflect.TypeOf(t)

    rtyp, ok := v.(*reflect.Rtype)
    if !ok {
        println("error")
        return
    }

    r := (*rtype)(unsafe.Pointer(rtyp))
    fmt.Printf("%#v\n", *r)
    fmt.Printf("*gcdata = %d\n", *(r.gcdata))
}

bar函数返回一个堆上分配的切片实例t,我们通过reflect.TypeOf获取t的类型信息,通过类型断言我们得到该类型的rtype信息:rtyp,不过gcdata也是非导出字段并且是一个指针,我们要想对其解引用,我们这里又在本地定义了一个本地rtype类型,用于输出gcdata指向的内存的值。

运行这个示例:

$go run demo4.go
main.rtype{size:0x18, ptrdata:0x8, hash:0xaad95941, tflag:0x2, align:0x8, fieldAlign:0x8, kind:0x17, equal:(func(unsafe.Pointer, unsafe.Pointer) bool)(nil), gcdata:(*uint8)(0x10c1b58), str:3526, ptrToThis:0}
*gcdata = 1

我们看到gcdata指向的一个字节的内存的值为1(二进制为0b00000001)。好了,不卖关子了!gcdata所指的这个字节每一bit上的值代表一个8字节的内存块是否包含指针。这样的一个字节就可以标识在一个64字节的内存块中,每个8字节的内存单元是否包含指针。如果类型长度超过64字节,那么用于表示指针地图的gcdata指向的有效字节个数也不止1个字节。

读过我的“Go语言第一课”专栏的童鞋都知道,切片类型在runtime层表示为下面结构:

// $GOROOT/src/runtime/slice.go

type slice struct {
    array unsafe.Pointer
    len   int
    cap   int
}

这里切片类型结构内存对齐后的size为24,小于64个字节,因此Go用一个字节就可以表示切片类型的指针地图。而*gcdata=1,即最低位上的bit为1,表示切片类型的第一个8字节中存储着一个指针。配合下面的示意图理解起来更easy一些:

我们也可以进一步查看切片中各元素是否包含指针,由于该切片的元素就是指针类型,所以每个元素的rtype.gcdata指向的bitmap的值都应该是1,我们来验证一下:

//demo5.go
... ...
func main() {
    t := bar()
    v := reflect.ValueOf(t)

    for i := 0; i < len(t); i++ {
        v1 := v.Index(i)
        vtyp := v1.Type()

        rtyp, ok := vtyp.(*reflect.Rtype)
        if !ok {
            println("error")
            return
        }

        r := (*rtype)(unsafe.Pointer(rtyp))
        fmt.Printf("%#v\n", *r)
        fmt.Printf("*gcdata = %d\n", *(r.gcdata))
    }
}

这个例子输出了每个切片元素的bitmap,结果如下:

$go run demo5.go

gomain.rtype{size:0x8, ptrdata:0x8, hash:0x2522ebe7, tflag:0x8, align:0x8, fieldAlign:0x8, kind:0x36, equal:(func(unsafe.Pointer, unsafe.Pointer) bool)(0x1002c40), gcdata:(*uint8)(0x10c1be0), str:566, ptrToThis:0}
*gcdata = 1
main.rtype{size:0x8, ptrdata:0x8, hash:0x2522ebe7, tflag:0x8, align:0x8, fieldAlign:0x8, kind:0x36, equal:(func(unsafe.Pointer, unsafe.Pointer) bool)(0x1002c40), gcdata:(*uint8)(0x10c1be0), str:566, ptrToThis:0}
*gcdata = 1
main.rtype{size:0x8, ptrdata:0x8, hash:0x2522ebe7, tflag:0x8, align:0x8, fieldAlign:0x8, kind:0x36, equal:(func(unsafe.Pointer, unsafe.Pointer) bool)(0x1002c40), gcdata:(*uint8)(0x10c1be0), str:566, ptrToThis:0}
*gcdata = 1
main.rtype{size:0x8, ptrdata:0x8, hash:0x2522ebe7, tflag:0x8, align:0x8, fieldAlign:0x8, kind:0x36, equal:(func(unsafe.Pointer, unsafe.Pointer) bool)(0x1002c40), gcdata:(*uint8)(0x10c1be0), str:566, ptrToThis:0}
*gcdata = 1
main.rtype{size:0x8, ptrdata:0x8, hash:0x2522ebe7, tflag:0x8, align:0x8, fieldAlign:0x8, kind:0x36, equal:(func(unsafe.Pointer, unsafe.Pointer) bool)(0x1002c40), gcdata:(*uint8)(0x10c1be0), str:566, ptrToThis:0}
*gcdata = 1
main.rtype{size:0x8, ptrdata:0x8, hash:0x2522ebe7, tflag:0x8, align:0x8, fieldAlign:0x8, kind:0x36, equal:(func(unsafe.Pointer, unsafe.Pointer) bool)(0x1002c40), gcdata:(*uint8)(0x10c1be0), str:566, ptrToThis:0}
*gcdata = 1
main.rtype{size:0x8, ptrdata:0x8, hash:0x2522ebe7, tflag:0x8, align:0x8, fieldAlign:0x8, kind:0x36, equal:(func(unsafe.Pointer, unsafe.Pointer) bool)(0x1002c40), gcdata:(*uint8)(0x10c1be0), str:566, ptrToThis:0}
*gcdata = 1
main.rtype{size:0x8, ptrdata:0x8, hash:0x2522ebe7, tflag:0x8, align:0x8, fieldAlign:0x8, kind:0x36, equal:(func(unsafe.Pointer, unsafe.Pointer) bool)(0x1002c40), gcdata:(*uint8)(0x10c1be0), str:566, ptrToThis:0}
*gcdata = 1

输出结果与预期相符。

我们再来看一个例子,一个用单字节bitmap无法表示的类型:

// demo6.go
... ...
type S struct {  // 起始地址
    a  uint8     // 0
    b  uintptr   // 8
    p1 *uint8    // 16
    c  [3]uint64 // 24
    d  uint32    // 48
    p2 *uint64   // 56
    p3 *uint8    // 64
    e  uint32    // 72
    p4 *uint64   // 80
}

func foo() *S {
    t := new(S)
    return t
}

func main() {
    t := foo()
    println(unsafe.Sizeof(*t)) // 88
    typ := reflect.TypeOf(t)
    rtyp, ok := typ.Elem().(*reflect.Rtype)

    if !ok {
        println("error")
        return
    }
    fmt.Printf("%#v\n", *rtyp)

    r := (*rtype)(unsafe.Pointer(rtyp))
    fmt.Printf("%#v\n", *r)
    fmt.Printf("%d\n", *(r.gcdata))
    gcdata1 := (*byte)(unsafe.Pointer(uintptr(unsafe.Pointer(r.gcdata)) + 1))
    fmt.Printf("%d\n", *gcdata1)
}

在这个例子中,我们定义了一个很大的结构体类型S,其size为88,用一个字节无法表示出其bitmap,于是Go使用了两个字节,我们输出这两个字节的bitmap:

$go run demo6.go
88
reflect.rtype{size:0x58, ptrdata:0x58, hash:0xcdb468b2, tflag:0x7, align:0x8, fieldAlign:0x8, kind:0x19, equal:(func(unsafe.Pointer, unsafe.Pointer) bool)(0x108aea0), gcdata:(*uint8)(0x10c135b), str:3593, ptrToThis:19168}
main.rtype{size:0x58, ptrdata:0x58, hash:0xcdb468b2, tflag:0x7, align:0x8, fieldAlign:0x8, kind:0x19, equal:(func(unsafe.Pointer, unsafe.Pointer) bool)(0x108aea0), gcdata:(*uint8)(0x10c135b), str:3593, ptrToThis:19168}
132
5

我们将结果转换成一幅示意图,如下图:

理解上面这个结构体size以及各字段起始地址的前提是理解内存对齐,这个大家可以在我的博客内搜索以前撰写的有关内存对齐的相关内容,当然也可以参考我在专栏第17讲讲解结构体类型时对Go内存对齐的系统讲解。


img{512x368}

img{512x368}

img{512x368}
img{512x368}

我爱发短信:企业级短信平台定制开发专家 https://tonybai.com/。smspush : 可部署在企业内部的定制化短信平台,三网覆盖,不惧大并发接入,可定制扩展; 短信内容你来定,不再受约束, 接口丰富,支持长短信,签名可选。2020年4月8日,中国三大电信运营商联合发布《5G消息白皮书》,51短信平台也会全新升级到“51商用消息平台”,全面支持5G RCS消息。

著名云主机服务厂商DigitalOcean发布最新的主机计划,入门级Droplet配置升级为:1 core CPU、1G内存、25G高速SSD,价格5$/月。有使用DigitalOcean需求的朋友,可以打开这个链接地址:https://m.do.co/c/bff6eed92687 开启你的DO主机之路。

Gopher Daily(Gopher每日新闻)归档仓库 – https://github.com/bigwhite/gopherdaily

我的联系方式:

  • 微博:https://weibo.com/bigwhite20xx
  • 微信公众号:iamtonybai
  • 博客:tonybai.com
  • github: https://github.com/bigwhite
  • “Gopher部落”知识星球:https://public.zsxq.com/groups/51284458844544

商务合作方式:撰稿、出书、培训、在线课程、合伙创业、咨询、广告合作。

ants:在Submit中再调用当前Pool的Submit可能导致阻塞

本文永久链接 – https://tonybai.com/2021/11/27/ants-call-submit-in-submit-may-cause-blocking

1. goroutine pool的必要性

Go在并发程序方面的一个小创新就是支持轻量级用户线程goroutine,不过虽然goroutine很轻,但并不是免费的,尤其是Go程序中存在大量goroutine反复启停时(比如采用每连接一个goroutine的处理http短连接的http server,在大并发的情况下就是如此),Go运行时启停和调度goroutine的开销还是蛮大的。这个时候我们对goroutine pool的需求就诞生了。

goroutine pool减小开销的主要思路就是复用:即创建出的goroutine在做完一个task后不退出,而是等待下一个task,这样来减少goroutine反复创建和销毁带来的开销。除此之外,由于goroutine已经被创建,当任务到达时,可以不需要等待goroutine创建就能立即执行,提高响应速度。并且通过goroutine pool,我们还可以严格控制启动的goroutine的数量,避免因外部条件变化带来的goroutine数量的暴涨与暴跌。

在Go社区中,优秀的goroutine pool的实现有不少,Andy Pan开源的ants就是其中之一。根据ants在github上的当前状态来看,它在Go社区范围的应用很广泛,Andy Pan对issue的响应也是十分快的。这也是我们在项目中引入ants的原因。

这篇文章要写的就是我们在使用ants过程中遇到的问题,以及对问题的简单分析与解决过程,这里分享出来的目的也是希望大家能避免遇到同类问题。

2. 问题描述

我们在对系统进行压测时,发现系统出现了“死锁”。经过查找,我们将问题锁定在对ants包的使用上面了。我们的工程师使用ants时,在传给Pool.Submit方法的task函数中又调用了同一个Pool的Submit方法。之后他便用下面代码复现了这个问题:

package main

import (
    "fmt"
    "time"

    "github.com/panjf2000/ants/v2"
)

func main() {
    p, _ := ants.NewPool(100)

    for {
        p.Submit(func() {
            for i := 0; i < 3; i++ {
                p.Submit(func() {
                    fmt.Println(time.Now().Unix())
                })
            }
        })
    }
}

这个代码使用了ants 2.4.6版本,我们在ubuntu 20.04上使用Go 1.17运行这个程序,很快程序就锁住了。

3. 原因分析

ants代码不多,原理上也不复杂,我们直接来看看Submit的代码:

// https://github.com/panjf2000/ants/blob/master/pool.go (commit fdb318c1d7cef8e448f1bc2bbb03519ff69939da)
func (p *Pool) Submit(task func()) error {
    if p.IsClosed() {
        return ErrPoolClosed
    }
    var w *goWorker
    if w = p.retrieveWorker(); w == nil {
        return ErrPoolOverload
    }
    w.task <- task
    return nil
}

我们看到,Submit方法的主要逻辑就是从Pool中获取一个worker,然后将传入的task写入worker的task channel中。再来看看retrieveWorker方法:

// https://github.com/panjf2000/ants/blob/master/pool.go(commit fdb318c1d7cef8e448f1bc2bbb03519ff69939da)

225 func (p *Pool) retrieveWorker() (w *goWorker) {
226     spawnWorker := func() {
227         w = p.workerCache.Get().(*goWorker)
228         w.run()
229     }
230
231     p.lock.Lock()
232
233     w = p.workers.detach()
234     if w != nil { // first try to fetch the worker from the queue
235         p.lock.Unlock()
236     } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
237         // if the worker queue is empty and we don't run out of the pool capacity,
238         // then just spawn a new worker goroutine.
239         p.lock.Unlock()
240         spawnWorker()
241     } else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
242         if p.options.Nonblocking {
243             p.lock.Unlock()
244             return
245         }
246     retry:
247         if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
248             p.lock.Unlock()
249             return
250         }
251         p.blockingNum++
252         p.cond.Wait() // block and wait for an available worker
253         p.blockingNum--
254         var nw int
255         if nw = p.Running(); nw == 0 { // awakened by the scavenger
256             p.lock.Unlock()
257             if !p.IsClosed() {
258                 spawnWorker()
259             }
260             return
261         }
262         if w = p.workers.detach(); w == nil {
263             if nw < capacity {
264                 p.lock.Unlock()
265                 spawnWorker()
266                 return
267             }
268             goto retry
269         }
270
271         p.lock.Unlock()
272     }
273     return
274 }

retrieveWorker方法负责从Pool中取出一个空闲worker。

retrieveWorker先加锁(line 231),然后尝试从worker queue中获取空闲worker(line 233),如果成功获得,那么解锁返回(line 234~235);

如果队列为空,且池子容量(capacity)还没有满,那就创建一个新worker(line 236~240);

如果队列为空,且池子容量(capacity)也满了(line 241),那么判断一下p.options.Nonblocking是否为true,如果为true,说明不想阻塞,那么retrieveWorker返回nil(line 247~250)。retrieveWorker返回nil,那么Submit返回ErrPoolOverload错误。

如果用户没有将p.options.Nonblocking设置为true(p.options.Nonblocking默认为false),retrieveWorker判断p.options.MaxBlockingTasks这个option,但p.options.MaxBlockingTasks这个option默认为0,所以不满足条件。代码进入p.cond.Wait(),问题就出在这里

我们简化一下复现的步骤,假设我们的pool的容量是1,初始我们调用1次Submit获得了worker,这个worker开始执行task,而这个被执行的task又调用了同一个Pool的Submit,之后进入retrieveWorker方法,由于没有设置p.options.Nonblocking=true,cap容量也满了,由于此时没有空闲worker了,于是该worker进入p.cond.Wait。此时程序便进入死锁状态。将这个示例整理为代码,如下:

package main

import (
    "fmt"
    "time"

    "github.com/panjf2000/ants/v2"
)

func main() {
    p, _ := ants.NewPool(1)

    p.Submit(func() {
        p.Submit(func() {
            fmt.Println(time.Now().Unix())
        })
    })

    time.Sleep(1000 *time.Second)
}

大家可以执行一下这段代码,死锁必然马上出现。

如果我们修改一下ants的pool.go中的代码,在p.cond.Wait()前后加入一些打印语句,就像下面这样:

p.blockingNum++
fmt.Println("==== cond wait ...===")
p.cond.Wait() // block and wait for an available worker
fmt.Println("==== cond wait return ===")
p.blockingNum--

然后,我们通过replace将demo对ants的依赖改为本地依赖,运行demo后,我们将看到下面输出:

==== cond wait ...===

demo将一直停在上面这行输出的地方不再向下执行了。

4. 官方策略

我将这个问题提交到ants的issue列表中,Andy Pan很快给了响应。按照Andy的说法,目前ants并不禁止Submit()里再调用同一个Pool的Submit(),只是需要设置一下Pool无可用worker时不阻塞即可,就像下面代码这样:

p, _ := ants.NewPool(1, ants.WithNonblocking(true))

我个人又考虑了一下这个问题,设置WithNonblocking为true,Submit方法会返回ErrPoolOverload错误,那么调用者需要考虑如何处理这个错误,最大的可能就是反复重试。

另外如果不设置ants.WithNonblocking(true),我就是要让代码去等,正常情况下,这种阻塞应该是可以解开的,当task执行完毕后,自然可以空闲出一个goroutine来接新task。但问题就在于:如果我在Submit()里再调用同一个Pool的Submit(),一旦所有task都是这种情况,这个阻塞可能是无法解开的。所以我建议Andy在文档中说明一下这种情况。Andy也接受了这个建议,在最新的commit中在Submit和Invoke方法的注释中增加了对这种情况的说明。

5. 解决方法

那么如果我就是要在Submit中调用Submit该如何处理呢?一种很直接的思路就是使用两个Pool!比如将上面的demo改成下面这样就可以正常运行了:

func main() {
    p1, _ := ants.NewPool(1)
    p2, _ := ants.NewPool(1)

    p1.Submit(func() {
        p2.Submit(func() {
            fmt.Println(time.Now().Unix())
        })
    })

    time.Sleep(10*time.Second)
}

6. 补充一个因上述ants阻塞问题导致的其他问题

我们的系统在生产场景中会有大量并发连接,针对每个连接都会有定时器处理会话相关的过期、删除等。考虑到定时器太多,我们选择了维护定时器开销更小的时间轮算法的定时器实现。在github上,RussellLuo/timingwheel目前star最多的,但美中不足的是其作者Russelluo似乎对这一项目不是很热心了,issue响应也很少了。我们抱着先使用再自主改进的态度引入了RussellLuo/timingwheel。

考虑到RussellLuo/timingwheel每执行一个fired的timer对应的task时,都启动一个新goroutine去执行,我们将下面代码做了修改:

func (tw *TimingWheel) addOrRun(t *Timer) {
    if !tw.add(t) {
        // Already expired

        // Like the standard time.AfterFunc (https://golang.org/pkg/time/#AfterFunc),
        // always execute the timer's task in its own goroutine.
        go t.task()
    }
}

改为:

func (tw *TimingWheel) addOrRun(t *Timer) {
    if !tw.add(t) {
        // Already expired

        // Like the standard time.AfterFunc (https://golang.org/pkg/time/#AfterFunc),
        // always execute the timer's task in its own goroutine.
        tw.workerPool.Submit(func() {
            t.task()
        })
    }
}

我们用一个ants pool(pool size默认为1024)来减少goroutine频繁创建销毁带来的开销。

在开发与功能测试阶段,改造后的RussellLuo/timingwheel表现不错,一切都还ok。进入到压测阶段,我们发现,在大量连接一起断连后,大部分新启动的用于清除会话的定时器都无法工作,时间到了后,timer也不fire,导致我们的连接断连逻辑无法执行。我用下面的例子复现了这个问题(为了方便复现现象,我们把ants的Pool size改为1):

package main

import (
    "fmt"
    "sync"
    "time"

    "github.com/RussellLuo/timingwheel"
)

var tw *timingwheel.TimingWheel

type tickScheduler struct {
    interval time.Duration
}

func (s *tickScheduler) Next(prev time.Time) time.Time {
    next := prev.Add(s.interval)
    return next
}

type Timer struct {
    timer *timingwheel.Timer
}

func (t *Timer) Stop() bool {
    return t.timer.Stop()
}

func TickFunc(d time.Duration, f func()) *Timer {
    s := &tickScheduler{
        interval: d,
    }
    t := tw.ScheduleFunc(s, f)
    return &Timer{t}
}

func main() {
    tw = timingwheel.NewTimingWheel(10*time.Millisecond, 60)
    tw.Start()
    defer tw.Stop()

    var c = make(chan string)
    var wg sync.WaitGroup
    wg.Add(10)

    for i := 0; i < 10; i++ {
        go func() {
            timer := TickFunc(time.Millisecond*10, func() {
                c <- "timer fired"
            })
            defer timer.Stop()

            time.Sleep(time.Second)

            for i := 0; i < 10; i++ {
                s := <-c
                if s != "timer fired" {
                    fmt.Errorf("%d: want [timer fired], got [%s]\n", i+1, s)
                } else {
                    fmt.Printf("%d: timer fired\n", i+1)
                }
            }
            wg.Done()
        }()
    }

    wg.Wait()
}

运行这个程序,程序也很快锁住:

$ go run main.go
1: timer fired
1: timer fired
1: timer fired
1: timer fired
1: timer fired
2: timer fired
2: timer fired
2: timer fired
2: timer fired
//锁住

这个问题与本文开始的问题一样,也是在Submit中调用同pool的Submit,调用Submit的两处位置,我在下面的代码中用注释标记了出来。

func (tw *TimingWheel) ScheduleFunc(s Scheduler, f func()) (t *Timer) {
    expiration := s.Next(time.Now().UTC())
    if expiration.IsZero() {
        // No time is scheduled, return nil.
        return
    }   

    t = &Timer{
        expiration: timeToMs(expiration),
        task: func() {
            // Schedule the task to execute at the next time if possible.
            expiration := s.Next(msToTime(t.expiration))
            if !expiration.IsZero() {
                t.expiration = timeToMs(expiration)
                tw.addOrRun(t)  // 如果timer已经fire,那么就调用pool.Submit
            }   

            // Actually execute the task.
            f()
        },
    }
    tw.addOrRun(t) // 如果timer已经fire,那么就调用pool.Submit

    return
}

btw,关于时间轮算法是否在资源占用,维护timer开销方面胜过Go标准库timer,这里其实并没有细致比对过。Go标准库的timer性能一直在完善,后续有时间需要认真对比一下。


“Gopher部落”知识星球正式转正(从试运营星球变成了正式星球)!“gopher部落”旨在打造一个精品Go学习和进阶社群!高品质首发Go技术文章,“三天”首发阅读权,每年两期Go语言发展现状分析,每天提前1小时阅读到新鲜的Gopher日报,网课、技术专栏、图书内容前瞻,六小时内必答保证等满足你关于Go语言生态的所有需求!部落目前虽小,但持续力很强,欢迎大家加入!

img{512x368}

img{512x368}
img{512x368}
img{512x368}

我爱发短信:企业级短信平台定制开发专家 https://tonybai.com/。smspush : 可部署在企业内部的定制化短信平台,三网覆盖,不惧大并发接入,可定制扩展; 短信内容你来定,不再受约束, 接口丰富,支持长短信,签名可选。2020年4月8日,中国三大电信运营商联合发布《5G消息白皮书》,51短信平台也会全新升级到“51商用消息平台”,全面支持5G RCS消息。

著名云主机服务厂商DigitalOcean发布最新的主机计划,入门级Droplet配置升级为:1 core CPU、1G内存、25G高速SSD,价格5$/月。有使用DigitalOcean需求的朋友,可以打开这个链接地址:https://m.do.co/c/bff6eed92687 开启你的DO主机之路。

Gopher Daily(Gopher每日新闻)归档仓库 – https://github.com/bigwhite/gopherdaily

我的联系方式:

  • 微博:https://weibo.com/bigwhite20xx
  • 微信公众号:iamtonybai
  • 博客:tonybai.com
  • github: https://github.com/bigwhite
  • “Gopher部落”知识星球:https://public.zsxq.com/groups/51284458844544

微信赞赏:
img{512x368}

商务合作方式:撰稿、出书、培训、在线课程、合伙创业、咨询、广告合作。

如发现本站页面被黑,比如:挂载广告、挖矿等恶意代码,请朋友们及时联系我。十分感谢! Go语言第一课 Go语言精进之路1 Go语言精进之路2 商务合作请联系bigwhite.cn AT aliyun.com

欢迎使用邮件订阅我的博客

输入邮箱订阅本站,只要有新文章发布,就会第一时间发送邮件通知你哦!

这里是 Tony Bai的个人Blog,欢迎访问、订阅和留言! 订阅Feed请点击上面图片

如果您觉得这里的文章对您有帮助,请扫描上方二维码进行捐赠 ,加油后的Tony Bai将会为您呈现更多精彩的文章,谢谢!

如果您希望通过微信捐赠,请用微信客户端扫描下方赞赏码:

如果您希望通过比特币或以太币捐赠,可以扫描下方二维码:

比特币:

以太币:

如果您喜欢通过微信浏览本站内容,可以扫描下方二维码,订阅本站官方微信订阅号“iamtonybai”;点击二维码,可直达本人官方微博主页^_^:
本站Powered by Digital Ocean VPS。
选择Digital Ocean VPS主机,即可获得10美元现金充值,可 免费使用两个月哟! 著名主机提供商Linode 10$优惠码:linode10,在 这里注册即可免费获 得。阿里云推荐码: 1WFZ0V立享9折!


View Tony Bai's profile on LinkedIn
DigitalOcean Referral Badge

文章

评论

  • 正在加载...

分类

标签

归档



View My Stats