分类 技术志 下的文章

论golang Timer Reset方法使用的正确姿势

2016年,Go语言Tiobe编程语言排行榜上位次的大幅蹿升(2016年12月份Tiobe榜单:go位列第16位,Rating值:1.939%)。与此同时,我们也能切身感受到Go语言在世界范围蓬勃发展,其在中国地界儿上的发展更是尤为猛烈^0^:For gopher们的job变多了、网上关于Go的资料也大有“汗牛充栋”之势。作为职业Gopher^0^,要为这个生态添砖加瓦,就要多思考、多总结,关键还要做到“遇到了问题,就要说出来,给出你的见解”。每篇文章都有自己的切入角度和关注重点,因此Gopher们也无需过于担忧资料的“重复”。

这次,我来说说在使用Go标准库中Timer的Reset方法时遇到的问题。

一、关于Timer原理的一些说明

网络编程方面,从用户视角看,golang表象上是一种“阻塞式”网络编程范式,而支撑这种“阻塞式”范式的则是内置于go编译后的executable file中的runtime。runtime利用网络IO多路复用机制实现多个进行网络通信的goroutine的合理调度。goroutine中的执行函数则相当于你在传统C编程中传给epoll机制的回调函数。golang一定层度上消除了在这方面“回调”这种“逆向思维”给你带来的心智负担,简化了网络编程的复杂性。

但长时间“阻塞”显然不能满足大多数业务情景,因此还需要一定的超时机制。比如:在socket层面,我们通过显式设置net.Dialer的Timeout或使用SetReadDeadline、SetWriteDeadline以及SetDeadline;在应用层协议,比如http,client通过设置timeout参数,server通过TimeoutHandler来限制操作的time limit。这些timeout机制,有些是通过runtime的网络多路复用的timeout机制实现,有些则是通过Timer实现的。

标准库中的Timer让用户可以定义自己的超时逻辑,尤其是在应对select处理多个channel的超时、单channel读写的超时等情形时尤为方便。

1、Timer的创建

Timer是一次性的时间触发事件,这点与Ticker不同,后者则是按一定时间间隔持续触发时间事件。Timer常见的使用场景如下:

场景1:

t := time.AfterFunc(d, f)

场景2:

select {
    case m := <-c:
       handle(m)
    case <-time.After(5 * time.Minute):
       fmt.Println("timed out")
}

或:
t := time.NewTimer(5 * time.Minute)
select {
    case m := <-c:
       handle(m)
    case <-t.C:
       fmt.Println("timed out")
}

从这两个场景中,我们可以看到Timer三种创建姿势:

t:= time.NewTimer(d)
t:= time.AfterFunc(d, f)
c:= time.After(d)

虽然姿势不同,但背后的原理则是相通的。

Timer有三个要素:

* 定时时间:也就是那个d
* 触发动作:也就是那个f
* 时间channel: 也就是t.C

对于AfterFunc这种创建方式而言,Timer就是在超时(timer expire)后,执行函数f,此种情况下:时间channel无用。

//$GOROOT/src/time/sleep.go

func AfterFunc(d Duration, f func()) *Timer {
    t := &Timer{
        r: runtimeTimer{
            when: when(d),
            f:    goFunc,
            arg:  f,
        },
    }
    startTimer(&t.r)
    return t
}

func goFunc(arg interface{}, seq uintptr) {
    go arg.(func())()
}

注意:从AfterFunc源码可以看到,外面传入的f参数并非直接赋值给了内部的f,而是作为wrapper function:goFunc的arg传入的。而goFunc则是启动了一个新的goroutine来执行那个外部传入的f。这是因为timer expire对应的事件处理函数的执行是在go runtime内唯一的timer events maintenance goroutine: timerproc中。为了不block timerproc的执行,必须启动一个新的goroutine。

//$GOROOT/src/runtime/time.go
func timerproc() {
    timers.gp = getg()
    for {
        lock(&timers.lock)
        ... ...
            f := t.f
            arg := t.arg
            seq := t.seq
            unlock(&timers.lock)
            if raceenabled {
                raceacquire(unsafe.Pointer(t))
            }
            f(arg, seq)
            lock(&timers.lock)
        }
        ... ...
        unlock(&timers.lock)
   }
}

而对于NewTimer和After这两种创建方法,则是Timer在超时(timer expire)后,执行一个标准库中内置的函数:sendTime。sendTime将当前当前事件send到timer的时间Channel中,那么说这个动作不会阻塞到timerproc的执行么?答案肯定是不会的,其原因就在下面代码中:

//$GOROOT/src/time/sleep.go
func NewTimer(d Duration) *Timer {
    c := make(chan Time, 1)
    t := &Timer{
        C: c,
        ... ...
    }
    ... ...
    return t
}

func sendTime(c interface{}, seq uintptr) {
    // Non-blocking send of time on c.
    // Used in NewTimer, it cannot block anyway (buffer).
    // Used in NewTicker, dropping sends on the floor is
    // the desired behavior when the reader gets behind,
    // because the sends are periodic.
    select {
    case c.(chan Time) <- Now():
    default:
    }
}

我们看到NewTimer中创建了一个buffered channel,size = 1。正常情况下,当timer expire,t.C无论是否有goroutine在read,sendTime都可以non-block的将当前时间发送到C中;同时,我们看到sendTime还加了双保险:通过一个select判断c buffer是否已满,一旦满了,直接退出,依然不会block,这种情况在reuse active timer时可能会遇到。

2、Timer的资源释放

很多Go初学者在使用Timer时都会担忧Timer的创建会占用系统资源,比如:

有人会认为:创建一个Timer后,runtime会创建一个单独的Goroutine去计时并在expire后发送当前时间到channel里。
还有人认为:创建一个timer后,runtime会申请一个os级别的定时器资源去完成计时工作。

实际情况并不是这样。恰好近期gopheracademy blog发布了一篇 《How Do They Do It: Timers in Go》,通过对timer源码的分析,讲述了timer的原理,大家可以看看。

go runtime实际上仅仅是启动了一个单独的goroutine,运行timerproc函数,维护了一个”最小堆”,定期wake up后,读取堆顶的timer,执行timer对应的f函数,并移除该timer element。创建一个Timer实则就是在这个最小堆中添加一个element,Stop一个timer,则是从堆中删除对应的element。

同时,从上面的两个Timer常见的使用场景中代码来看,我们并没有显式的去释放什么。从上一节我们可以看到,Timer在创建后可能占用的资源还包括:

  • 0或一个Channel
  • 0或一个Goroutine

这些资源都会在timer使用后被GC回收。

综上,作为Timer的使用者,我们要做的就是尽量减少在使用Timer时对最小堆管理goroutine和GC的压力即可,即:及时调用timer的Stop方法从最小堆删除timer element(如果timer 没有expire)以及reuse active timer。

BTW,这里还有一篇讨论go Timer精度的文章,大家可以拜读一下。

二、Reset到底存在什么问题?

铺垫了这么多,主要还是为了说明Reset的使用问题。什么问题呢?我们来看下面的例子。这些例子主要是为了说明Reset问题,现实中很可能大家都不这么写代码逻辑。当前环境:go version go1.7 darwin/amd64。

1、example1

我们的第一个example如下:

//example1.go

func main() {
    c := make(chan bool)

    go func() {
        for i := 0; i < 5; i++ {
            time.Sleep(time.Second * 1)
            c <- false
        }

        time.Sleep(time.Second * 1)
        c <- true
    }()

    go func() {
        for {
            // try to read from channel, block at most 5s.
            // if timeout, print time event and go on loop.
            // if read a message which is not the type we want(we want true, not false),
            // retry to read.
            timer := time.NewTimer(time.Second * 5)
            defer timer.Stop()
            select {
            case b := <-c:
                if b == false {
                    fmt.Println(time.Now(), ":recv false. continue")
                    continue
                }
                //we want true, not false
                fmt.Println(time.Now(), ":recv true. return")
                return
            case <-timer.C:
                fmt.Println(time.Now(), ":timer expired")
                continue
            }
        }
    }()

    //to avoid that all goroutine blocks.
    var s string
    fmt.Scanln(&s)
}

example1.go的逻辑大致就是 一个consumer goroutine试图从一个channel里读出true,如果读出false或timer expire,那么继续try to read from the channel。这里我们每次循环都创建一个timer,并在go routine结束后Stop该timer。另外一个producer goroutine则负责生产消息,并发送到channel中。consumer中实际发生的行为取决于producer goroutine的发送行为。

example1.go执行的结果如下:

$go run example1.go
2016-12-21 14:52:18.657711862 +0800 CST :recv false. continue
2016-12-21 14:52:19.659328152 +0800 CST :recv false. continue
2016-12-21 14:52:20.661031612 +0800 CST :recv false. continue
2016-12-21 14:52:21.662696502 +0800 CST :recv false. continue
2016-12-21 14:52:22.663531677 +0800 CST :recv false. continue
2016-12-21 14:52:23.665210387 +0800 CST :recv true. return

输出如预期。但在这个过程中,我们新创建了6个Timer。

2、example2

如果我们不想重复创建这么多Timer实例,而是reuse现有的Timer实例,那么我们就要用到Timer的Reset方法,见下面example2.go,考虑篇幅,这里仅列出consumer routine代码,其他保持不变:

//example2.go
.... ...
// consumer routine
    go func() {
        // try to read from channel, block at most 5s.
        // if timeout, print time event and go on loop.
        // if read a message which is not the type we want(we want true, not false),
        // retry to read.
        timer := time.NewTimer(time.Second * 5)
        for {
            // timer is active , not fired, stop always returns true, no problems occurs.
            if !timer.Stop() {
                <-timer.C
            }
            timer.Reset(time.Second * 5)
            select {
            case b := <-c:
                if b == false {
                    fmt.Println(time.Now(), ":recv false. continue")
                    continue
                }
                //we want true, not false
                fmt.Println(time.Now(), ":recv true. return")
                return
            case <-timer.C:
                fmt.Println(time.Now(), ":timer expired")
                continue
            }
        }
    }()
... ...

按照go 1.7 doc中关于Reset使用的建议:

To reuse an active timer, always call its Stop method first and—if it had expired—drain the value from its channel. For example:

if !t.Stop() {
        <-t.C
}
t.Reset(d)

我们改造了example1,形成example2的代码。由于producer行为并未变更,实际example2执行时,每次循环Timer在被Reset之前都没有expire,也没有fire a time to channel,因此timer.Stop的调用均返回true,即成功将timer从“最小堆”中移除。example2的执行结果如下:

$go run example2.go
2016-12-21 15:10:54.257733597 +0800 CST :recv false. continue
2016-12-21 15:10:55.259349877 +0800 CST :recv false. continue
2016-12-21 15:10:56.261039127 +0800 CST :recv false. continue
2016-12-21 15:10:57.262770422 +0800 CST :recv false. continue
2016-12-21 15:10:58.264534647 +0800 CST :recv false. continue
2016-12-21 15:10:59.265680422 +0800 CST :recv true. return

和example1并无二致。

3、example3

现在producer routine的发送行为发生了变更:从以前每隔1s发送一次数据变成了每隔7s发送一次数据,而consumer routine不变:

//example3.go

//producer routine
    go func() {
        for i := 0; i < 10; i++ {
            time.Sleep(time.Second * 7)
            c <- false
        }

        time.Sleep(time.Second * 7)
        c <- true
    }()

我们来看看example3.go的执行结果:

$go run example3.go
2016-12-21 15:14:32.764410922 +0800 CST :timer expired

程序hang住了。你能猜到在哪里hang住的吗?对,就是在drain t.C的时候hang住了:

           // timer may be not active and may not fired
            if !timer.Stop() {
                <-timer.C //drain from the channel
            }
            timer.Reset(time.Second * 5)

producer的发送行为发生了变化,Comsumer routine在收到第一个数据前有了一次time expire的事件,for loop回到loop的开始端。这时timer.Stop函数返回的不再是true,而是false,因为timer已经expire,最小堆中已经不包含该timer了,Stop在最小堆中找不到该timer,返回false。于是example3代码尝试抽干(drain)timer.C中的数据。但timer.C中此时并没有数据,于是routine block在channel recv上了。

在Go 1.8以前版本中,很多人遇到了类似的问题,并提出issue,比如:

time: Timer.Reset is not possible to use correctly #14038

不过go team认为这还是文档中对Reset的使用描述不够充分导致的,于是在Go 1.8中对Reset方法的文档做了补充Go 1.8 beta2中Reset方法的文档改为了:

Resetting a timer must take care not to race with the send into t.C that happens when the current timer expires. If a program has already received a value from t.C, the timer is known to have expired, and t.Reset can be used directly. If a program has not yet received a value from t.C, however, the timer must be stopped and—if Stop reports that the timer expired before being stopped—the channel explicitly drained:

if !t.Stop() {
        <-t.C
}
t.Reset(d)

大致意思是:如果明确time已经expired,并且t.C已经被取空,那么可以直接使用Reset;如果程序之前没有从t.C中读取过值,这时需要首先调用Stop(),如果返回true,说明timer还没有expire,stop成功删除timer,可直接reset;如果返回false,说明stop前已经expire,需要显式drain channel。

4、example4

我们的example3就是“time已经expired,并且t.C已经被取空,那么可以直接使用Reset ”这第一种情况,我们应该直接reset,而不用显式drain channel。如何将这两种情形合二为一,很直接的想法就是增加一个开关变量isChannelDrained,标识timer.C是否已经被取空,如果取空,则直接调用Reset。如果没有,则drain Channel。

增加一个变量总是麻烦的,RussCox也给出一个未经详尽验证的方法,我们来看看用这种方法改造的example4.go:

//example4.go

//consumer
    go func() {
        // try to read from channel, block at most 5s.
        // if timeout, print time event and go on loop.
        // if read a message which is not the type we want(we want true, not false),
        // retry to read.
        timer := time.NewTimer(time.Second * 5)
        for {
            // timer may be not active, and fired
            if !timer.Stop() {
                select {
                case <-timer.C: //try to drain from the channel
                default:
                }
            }
            timer.Reset(time.Second * 5)
            select {
            case b := <-c:
                if b == false {
                    fmt.Println(time.Now(), ":recv false. continue")
                    continue
                }
                //we want true, not false
                fmt.Println(time.Now(), ":recv true. return")
                return
            case <-timer.C:
                fmt.Println(time.Now(), ":timer expired")
                continue
            }
        }
    }()

执行结果:

$go run example4.go
2016-12-21 15:38:16.704647957 +0800 CST :timer expired
2016-12-21 15:38:18.703107177 +0800 CST :recv false. continue
2016-12-21 15:38:23.706665507 +0800 CST :timer expired
2016-12-21 15:38:25.705314522 +0800 CST :recv false. continue
2016-12-21 15:38:30.70900638 +0800 CST :timer expired
2016-12-21 15:38:32.707482917 +0800 CST :recv false. continue
2016-12-21 15:38:37.711260142 +0800 CST :timer expired
2016-12-21 15:38:39.709668705 +0800 CST :recv false. continue
2016-12-21 15:38:44.71337522 +0800 CST :timer expired
2016-12-21 15:38:46.710880007 +0800 CST :recv false. continue
2016-12-21 15:38:51.713813305 +0800 CST :timer expired
2016-12-21 15:38:53.713063822 +0800 CST :recv true. return

我们利用一个select来包裹channel drain,这样无论channel中是否有数据,drain都不会阻塞住。看似问题解决了。

5、竞争条件

如果你看过timerproc的代码,你会发现其中的这样一段代码:

// go1.7
// $GOROOT/src/runtime/time.go
            f := t.f
            arg := t.arg
            seq := t.seq
            unlock(&timers.lock)
            if raceenabled {
                raceacquire(unsafe.Pointer(t))
            }
            f(arg, seq)
            lock(&timers.lock)

我们看到在timerproc执行f(arg, seq)这个函数前,timerproc unlock了timers.lock,也就是说f的执行并没有在锁内。

前面说过,f的执行是什么?

对于AfterFunc来说,就是启动一个goroutine,并在这个新goroutine中执行用户传入的函数;
对于After和NewTimer这种创建姿势创建的timer而言,f的执行就是sendTime的执行,也就是向t.C中send 当前时间。

注意:这时候timer expire过程中sendTime的执行与“drain channel”是分别在两个goroutine中执行的,谁先谁后,完全依靠runtime调度。于是example4.go中的看似没有问题的代码,也可能存在问题(当然需要时间粒度足够小,比如ms级的Timer)。

如果sendTime的执行发生在drain channel执行前,那么就是example4.go中的执行结果:Stop返回false(因为timer已经expire了),显式drain channel会将数据读出,后续Reset后,timer正常执行;
如果sendTime的执行发生在drain channel执行后,那么问题就来了,虽然Stop返回false(因为timer已经expire),但drain channel并没有读出任何数据。之后,sendTime将数据发到channel中。timer Reset后的Timer中的Channel实际上已经有了数据,于是当进入下面的select执行体时,”case <-timer.C:”瞬间返回,触发了timer事件,没有启动超时等待的作用。

这也是issue:*time: Timer.C can still trigger even after Timer.Reset is called #11513中问到的问题。

go官方文档中对此也有描述:

Note that it is not possible to use Reset's return value correctly, as there is a race condition between draining the channel and the new timer expiring. Reset should always be invoked on stopped or expired channels, as described above. The return value exists to preserve compatibility with existing programs.

三、真的有Reset方法的正确使用姿势吗?

综合上述例子和分析,Reset的使用似乎没有理想的方案,但一般来说,在特定业务逻辑下,Reset还是可以正常工作的,就如example4那样。即便出现问题,如果了解了Reset背后的原理,问题解决起来也是会很快很准的。

文中的相关代码可以在这里下载

四、参考资料

Golang官方有关Timer的issue list:

runtime: special case timer channels #8898
time:timer stop ,how to use? #14947
time: document proper usage of Timer.Stop #14383
*time: Timer.Reset is not possible to use correctly #14038
Time.After doesn’t release memory #15781
runtime: timerproc does not get to run under load #15706
time: time.After uses memory until duration times out #15698
time:timer stop panic #14946
*time: Timer.C can still trigger even after Timer.Reset is called #11513
time: Timer.Stop documentation incorrect for Timer returned by AfterFunc #17600

相关资料:

  1. go中的定时器timer
  2. Go内部实现之timer
  3. Go定时器
  4. How Do They Do It: Timers in Go
  5. timer在go可以有多精确

使用wukong全文搜索引擎

近期项目中有一个全文索引和全文搜索的业务需求,组内同事在这方面都没啥经验,找一个满足我们需求的开源的全文搜索引擎势在必行。我们这一期对全文搜索引擎的需求并不复杂,最主要的是引擎可以很好的支持中文分词、索引和搜索,并能快速实现功能。在全文搜索领域,基于Apache luceneElasticSearch舍我其谁,其强大的分布式系统能力、对超大规模数据的支持、友好的Restful API以及近实时的搜索性能都是业内翘楚,并且其开发社区也是相当活跃,资料众多。但也正式由于其体量较大,我们并没有在本期项目中选择使用ElasticSearch,而是挑选了另外一个“fame”不是那么响亮的引擎:wukong

一、wukong简介

wukong,是一款golang实现的高性能、支持中文分词的全文搜索引擎。我个人觉得它最大的特点恰恰是不像ElasticSearch那样庞大和功能完备,而是可以以一个Library的形式快速集成到你的应用或服务中去,这可能也是在当前阶段选择它的最重要原因,当然其golang技术栈也是让我垂涎于它的另外一个原因:)。

第一次知道wukong,其实是在今年的GopherChina大会上,其作者陈辉作为第一个演讲嘉宾在大会上分享了“Go与人工智能”。在这个presentation中,chen hui详细讲解了wukong搜索引擎以及其他几个关联的开源项目,比如:sego等。

在golang世界中,做full text search的可不止wukong一个。另外一个比较知名的是bleve,但默认情况下,bleve并不支持中文分词和搜索,需要结合中文分词插件才能支持,比如:gojieba

wukong基本上是陈辉一个人打造的项目,在陈辉在阿里任职期间,他将其用于阿里内部的一些项目中,但总体来说,wukong的应用还是很小众的,相关资料也不多,基本都集中在其github站点上。关于wukong源码的分析,倒是在国外站点上发现一篇:《Code reading: wukong full-text search engine》。

本文更多聚焦于应用wukong引擎,而不是来分析wukong代码。

二、全文索引和检索

1、最简单的例子

我们先来看一个使用wukong引擎编写的最简单的例子:

//example1.go

package main

import (
    "fmt"

    "github.com/huichen/wukong/engine"
    "github.com/huichen/wukong/types"
)

var (
    searcher = engine.Engine{}
    docId    uint64
)

const (
    text1 = `在苏黎世的FIFA颁奖典礼上,巴萨球星、阿根廷国家队队长梅西赢得了生涯第5个金球奖,继续创造足坛的新纪录`
    text2 = `12月6日,网上出现照片显示国产第五代战斗机歼-20的尾翼已经涂上五位数部队编号`
)

func main() {
    searcher.Init(types.EngineInitOptions{
        IndexerInitOptions: &types.IndexerInitOptions{
            IndexType: types.DocIdsIndex,
        },
        SegmenterDictionaries: "./dict/dictionary.txt",
        StopTokenFile:         "./dict/stop_tokens.txt",
    })
    defer searcher.Close()

    docId++
    searcher.IndexDocument(docId, types.DocumentIndexData{Content: text1}, false)
    docId++
    searcher.IndexDocument(docId, types.DocumentIndexData{Content: text2}, false)

    searcher.FlushIndex()

    fmt.Printf("%#v\n", searcher.Search(types.SearchRequest{Text: "巴萨 梅西"}))
    fmt.Printf("%#v\n", searcher.Search(types.SearchRequest{Text: "战斗机 金球奖"}))
}

在这个例子中,我们创建的wukong engine索引了两个doc:text1和text2,建立好索引后,我们利用引擎进行关键词查询,我们来看看查询结果:

$go run example1.go
2016/12/06 21:40:04 载入sego词典 ./dict/dictionary.txt
2016/12/06 21:40:08 sego词典载入完毕
types.SearchResponse{Tokens:[]string{"巴萨", "梅西"}, Docs:[]types.ScoredDocument{types.ScoredDocument{DocId:0x1, Scores:[]float32{0}, TokenSnippetLocations:[]int(nil), TokenLocations:[][]int(nil)}}, Timeout:false, NumDocs:1}
types.SearchResponse{Tokens:[]string{"战斗机", "金球奖"}, Docs:[]types.ScoredDocument{}, Timeout:false, NumDocs:0}

可以看出当查询“巴萨 梅西”时,引擎正确匹配到了第一个文档(DocId:0×1)。而第二次查询关键词组合“战斗机 金球奖”则没有匹配到任何文档。从这个例子我们也可以看出,wukong引擎对关键词查询支持的是关键词的AND查询,只有文档中同时包含所有关键词,才能被匹配到。这也是目前wukong引擎唯一支持的一种关键词搜索组合模式。

wukong引擎的索引key是一个uint64值,我们需要保证该值的唯一性,否则将导致已创建的索引被override。

另外我们看到:在初始化IndexerInitOptions时,我们传入的IndexType是types.DocIdsIndex,这将指示engine在建立的索引和搜索结果中只保留匹配到的DocId信息,这将最小化wukong引擎对内存的占用。

如果在初始化EngineInitOptions时不给StopTokenFile赋值,那么当我们搜索”巴萨 梅西”时,引擎会将keywords分成三个关键词:”巴萨”、空格和”梅西”分别搜索并Merge结果:

$go run example1.go
2016/12/06 21:57:47 载入sego词典 ./dict/dictionary.txt
2016/12/06 21:57:51 sego词典载入完毕
types.SearchResponse{Tokens:[]string{"巴萨", " ", "梅西"}, Docs:[]types.ScoredDocument{}, Timeout:false, NumDocs:0}
types.SearchResponse{Tokens:[]string{"战斗机", " ", "金球奖"}, Docs:[]types.ScoredDocument{}, Timeout:false, NumDocs:0}

2、FrequenciesIndex和LocationsIndex

wukong Engine的IndexType支持的另外两个类型是FrequenciesIndex和LocationsIndex,分别对应的是保留词频信息以及关键词在文档中出现的位置信息,这两类IndexType对内存的消耗量也是逐渐增大的,毕竟保留的信息是递增的:

当IndexType = FrequenciesIndex时:

$go run example1.go
2016/12/06 22:03:47 载入sego词典 ./dict/dictionary.txt
2016/12/06 22:03:51 sego词典载入完毕
types.SearchResponse{Tokens:[]string{"巴萨", "梅西"}, Docs:[]types.ScoredDocument{types.ScoredDocument{DocId:0x1, Scores:[]float32{3.0480049}, TokenSnippetLocations:[]int(nil), TokenLocations:[][]int(nil)}}, Timeout:false, NumDocs:1}
types.SearchResponse{Tokens:[]string{"战斗机", "金球奖"}, Docs:[]types.ScoredDocument{}, Timeout:false, NumDocs:0}

当IndexType = LocationsIndex时:

$go run example1.go
2016/12/06 22:04:31 载入sego词典 ./dict/dictionary.txt
2016/12/06 22:04:38 sego词典载入完毕
types.SearchResponse{Tokens:[]string{"巴萨", "梅西"}, Docs:[]types.ScoredDocument{types.ScoredDocument{DocId:0x1, Scores:[]float32{3.0480049}, TokenSnippetLocations:[]int{37, 76}, TokenLocations:[][]int{[]int{37}, []int{76}}}}, Timeout:false, NumDocs:1}
types.SearchResponse{Tokens:[]string{"战斗机", "金球奖"}, Docs:[]types.ScoredDocument{}, Timeout:false, NumDocs:0}

3、分词对结果的影响

在前面,当不给StopTokenFile赋值时,我们初步看到了分词对搜索结果的影响。wukong的中文分词完全基于作者的另外一个开源项目sego实现的。分词的准确程度直接影响着索引的建立和关键词的搜索结果。sego的词典和StopTokenFile来自于网络,如果你需要更加准确的分词结果,那么是需要你定期更新dictionary.txt和stop_tokens.txt。

举个例子,如果你的源文档内容为:”你们很感兴趣的 .NET Core 1.1 来了哦”,你的搜索关键词为:兴趣。按照我们的预期,应该可以搜索到这个源文档。但实际输出却是:

types.SearchResponse{Tokens:[]string{"兴趣"}, Docs:[]types.ScoredDocument{}, Timeout:false, NumDocs:0}

其原因就在于sego对”你们很感兴趣的 .NET Core 1.1 来了哦”这句话的分词结果是:

你们/r 很感兴趣/l 的/uj  /x ./x net/x  /x core/x  /x 1/x ./x 1/x  /x 来/v 了/ul 哦/zg

sego并没有将“兴趣”分出来,而是将“很感兴趣”四个字放在了一起,wukong引擎自然就不会单独为“兴趣”单独建立文档索引了,搜索不到也就能理解了。因此,sego可以被用来检验wukong引擎分词情况,这将有助于你了解wukong对文档索引的建立情况。

三、持久化索引和启动恢复

上面的例子中,wukong引擎建立的文档索引都是存放在内存中的,程序退出后,这些数据也就随之消失了。每次启动程序都要根据源文档重新建立索引显然是一个很不明智的想法。wukong支持将已建立的索引持久化到磁盘文件中,并在程序重启时从文件中间索引数据恢复出来,并在后续的关键词搜索时使用。wukong底层支持两种持久化引擎,一个是boltdb,另外一个是cznic/kv。默认采用boltdb。

我们来看一个持久化索引的例子(考虑文章size,省略一些代码):

// example2_index_create.go
... ...
func main() {
    searcher.Init(types.EngineInitOptions{
        IndexerInitOptions: &types.IndexerInitOptions{
            IndexType: types.DocIdsIndex,
        },
        UsePersistentStorage:    true,
        PersistentStorageFolder: "./index",
        SegmenterDictionaries:   "./dict/dictionary.txt",
        StopTokenFile:           "./dict/stop_tokens.txt",
    })
    defer searcher.Close()

    os.MkdirAll("./index", 0777)

    docId++
    searcher.IndexDocument(docId, types.DocumentIndexData{Content: text1}, false)
    docId++
    searcher.IndexDocument(docId, types.DocumentIndexData{Content: text2}, false)
    docId++
    searcher.IndexDocument(docId, types.DocumentIndexData{Content: text3}, false)

    searcher.FlushIndex()
    log.Println("Created index number:", searcher.NumDocumentsIndexed())
}

这是一个创建持久化索引的源文件。可以看出:如果要持久化索引,只需在engine init时显式设置UsePersistentStorage为true,并设置PersistentStorageFolder,即索引持久化文件存放的路径。执行一下该源文件:

$go run example2_index_create.go
2016/12/06 22:41:49 载入sego词典 ./dict/dictionary.txt
2016/12/06 22:41:53 sego词典载入完毕
2016/12/06 22:41:53 Created index number: 3

执行后,我们会在./index路径下看到持久化后的索引数据文件:

$tree index
index
├── wukong.0
├── wukong.1
├── wukong.2
├── wukong.3
├── wukong.4
├── wukong.5
├── wukong.6
└── wukong.7

0 directories, 8 files

现在我们再建立一个程序,该程序从持久化的索引数据恢复索引到内存中,并针对搜索关键词给出搜索结果:

// example2_index_search.go
... ...
var (
    searcher = engine.Engine{}
)

func main() {
    searcher.Init(types.EngineInitOptions{
        IndexerInitOptions: &types.IndexerInitOptions{
            IndexType: types.DocIdsIndex,
        },
        UsePersistentStorage:    true,
        PersistentStorageFolder: "./index",
        SegmenterDictionaries:   "./dict/dictionary.txt",
        StopTokenFile:           "./dict/stop_tokens.txt",
    })
    defer searcher.Close()

    searcher.FlushIndex()
    log.Println("recover index number:", searcher.NumDocumentsIndexed())

    fmt.Printf("%#v\n", searcher.Search(types.SearchRequest{Text: "巴萨 梅西"}))
}

执行这个程序:

$go run example2_index_search.go
2016/12/06 22:48:37 载入sego词典 ./dict/dictionary.txt
2016/12/06 22:48:41 sego词典载入完毕
2016/12/06 22:48:42 recover index number: 3
types.SearchResponse{Tokens:[]string{"巴萨", "梅西"}, Docs:[]types.ScoredDocument{types.ScoredDocument{DocId:0x1, Scores:[]float32{0}, TokenSnippetLocations:[]int(nil), TokenLocations:[][]int(nil)}}, Timeout:false, NumDocs:1}

该程序成功从前面已经建立好的程序中恢复了索引数据,并针对Search request给出了正确的搜索结果。

需要注意的是:boltdb采用了flock保证互斥访问底层文件数据的,因此当一个程序打开了boltdb,此时如果有另外一个程序尝试打开相同的boltdb,那么后者将阻塞在open boltdb的环节。

四、动态增加和删除索引

wukong引擎支持运行时动态增删索引,并实时影响搜索结果。

我们以上一节建立的持久化索引为基础,启动一个支持索引动态增加的程序:

//example3.go

func main() {
    searcher.Init(types.EngineInitOptions{
        IndexerInitOptions: &types.IndexerInitOptions{
            IndexType: types.DocIdsIndex,
        },
        UsePersistentStorage:    true,
        PersistentStorageFolder: "./index",
        PersistentStorageShards: 8,
        SegmenterDictionaries:   "./dict/dictionary.txt",
        StopTokenFile:           "./dict/stop_tokens.txt",
    })
    defer searcher.Close()
    searcher.FlushIndex()
    log.Println("recover index number:", searcher.NumDocumentsIndexed())
    docId = searcher.NumDocumentsIndexed()

    os.MkdirAll("./source", 0777)

    go func() {
        for {
            var paths []string

            //update index dynamically
            time.Sleep(time.Second * 10)
            var path = "./source"
            err := filepath.Walk(path, func(path string, f os.FileInfo, err error) error {
                if f == nil {
                    return err
                }
                if f.IsDir() {
                    return nil
                }

                fc, err := ioutil.ReadFile(path)
                if err != nil {
                    fmt.Println("read file:", path, "error:", err)
                }

                docId++
                fmt.Println("indexing file:", path, "... ...")
                searcher.IndexDocument(docId, types.DocumentIndexData{Content: string(fc)}, true)
                fmt.Println("indexed file:", path, " ok")
                paths = append(paths, path)

                return nil
            })
            if err != nil {
                fmt.Printf("filepath.Walk() returned %v\n", err)
                return
            }

            for _, p := range paths {
                err := os.Remove(p)
                if err != nil {
                    fmt.Println("remove file:", p, " error:", err)
                    continue
                }
                fmt.Println("remove file:", p, " ok!")
            }

            if len(paths) != 0 {
                // 等待索引刷新完毕
                fmt.Println("flush index....")
                searcher.FlushIndex()
                fmt.Println("flush index ok")
            }
        }
    }()

    for {
        var s string
        fmt.Println("Please input your search keywords:")
        fmt.Scanf("%s", &s)
        if s == "exit" {
            break
        }

        fmt.Printf("%#v\n", searcher.Search(types.SearchRequest{Text: s}))
    }
}

example3这个程序启动了一个goroutine,定期到source目录下读取要建立索引的源文档,并实时更新索引数据。main routine则等待用户输入关键词,并通过引擎搜索返回结果。我们来Run一下这个程序:

$go run example3.go
2016/12/06 23:07:17 载入sego词典 ./dict/dictionary.txt
2016/12/06 23:07:21 sego词典载入完毕
2016/12/06 23:07:21 recover index number: 3
Please input your search keywords:
梅西
types.SearchResponse{Tokens:[]string{"梅西"}, Docs:[]types.ScoredDocument{types.ScoredDocument{DocId:0x1, Scores:[]float32{0}, TokenSnippetLocations:[]int(nil), TokenLocations:[][]int(nil)}}, Timeout:false, NumDocs:1}
Please input your search keywords:
战斗机
types.SearchResponse{Tokens:[]string{"战斗机"}, Docs:[]types.ScoredDocument{types.ScoredDocument{DocId:0x2, Scores:[]float32{0}, TokenSnippetLocations:[]int(nil), TokenLocations:[][]int(nil)}}, Timeout:false, NumDocs:1}
Please input your search keywords:

可以看到:基于当前已经恢复的索引,我们可以正确搜索到”梅西”、”战斗机”等关键词所在的文档。

这时我们如果输入:“球王”,我们得到的搜索结果如下:

Please input your search keywords:
球王
types.SearchResponse{Tokens:[]string{"球王"}, Docs:[]types.ScoredDocument{}, Timeout:false, NumDocs:0}

没有任何文档得以匹配。

没关系,现在我们就来增加一个文档,里面包含球王等关键字。我们创建一个文档: soccerking.txt,内容为:

《球王马拉多纳》是一部讲述世界上被公认为现代足球坛上最伟大的传奇足球明星迭戈·马拉多纳的影片。他出身于清贫家庭,九岁展露过人才华,十一岁加入阿根廷足球青少年队,十六岁便成为阿根廷甲级联赛最年轻的>球员。1986年世界杯,他为阿根廷队射入足球史上最佳入球,并带领队伍勇夺金杯。他的一生充满争议、大起大落,球迷与人们对他的热爱却从未减少过,生命力旺盛的他多次从人生谷底重生。

将soccerking.txt移动到source目录中,片刻后,可以看到程序输出以下日志:

indexing file: source/soccerking.txt ... ...
indexed file: source/soccerking.txt  ok
remove file: source/soccerking.txt  ok!
flush index....
flush index ok

我们再尝试搜索”球王”、”马拉多纳”等关键词:

Please input your search keywords:
球王
types.SearchResponse{Tokens:[]string{"球王"}, Docs:[]types.ScoredDocument{types.ScoredDocument{DocId:0x4, Scores:[]float32{0}, TokenSnippetLocations:[]int(nil), TokenLocations:[][]int(nil)}}, Timeout:false, NumDocs:1}
Please input your search keywords:
马拉多纳
types.SearchResponse{Tokens:[]string{"马拉多纳"}, Docs:[]types.ScoredDocument{types.ScoredDocument{DocId:0x4, Scores:[]float32{0}, TokenSnippetLocations:[]int(nil), TokenLocations:[][]int(nil)}}, Timeout:false, NumDocs:1}

可以看到,这回engine正确搜索到了对应的Doc。

五、分布式索引和搜索

从前面的章节内容,我们大致了解了wukong的工作原理。wukong将索引存储于boltdb中,每个wukong instance独占一份数据,无法共享给其他wukong instance。当一个node上的内存空间不足以满足数据量需求时,需要将wukong引擎进行分布式部署以实现分布式索引和搜索。关于这点,wukong官方提供了一段方案描述:

分布式搜索的原理如下:

当文档数量较多无法在一台机器内存中索引时,可以将文档按照文本内容的hash值裂分(sharding),不同块交由不同服务器索引。在查找时同一请求分发到所有裂分服务器上,然后将所有服务器返回的
结果归并重排序作为最终搜索结果输出。

为了保证裂分的均匀性,建议使用Go语言实现的Murmur3 hash函数:

https://github.com/huichen/murmur

按照上面的原理很容易用悟空引擎实现分布式搜索(每个裂分服务器运行一个悟空引擎),但这样的分布式系统多数是高度定制的,比如任务的调度依赖于分布式环境,有时需要添加额外层的服务器以
均衡负载

实质就是索引和搜索的分片处理。目前我们项目所在阶段尚不需这样一个分布式wukong,因此,这里也没有实战经验可供分享。

六、wukong引擎的局限

有了上面的内容介绍,你基本可以掌握和使用wukong引擎了。不过在选用wukong引擎之前,你务必要了解wukong引擎的一些局限:

1、开发不活跃,资料较少,社区较小
wukong引擎基本上是作者一个人的项目,社区参与度不高,资料很少。另外由于作者正在创业,忙于造轮子^_^,因此wukong项目更新的频度不高。

2、缺少计划和愿景

似乎作者并没有持续将wukong引擎持续改进和发扬光大的想法和动力。Feature上也无增加。这点和bleve比起来就要差很多。

3、查询功能简单,仅支持关键词的AND查询

如果你要支持灵活多样的全文检索的查询方式,那么当前版本的wukong很可能不适合你。

4、搜索的准确度基于dictionary.txt的规模

前面说过,wukong的索引建立和搜索精确度一定程度上取决于分词引擎的分词精确性,这样dictionary.txt文件是否全面,就会成为影响搜索精确度的重要因素。

5、缺少将索引存储于关系DB中的插件支持

当前wukong引擎只能将索引持久化存储于文件中,尚无法和MySQL这样的数据库配合索引的存储和查询。

总之,wukong绝非一个完美的全文搜索引擎,是否选用,要看你所处的context。

七、小结

选用wukong引擎和我们的项目目前所处的context情况不无关系:我们需要快速实现出一个功能简单却可用的全文搜索服务。也许在后续版本中,对查询方式、数据规模有进一步要求时,就是可能考虑更换引擎的时刻了。bleve、elasticsearch到时候就都会被我们列为考虑对象了。

本文代码在可在这里下载。

如发现本站页面被黑,比如:挂载广告、挖矿等恶意代码,请朋友们及时联系我。十分感谢! Go语言第一课 Go语言精进之路1 Go语言精进之路2 Go语言编程指南
商务合作请联系bigwhite.cn AT aliyun.com

欢迎使用邮件订阅我的博客

输入邮箱订阅本站,只要有新文章发布,就会第一时间发送邮件通知你哦!

这里是 Tony Bai的个人Blog,欢迎访问、订阅和留言! 订阅Feed请点击上面图片

如果您觉得这里的文章对您有帮助,请扫描上方二维码进行捐赠 ,加油后的Tony Bai将会为您呈现更多精彩的文章,谢谢!

如果您希望通过微信捐赠,请用微信客户端扫描下方赞赏码:

如果您希望通过比特币或以太币捐赠,可以扫描下方二维码:

比特币:

以太币:

如果您喜欢通过微信浏览本站内容,可以扫描下方二维码,订阅本站官方微信订阅号“iamtonybai”;点击二维码,可直达本人官方微博主页^_^:
本站Powered by Digital Ocean VPS。
选择Digital Ocean VPS主机,即可获得10美元现金充值,可 免费使用两个月哟! 著名主机提供商Linode 10$优惠码:linode10,在 这里注册即可免费获 得。阿里云推荐码: 1WFZ0V立享9折!


View Tony Bai's profile on LinkedIn
DigitalOcean Referral Badge

文章

评论

  • 正在加载...

分类

标签

归档



View My Stats