标签 Go 下的文章

论golang Timer Reset方法使用的正确姿势

2016年,Go语言Tiobe编程语言排行榜上位次的大幅蹿升(2016年12月份Tiobe榜单:go位列第16位,Rating值:1.939%)。与此同时,我们也能切身感受到Go语言在世界范围蓬勃发展,其在中国地界儿上的发展更是尤为猛烈^0^:For gopher们的job变多了、网上关于Go的资料也大有“汗牛充栋”之势。作为职业Gopher^0^,要为这个生态添砖加瓦,就要多思考、多总结,关键还要做到“遇到了问题,就要说出来,给出你的见解”。每篇文章都有自己的切入角度和关注重点,因此Gopher们也无需过于担忧资料的“重复”。

这次,我来说说在使用Go标准库中Timer的Reset方法时遇到的问题。

一、关于Timer原理的一些说明

网络编程方面,从用户视角看,golang表象上是一种“阻塞式”网络编程范式,而支撑这种“阻塞式”范式的则是内置于go编译后的executable file中的runtime。runtime利用网络IO多路复用机制实现多个进行网络通信的goroutine的合理调度。goroutine中的执行函数则相当于你在传统C编程中传给epoll机制的回调函数。golang一定层度上消除了在这方面“回调”这种“逆向思维”给你带来的心智负担,简化了网络编程的复杂性。

但长时间“阻塞”显然不能满足大多数业务情景,因此还需要一定的超时机制。比如:在socket层面,我们通过显式设置net.Dialer的Timeout或使用SetReadDeadline、SetWriteDeadline以及SetDeadline;在应用层协议,比如http,client通过设置timeout参数,server通过TimeoutHandler来限制操作的time limit。这些timeout机制,有些是通过runtime的网络多路复用的timeout机制实现,有些则是通过Timer实现的。

标准库中的Timer让用户可以定义自己的超时逻辑,尤其是在应对select处理多个channel的超时、单channel读写的超时等情形时尤为方便。

1、Timer的创建

Timer是一次性的时间触发事件,这点与Ticker不同,后者则是按一定时间间隔持续触发时间事件。Timer常见的使用场景如下:

场景1:

t := time.AfterFunc(d, f)

场景2:

select {
    case m := <-c:
       handle(m)
    case <-time.After(5 * time.Minute):
       fmt.Println("timed out")
}

或:
t := time.NewTimer(5 * time.Minute)
select {
    case m := <-c:
       handle(m)
    case <-t.C:
       fmt.Println("timed out")
}

从这两个场景中,我们可以看到Timer三种创建姿势:

t:= time.NewTimer(d)
t:= time.AfterFunc(d, f)
c:= time.After(d)

虽然姿势不同,但背后的原理则是相通的。

Timer有三个要素:

* 定时时间:也就是那个d
* 触发动作:也就是那个f
* 时间channel: 也就是t.C

对于AfterFunc这种创建方式而言,Timer就是在超时(timer expire)后,执行函数f,此种情况下:时间channel无用。

//$GOROOT/src/time/sleep.go

func AfterFunc(d Duration, f func()) *Timer {
    t := &Timer{
        r: runtimeTimer{
            when: when(d),
            f:    goFunc,
            arg:  f,
        },
    }
    startTimer(&t.r)
    return t
}

func goFunc(arg interface{}, seq uintptr) {
    go arg.(func())()
}

注意:从AfterFunc源码可以看到,外面传入的f参数并非直接赋值给了内部的f,而是作为wrapper function:goFunc的arg传入的。而goFunc则是启动了一个新的goroutine来执行那个外部传入的f。这是因为timer expire对应的事件处理函数的执行是在go runtime内唯一的timer events maintenance goroutine: timerproc中。为了不block timerproc的执行,必须启动一个新的goroutine。

//$GOROOT/src/runtime/time.go
func timerproc() {
    timers.gp = getg()
    for {
        lock(&timers.lock)
        ... ...
            f := t.f
            arg := t.arg
            seq := t.seq
            unlock(&timers.lock)
            if raceenabled {
                raceacquire(unsafe.Pointer(t))
            }
            f(arg, seq)
            lock(&timers.lock)
        }
        ... ...
        unlock(&timers.lock)
   }
}

而对于NewTimer和After这两种创建方法,则是Timer在超时(timer expire)后,执行一个标准库中内置的函数:sendTime。sendTime将当前当前事件send到timer的时间Channel中,那么说这个动作不会阻塞到timerproc的执行么?答案肯定是不会的,其原因就在下面代码中:

//$GOROOT/src/time/sleep.go
func NewTimer(d Duration) *Timer {
    c := make(chan Time, 1)
    t := &Timer{
        C: c,
        ... ...
    }
    ... ...
    return t
}

func sendTime(c interface{}, seq uintptr) {
    // Non-blocking send of time on c.
    // Used in NewTimer, it cannot block anyway (buffer).
    // Used in NewTicker, dropping sends on the floor is
    // the desired behavior when the reader gets behind,
    // because the sends are periodic.
    select {
    case c.(chan Time) <- Now():
    default:
    }
}

我们看到NewTimer中创建了一个buffered channel,size = 1。正常情况下,当timer expire,t.C无论是否有goroutine在read,sendTime都可以non-block的将当前时间发送到C中;同时,我们看到sendTime还加了双保险:通过一个select判断c buffer是否已满,一旦满了,直接退出,依然不会block,这种情况在reuse active timer时可能会遇到。

2、Timer的资源释放

很多Go初学者在使用Timer时都会担忧Timer的创建会占用系统资源,比如:

有人会认为:创建一个Timer后,runtime会创建一个单独的Goroutine去计时并在expire后发送当前时间到channel里。
还有人认为:创建一个timer后,runtime会申请一个os级别的定时器资源去完成计时工作。

实际情况并不是这样。恰好近期gopheracademy blog发布了一篇 《How Do They Do It: Timers in Go》,通过对timer源码的分析,讲述了timer的原理,大家可以看看。

go runtime实际上仅仅是启动了一个单独的goroutine,运行timerproc函数,维护了一个”最小堆”,定期wake up后,读取堆顶的timer,执行timer对应的f函数,并移除该timer element。创建一个Timer实则就是在这个最小堆中添加一个element,Stop一个timer,则是从堆中删除对应的element。

同时,从上面的两个Timer常见的使用场景中代码来看,我们并没有显式的去释放什么。从上一节我们可以看到,Timer在创建后可能占用的资源还包括:

  • 0或一个Channel
  • 0或一个Goroutine

这些资源都会在timer使用后被GC回收。

综上,作为Timer的使用者,我们要做的就是尽量减少在使用Timer时对最小堆管理goroutine和GC的压力即可,即:及时调用timer的Stop方法从最小堆删除timer element(如果timer 没有expire)以及reuse active timer。

BTW,这里还有一篇讨论go Timer精度的文章,大家可以拜读一下。

二、Reset到底存在什么问题?

铺垫了这么多,主要还是为了说明Reset的使用问题。什么问题呢?我们来看下面的例子。这些例子主要是为了说明Reset问题,现实中很可能大家都不这么写代码逻辑。当前环境:go version go1.7 darwin/amd64。

1、example1

我们的第一个example如下:

//example1.go

func main() {
    c := make(chan bool)

    go func() {
        for i := 0; i < 5; i++ {
            time.Sleep(time.Second * 1)
            c <- false
        }

        time.Sleep(time.Second * 1)
        c <- true
    }()

    go func() {
        for {
            // try to read from channel, block at most 5s.
            // if timeout, print time event and go on loop.
            // if read a message which is not the type we want(we want true, not false),
            // retry to read.
            timer := time.NewTimer(time.Second * 5)
            defer timer.Stop()
            select {
            case b := <-c:
                if b == false {
                    fmt.Println(time.Now(), ":recv false. continue")
                    continue
                }
                //we want true, not false
                fmt.Println(time.Now(), ":recv true. return")
                return
            case <-timer.C:
                fmt.Println(time.Now(), ":timer expired")
                continue
            }
        }
    }()

    //to avoid that all goroutine blocks.
    var s string
    fmt.Scanln(&s)
}

example1.go的逻辑大致就是 一个consumer goroutine试图从一个channel里读出true,如果读出false或timer expire,那么继续try to read from the channel。这里我们每次循环都创建一个timer,并在go routine结束后Stop该timer。另外一个producer goroutine则负责生产消息,并发送到channel中。consumer中实际发生的行为取决于producer goroutine的发送行为。

example1.go执行的结果如下:

$go run example1.go
2016-12-21 14:52:18.657711862 +0800 CST :recv false. continue
2016-12-21 14:52:19.659328152 +0800 CST :recv false. continue
2016-12-21 14:52:20.661031612 +0800 CST :recv false. continue
2016-12-21 14:52:21.662696502 +0800 CST :recv false. continue
2016-12-21 14:52:22.663531677 +0800 CST :recv false. continue
2016-12-21 14:52:23.665210387 +0800 CST :recv true. return

输出如预期。但在这个过程中,我们新创建了6个Timer。

2、example2

如果我们不想重复创建这么多Timer实例,而是reuse现有的Timer实例,那么我们就要用到Timer的Reset方法,见下面example2.go,考虑篇幅,这里仅列出consumer routine代码,其他保持不变:

//example2.go
.... ...
// consumer routine
    go func() {
        // try to read from channel, block at most 5s.
        // if timeout, print time event and go on loop.
        // if read a message which is not the type we want(we want true, not false),
        // retry to read.
        timer := time.NewTimer(time.Second * 5)
        for {
            // timer is active , not fired, stop always returns true, no problems occurs.
            if !timer.Stop() {
                <-timer.C
            }
            timer.Reset(time.Second * 5)
            select {
            case b := <-c:
                if b == false {
                    fmt.Println(time.Now(), ":recv false. continue")
                    continue
                }
                //we want true, not false
                fmt.Println(time.Now(), ":recv true. return")
                return
            case <-timer.C:
                fmt.Println(time.Now(), ":timer expired")
                continue
            }
        }
    }()
... ...

按照go 1.7 doc中关于Reset使用的建议:

To reuse an active timer, always call its Stop method first and—if it had expired—drain the value from its channel. For example:

if !t.Stop() {
        <-t.C
}
t.Reset(d)

我们改造了example1,形成example2的代码。由于producer行为并未变更,实际example2执行时,每次循环Timer在被Reset之前都没有expire,也没有fire a time to channel,因此timer.Stop的调用均返回true,即成功将timer从“最小堆”中移除。example2的执行结果如下:

$go run example2.go
2016-12-21 15:10:54.257733597 +0800 CST :recv false. continue
2016-12-21 15:10:55.259349877 +0800 CST :recv false. continue
2016-12-21 15:10:56.261039127 +0800 CST :recv false. continue
2016-12-21 15:10:57.262770422 +0800 CST :recv false. continue
2016-12-21 15:10:58.264534647 +0800 CST :recv false. continue
2016-12-21 15:10:59.265680422 +0800 CST :recv true. return

和example1并无二致。

3、example3

现在producer routine的发送行为发生了变更:从以前每隔1s发送一次数据变成了每隔7s发送一次数据,而consumer routine不变:

//example3.go

//producer routine
    go func() {
        for i := 0; i < 10; i++ {
            time.Sleep(time.Second * 7)
            c <- false
        }

        time.Sleep(time.Second * 7)
        c <- true
    }()

我们来看看example3.go的执行结果:

$go run example3.go
2016-12-21 15:14:32.764410922 +0800 CST :timer expired

程序hang住了。你能猜到在哪里hang住的吗?对,就是在drain t.C的时候hang住了:

           // timer may be not active and may not fired
            if !timer.Stop() {
                <-timer.C //drain from the channel
            }
            timer.Reset(time.Second * 5)

producer的发送行为发生了变化,Comsumer routine在收到第一个数据前有了一次time expire的事件,for loop回到loop的开始端。这时timer.Stop函数返回的不再是true,而是false,因为timer已经expire,最小堆中已经不包含该timer了,Stop在最小堆中找不到该timer,返回false。于是example3代码尝试抽干(drain)timer.C中的数据。但timer.C中此时并没有数据,于是routine block在channel recv上了。

在Go 1.8以前版本中,很多人遇到了类似的问题,并提出issue,比如:

time: Timer.Reset is not possible to use correctly #14038

不过go team认为这还是文档中对Reset的使用描述不够充分导致的,于是在Go 1.8中对Reset方法的文档做了补充Go 1.8 beta2中Reset方法的文档改为了:

Resetting a timer must take care not to race with the send into t.C that happens when the current timer expires. If a program has already received a value from t.C, the timer is known to have expired, and t.Reset can be used directly. If a program has not yet received a value from t.C, however, the timer must be stopped and—if Stop reports that the timer expired before being stopped—the channel explicitly drained:

if !t.Stop() {
        <-t.C
}
t.Reset(d)

大致意思是:如果明确time已经expired,并且t.C已经被取空,那么可以直接使用Reset;如果程序之前没有从t.C中读取过值,这时需要首先调用Stop(),如果返回true,说明timer还没有expire,stop成功删除timer,可直接reset;如果返回false,说明stop前已经expire,需要显式drain channel。

4、example4

我们的example3就是“time已经expired,并且t.C已经被取空,那么可以直接使用Reset ”这第一种情况,我们应该直接reset,而不用显式drain channel。如何将这两种情形合二为一,很直接的想法就是增加一个开关变量isChannelDrained,标识timer.C是否已经被取空,如果取空,则直接调用Reset。如果没有,则drain Channel。

增加一个变量总是麻烦的,RussCox也给出一个未经详尽验证的方法,我们来看看用这种方法改造的example4.go:

//example4.go

//consumer
    go func() {
        // try to read from channel, block at most 5s.
        // if timeout, print time event and go on loop.
        // if read a message which is not the type we want(we want true, not false),
        // retry to read.
        timer := time.NewTimer(time.Second * 5)
        for {
            // timer may be not active, and fired
            if !timer.Stop() {
                select {
                case <-timer.C: //try to drain from the channel
                default:
                }
            }
            timer.Reset(time.Second * 5)
            select {
            case b := <-c:
                if b == false {
                    fmt.Println(time.Now(), ":recv false. continue")
                    continue
                }
                //we want true, not false
                fmt.Println(time.Now(), ":recv true. return")
                return
            case <-timer.C:
                fmt.Println(time.Now(), ":timer expired")
                continue
            }
        }
    }()

执行结果:

$go run example4.go
2016-12-21 15:38:16.704647957 +0800 CST :timer expired
2016-12-21 15:38:18.703107177 +0800 CST :recv false. continue
2016-12-21 15:38:23.706665507 +0800 CST :timer expired
2016-12-21 15:38:25.705314522 +0800 CST :recv false. continue
2016-12-21 15:38:30.70900638 +0800 CST :timer expired
2016-12-21 15:38:32.707482917 +0800 CST :recv false. continue
2016-12-21 15:38:37.711260142 +0800 CST :timer expired
2016-12-21 15:38:39.709668705 +0800 CST :recv false. continue
2016-12-21 15:38:44.71337522 +0800 CST :timer expired
2016-12-21 15:38:46.710880007 +0800 CST :recv false. continue
2016-12-21 15:38:51.713813305 +0800 CST :timer expired
2016-12-21 15:38:53.713063822 +0800 CST :recv true. return

我们利用一个select来包裹channel drain,这样无论channel中是否有数据,drain都不会阻塞住。看似问题解决了。

5、竞争条件

如果你看过timerproc的代码,你会发现其中的这样一段代码:

// go1.7
// $GOROOT/src/runtime/time.go
            f := t.f
            arg := t.arg
            seq := t.seq
            unlock(&timers.lock)
            if raceenabled {
                raceacquire(unsafe.Pointer(t))
            }
            f(arg, seq)
            lock(&timers.lock)

我们看到在timerproc执行f(arg, seq)这个函数前,timerproc unlock了timers.lock,也就是说f的执行并没有在锁内。

前面说过,f的执行是什么?

对于AfterFunc来说,就是启动一个goroutine,并在这个新goroutine中执行用户传入的函数;
对于After和NewTimer这种创建姿势创建的timer而言,f的执行就是sendTime的执行,也就是向t.C中send 当前时间。

注意:这时候timer expire过程中sendTime的执行与“drain channel”是分别在两个goroutine中执行的,谁先谁后,完全依靠runtime调度。于是example4.go中的看似没有问题的代码,也可能存在问题(当然需要时间粒度足够小,比如ms级的Timer)。

如果sendTime的执行发生在drain channel执行前,那么就是example4.go中的执行结果:Stop返回false(因为timer已经expire了),显式drain channel会将数据读出,后续Reset后,timer正常执行;
如果sendTime的执行发生在drain channel执行后,那么问题就来了,虽然Stop返回false(因为timer已经expire),但drain channel并没有读出任何数据。之后,sendTime将数据发到channel中。timer Reset后的Timer中的Channel实际上已经有了数据,于是当进入下面的select执行体时,”case <-timer.C:”瞬间返回,触发了timer事件,没有启动超时等待的作用。

这也是issue:*time: Timer.C can still trigger even after Timer.Reset is called #11513中问到的问题。

go官方文档中对此也有描述:

Note that it is not possible to use Reset's return value correctly, as there is a race condition between draining the channel and the new timer expiring. Reset should always be invoked on stopped or expired channels, as described above. The return value exists to preserve compatibility with existing programs.

三、真的有Reset方法的正确使用姿势吗?

综合上述例子和分析,Reset的使用似乎没有理想的方案,但一般来说,在特定业务逻辑下,Reset还是可以正常工作的,就如example4那样。即便出现问题,如果了解了Reset背后的原理,问题解决起来也是会很快很准的。

文中的相关代码可以在这里下载

四、参考资料

Golang官方有关Timer的issue list:

runtime: special case timer channels #8898
time:timer stop ,how to use? #14947
time: document proper usage of Timer.Stop #14383
*time: Timer.Reset is not possible to use correctly #14038
Time.After doesn’t release memory #15781
runtime: timerproc does not get to run under load #15706
time: time.After uses memory until duration times out #15698
time:timer stop panic #14946
*time: Timer.C can still trigger even after Timer.Reset is called #11513
time: Timer.Stop documentation incorrect for Timer returned by AfterFunc #17600

相关资料:

  1. go中的定时器timer
  2. Go内部实现之timer
  3. Go定时器
  4. How Do They Do It: Timers in Go
  5. timer在go可以有多精确

使用wukong全文搜索引擎

近期项目中有一个全文索引和全文搜索的业务需求,组内同事在这方面都没啥经验,找一个满足我们需求的开源的全文搜索引擎势在必行。我们这一期对全文搜索引擎的需求并不复杂,最主要的是引擎可以很好的支持中文分词、索引和搜索,并能快速实现功能。在全文搜索领域,基于Apache luceneElasticSearch舍我其谁,其强大的分布式系统能力、对超大规模数据的支持、友好的Restful API以及近实时的搜索性能都是业内翘楚,并且其开发社区也是相当活跃,资料众多。但也正式由于其体量较大,我们并没有在本期项目中选择使用ElasticSearch,而是挑选了另外一个“fame”不是那么响亮的引擎:wukong

一、wukong简介

wukong,是一款golang实现的高性能、支持中文分词的全文搜索引擎。我个人觉得它最大的特点恰恰是不像ElasticSearch那样庞大和功能完备,而是可以以一个Library的形式快速集成到你的应用或服务中去,这可能也是在当前阶段选择它的最重要原因,当然其golang技术栈也是让我垂涎于它的另外一个原因:)。

第一次知道wukong,其实是在今年的GopherChina大会上,其作者陈辉作为第一个演讲嘉宾在大会上分享了“Go与人工智能”。在这个presentation中,chen hui详细讲解了wukong搜索引擎以及其他几个关联的开源项目,比如:sego等。

在golang世界中,做full text search的可不止wukong一个。另外一个比较知名的是bleve,但默认情况下,bleve并不支持中文分词和搜索,需要结合中文分词插件才能支持,比如:gojieba

wukong基本上是陈辉一个人打造的项目,在陈辉在阿里任职期间,他将其用于阿里内部的一些项目中,但总体来说,wukong的应用还是很小众的,相关资料也不多,基本都集中在其github站点上。关于wukong源码的分析,倒是在国外站点上发现一篇:《Code reading: wukong full-text search engine》。

本文更多聚焦于应用wukong引擎,而不是来分析wukong代码。

二、全文索引和检索

1、最简单的例子

我们先来看一个使用wukong引擎编写的最简单的例子:

//example1.go

package main

import (
    "fmt"

    "github.com/huichen/wukong/engine"
    "github.com/huichen/wukong/types"
)

var (
    searcher = engine.Engine{}
    docId    uint64
)

const (
    text1 = `在苏黎世的FIFA颁奖典礼上,巴萨球星、阿根廷国家队队长梅西赢得了生涯第5个金球奖,继续创造足坛的新纪录`
    text2 = `12月6日,网上出现照片显示国产第五代战斗机歼-20的尾翼已经涂上五位数部队编号`
)

func main() {
    searcher.Init(types.EngineInitOptions{
        IndexerInitOptions: &types.IndexerInitOptions{
            IndexType: types.DocIdsIndex,
        },
        SegmenterDictionaries: "./dict/dictionary.txt",
        StopTokenFile:         "./dict/stop_tokens.txt",
    })
    defer searcher.Close()

    docId++
    searcher.IndexDocument(docId, types.DocumentIndexData{Content: text1}, false)
    docId++
    searcher.IndexDocument(docId, types.DocumentIndexData{Content: text2}, false)

    searcher.FlushIndex()

    fmt.Printf("%#v\n", searcher.Search(types.SearchRequest{Text: "巴萨 梅西"}))
    fmt.Printf("%#v\n", searcher.Search(types.SearchRequest{Text: "战斗机 金球奖"}))
}

在这个例子中,我们创建的wukong engine索引了两个doc:text1和text2,建立好索引后,我们利用引擎进行关键词查询,我们来看看查询结果:

$go run example1.go
2016/12/06 21:40:04 载入sego词典 ./dict/dictionary.txt
2016/12/06 21:40:08 sego词典载入完毕
types.SearchResponse{Tokens:[]string{"巴萨", "梅西"}, Docs:[]types.ScoredDocument{types.ScoredDocument{DocId:0x1, Scores:[]float32{0}, TokenSnippetLocations:[]int(nil), TokenLocations:[][]int(nil)}}, Timeout:false, NumDocs:1}
types.SearchResponse{Tokens:[]string{"战斗机", "金球奖"}, Docs:[]types.ScoredDocument{}, Timeout:false, NumDocs:0}

可以看出当查询“巴萨 梅西”时,引擎正确匹配到了第一个文档(DocId:0×1)。而第二次查询关键词组合“战斗机 金球奖”则没有匹配到任何文档。从这个例子我们也可以看出,wukong引擎对关键词查询支持的是关键词的AND查询,只有文档中同时包含所有关键词,才能被匹配到。这也是目前wukong引擎唯一支持的一种关键词搜索组合模式。

wukong引擎的索引key是一个uint64值,我们需要保证该值的唯一性,否则将导致已创建的索引被override。

另外我们看到:在初始化IndexerInitOptions时,我们传入的IndexType是types.DocIdsIndex,这将指示engine在建立的索引和搜索结果中只保留匹配到的DocId信息,这将最小化wukong引擎对内存的占用。

如果在初始化EngineInitOptions时不给StopTokenFile赋值,那么当我们搜索”巴萨 梅西”时,引擎会将keywords分成三个关键词:”巴萨”、空格和”梅西”分别搜索并Merge结果:

$go run example1.go
2016/12/06 21:57:47 载入sego词典 ./dict/dictionary.txt
2016/12/06 21:57:51 sego词典载入完毕
types.SearchResponse{Tokens:[]string{"巴萨", " ", "梅西"}, Docs:[]types.ScoredDocument{}, Timeout:false, NumDocs:0}
types.SearchResponse{Tokens:[]string{"战斗机", " ", "金球奖"}, Docs:[]types.ScoredDocument{}, Timeout:false, NumDocs:0}

2、FrequenciesIndex和LocationsIndex

wukong Engine的IndexType支持的另外两个类型是FrequenciesIndex和LocationsIndex,分别对应的是保留词频信息以及关键词在文档中出现的位置信息,这两类IndexType对内存的消耗量也是逐渐增大的,毕竟保留的信息是递增的:

当IndexType = FrequenciesIndex时:

$go run example1.go
2016/12/06 22:03:47 载入sego词典 ./dict/dictionary.txt
2016/12/06 22:03:51 sego词典载入完毕
types.SearchResponse{Tokens:[]string{"巴萨", "梅西"}, Docs:[]types.ScoredDocument{types.ScoredDocument{DocId:0x1, Scores:[]float32{3.0480049}, TokenSnippetLocations:[]int(nil), TokenLocations:[][]int(nil)}}, Timeout:false, NumDocs:1}
types.SearchResponse{Tokens:[]string{"战斗机", "金球奖"}, Docs:[]types.ScoredDocument{}, Timeout:false, NumDocs:0}

当IndexType = LocationsIndex时:

$go run example1.go
2016/12/06 22:04:31 载入sego词典 ./dict/dictionary.txt
2016/12/06 22:04:38 sego词典载入完毕
types.SearchResponse{Tokens:[]string{"巴萨", "梅西"}, Docs:[]types.ScoredDocument{types.ScoredDocument{DocId:0x1, Scores:[]float32{3.0480049}, TokenSnippetLocations:[]int{37, 76}, TokenLocations:[][]int{[]int{37}, []int{76}}}}, Timeout:false, NumDocs:1}
types.SearchResponse{Tokens:[]string{"战斗机", "金球奖"}, Docs:[]types.ScoredDocument{}, Timeout:false, NumDocs:0}

3、分词对结果的影响

在前面,当不给StopTokenFile赋值时,我们初步看到了分词对搜索结果的影响。wukong的中文分词完全基于作者的另外一个开源项目sego实现的。分词的准确程度直接影响着索引的建立和关键词的搜索结果。sego的词典和StopTokenFile来自于网络,如果你需要更加准确的分词结果,那么是需要你定期更新dictionary.txt和stop_tokens.txt。

举个例子,如果你的源文档内容为:”你们很感兴趣的 .NET Core 1.1 来了哦”,你的搜索关键词为:兴趣。按照我们的预期,应该可以搜索到这个源文档。但实际输出却是:

types.SearchResponse{Tokens:[]string{"兴趣"}, Docs:[]types.ScoredDocument{}, Timeout:false, NumDocs:0}

其原因就在于sego对”你们很感兴趣的 .NET Core 1.1 来了哦”这句话的分词结果是:

你们/r 很感兴趣/l 的/uj  /x ./x net/x  /x core/x  /x 1/x ./x 1/x  /x 来/v 了/ul 哦/zg

sego并没有将“兴趣”分出来,而是将“很感兴趣”四个字放在了一起,wukong引擎自然就不会单独为“兴趣”单独建立文档索引了,搜索不到也就能理解了。因此,sego可以被用来检验wukong引擎分词情况,这将有助于你了解wukong对文档索引的建立情况。

三、持久化索引和启动恢复

上面的例子中,wukong引擎建立的文档索引都是存放在内存中的,程序退出后,这些数据也就随之消失了。每次启动程序都要根据源文档重新建立索引显然是一个很不明智的想法。wukong支持将已建立的索引持久化到磁盘文件中,并在程序重启时从文件中间索引数据恢复出来,并在后续的关键词搜索时使用。wukong底层支持两种持久化引擎,一个是boltdb,另外一个是cznic/kv。默认采用boltdb。

我们来看一个持久化索引的例子(考虑文章size,省略一些代码):

// example2_index_create.go
... ...
func main() {
    searcher.Init(types.EngineInitOptions{
        IndexerInitOptions: &types.IndexerInitOptions{
            IndexType: types.DocIdsIndex,
        },
        UsePersistentStorage:    true,
        PersistentStorageFolder: "./index",
        SegmenterDictionaries:   "./dict/dictionary.txt",
        StopTokenFile:           "./dict/stop_tokens.txt",
    })
    defer searcher.Close()

    os.MkdirAll("./index", 0777)

    docId++
    searcher.IndexDocument(docId, types.DocumentIndexData{Content: text1}, false)
    docId++
    searcher.IndexDocument(docId, types.DocumentIndexData{Content: text2}, false)
    docId++
    searcher.IndexDocument(docId, types.DocumentIndexData{Content: text3}, false)

    searcher.FlushIndex()
    log.Println("Created index number:", searcher.NumDocumentsIndexed())
}

这是一个创建持久化索引的源文件。可以看出:如果要持久化索引,只需在engine init时显式设置UsePersistentStorage为true,并设置PersistentStorageFolder,即索引持久化文件存放的路径。执行一下该源文件:

$go run example2_index_create.go
2016/12/06 22:41:49 载入sego词典 ./dict/dictionary.txt
2016/12/06 22:41:53 sego词典载入完毕
2016/12/06 22:41:53 Created index number: 3

执行后,我们会在./index路径下看到持久化后的索引数据文件:

$tree index
index
├── wukong.0
├── wukong.1
├── wukong.2
├── wukong.3
├── wukong.4
├── wukong.5
├── wukong.6
└── wukong.7

0 directories, 8 files

现在我们再建立一个程序,该程序从持久化的索引数据恢复索引到内存中,并针对搜索关键词给出搜索结果:

// example2_index_search.go
... ...
var (
    searcher = engine.Engine{}
)

func main() {
    searcher.Init(types.EngineInitOptions{
        IndexerInitOptions: &types.IndexerInitOptions{
            IndexType: types.DocIdsIndex,
        },
        UsePersistentStorage:    true,
        PersistentStorageFolder: "./index",
        SegmenterDictionaries:   "./dict/dictionary.txt",
        StopTokenFile:           "./dict/stop_tokens.txt",
    })
    defer searcher.Close()

    searcher.FlushIndex()
    log.Println("recover index number:", searcher.NumDocumentsIndexed())

    fmt.Printf("%#v\n", searcher.Search(types.SearchRequest{Text: "巴萨 梅西"}))
}

执行这个程序:

$go run example2_index_search.go
2016/12/06 22:48:37 载入sego词典 ./dict/dictionary.txt
2016/12/06 22:48:41 sego词典载入完毕
2016/12/06 22:48:42 recover index number: 3
types.SearchResponse{Tokens:[]string{"巴萨", "梅西"}, Docs:[]types.ScoredDocument{types.ScoredDocument{DocId:0x1, Scores:[]float32{0}, TokenSnippetLocations:[]int(nil), TokenLocations:[][]int(nil)}}, Timeout:false, NumDocs:1}

该程序成功从前面已经建立好的程序中恢复了索引数据,并针对Search request给出了正确的搜索结果。

需要注意的是:boltdb采用了flock保证互斥访问底层文件数据的,因此当一个程序打开了boltdb,此时如果有另外一个程序尝试打开相同的boltdb,那么后者将阻塞在open boltdb的环节。

四、动态增加和删除索引

wukong引擎支持运行时动态增删索引,并实时影响搜索结果。

我们以上一节建立的持久化索引为基础,启动一个支持索引动态增加的程序:

//example3.go

func main() {
    searcher.Init(types.EngineInitOptions{
        IndexerInitOptions: &types.IndexerInitOptions{
            IndexType: types.DocIdsIndex,
        },
        UsePersistentStorage:    true,
        PersistentStorageFolder: "./index",
        PersistentStorageShards: 8,
        SegmenterDictionaries:   "./dict/dictionary.txt",
        StopTokenFile:           "./dict/stop_tokens.txt",
    })
    defer searcher.Close()
    searcher.FlushIndex()
    log.Println("recover index number:", searcher.NumDocumentsIndexed())
    docId = searcher.NumDocumentsIndexed()

    os.MkdirAll("./source", 0777)

    go func() {
        for {
            var paths []string

            //update index dynamically
            time.Sleep(time.Second * 10)
            var path = "./source"
            err := filepath.Walk(path, func(path string, f os.FileInfo, err error) error {
                if f == nil {
                    return err
                }
                if f.IsDir() {
                    return nil
                }

                fc, err := ioutil.ReadFile(path)
                if err != nil {
                    fmt.Println("read file:", path, "error:", err)
                }

                docId++
                fmt.Println("indexing file:", path, "... ...")
                searcher.IndexDocument(docId, types.DocumentIndexData{Content: string(fc)}, true)
                fmt.Println("indexed file:", path, " ok")
                paths = append(paths, path)

                return nil
            })
            if err != nil {
                fmt.Printf("filepath.Walk() returned %v\n", err)
                return
            }

            for _, p := range paths {
                err := os.Remove(p)
                if err != nil {
                    fmt.Println("remove file:", p, " error:", err)
                    continue
                }
                fmt.Println("remove file:", p, " ok!")
            }

            if len(paths) != 0 {
                // 等待索引刷新完毕
                fmt.Println("flush index....")
                searcher.FlushIndex()
                fmt.Println("flush index ok")
            }
        }
    }()

    for {
        var s string
        fmt.Println("Please input your search keywords:")
        fmt.Scanf("%s", &s)
        if s == "exit" {
            break
        }

        fmt.Printf("%#v\n", searcher.Search(types.SearchRequest{Text: s}))
    }
}

example3这个程序启动了一个goroutine,定期到source目录下读取要建立索引的源文档,并实时更新索引数据。main routine则等待用户输入关键词,并通过引擎搜索返回结果。我们来Run一下这个程序:

$go run example3.go
2016/12/06 23:07:17 载入sego词典 ./dict/dictionary.txt
2016/12/06 23:07:21 sego词典载入完毕
2016/12/06 23:07:21 recover index number: 3
Please input your search keywords:
梅西
types.SearchResponse{Tokens:[]string{"梅西"}, Docs:[]types.ScoredDocument{types.ScoredDocument{DocId:0x1, Scores:[]float32{0}, TokenSnippetLocations:[]int(nil), TokenLocations:[][]int(nil)}}, Timeout:false, NumDocs:1}
Please input your search keywords:
战斗机
types.SearchResponse{Tokens:[]string{"战斗机"}, Docs:[]types.ScoredDocument{types.ScoredDocument{DocId:0x2, Scores:[]float32{0}, TokenSnippetLocations:[]int(nil), TokenLocations:[][]int(nil)}}, Timeout:false, NumDocs:1}
Please input your search keywords:

可以看到:基于当前已经恢复的索引,我们可以正确搜索到”梅西”、”战斗机”等关键词所在的文档。

这时我们如果输入:“球王”,我们得到的搜索结果如下:

Please input your search keywords:
球王
types.SearchResponse{Tokens:[]string{"球王"}, Docs:[]types.ScoredDocument{}, Timeout:false, NumDocs:0}

没有任何文档得以匹配。

没关系,现在我们就来增加一个文档,里面包含球王等关键字。我们创建一个文档: soccerking.txt,内容为:

《球王马拉多纳》是一部讲述世界上被公认为现代足球坛上最伟大的传奇足球明星迭戈·马拉多纳的影片。他出身于清贫家庭,九岁展露过人才华,十一岁加入阿根廷足球青少年队,十六岁便成为阿根廷甲级联赛最年轻的>球员。1986年世界杯,他为阿根廷队射入足球史上最佳入球,并带领队伍勇夺金杯。他的一生充满争议、大起大落,球迷与人们对他的热爱却从未减少过,生命力旺盛的他多次从人生谷底重生。

将soccerking.txt移动到source目录中,片刻后,可以看到程序输出以下日志:

indexing file: source/soccerking.txt ... ...
indexed file: source/soccerking.txt  ok
remove file: source/soccerking.txt  ok!
flush index....
flush index ok

我们再尝试搜索”球王”、”马拉多纳”等关键词:

Please input your search keywords:
球王
types.SearchResponse{Tokens:[]string{"球王"}, Docs:[]types.ScoredDocument{types.ScoredDocument{DocId:0x4, Scores:[]float32{0}, TokenSnippetLocations:[]int(nil), TokenLocations:[][]int(nil)}}, Timeout:false, NumDocs:1}
Please input your search keywords:
马拉多纳
types.SearchResponse{Tokens:[]string{"马拉多纳"}, Docs:[]types.ScoredDocument{types.ScoredDocument{DocId:0x4, Scores:[]float32{0}, TokenSnippetLocations:[]int(nil), TokenLocations:[][]int(nil)}}, Timeout:false, NumDocs:1}

可以看到,这回engine正确搜索到了对应的Doc。

五、分布式索引和搜索

从前面的章节内容,我们大致了解了wukong的工作原理。wukong将索引存储于boltdb中,每个wukong instance独占一份数据,无法共享给其他wukong instance。当一个node上的内存空间不足以满足数据量需求时,需要将wukong引擎进行分布式部署以实现分布式索引和搜索。关于这点,wukong官方提供了一段方案描述:

分布式搜索的原理如下:

当文档数量较多无法在一台机器内存中索引时,可以将文档按照文本内容的hash值裂分(sharding),不同块交由不同服务器索引。在查找时同一请求分发到所有裂分服务器上,然后将所有服务器返回的
结果归并重排序作为最终搜索结果输出。

为了保证裂分的均匀性,建议使用Go语言实现的Murmur3 hash函数:

https://github.com/huichen/murmur

按照上面的原理很容易用悟空引擎实现分布式搜索(每个裂分服务器运行一个悟空引擎),但这样的分布式系统多数是高度定制的,比如任务的调度依赖于分布式环境,有时需要添加额外层的服务器以
均衡负载

实质就是索引和搜索的分片处理。目前我们项目所在阶段尚不需这样一个分布式wukong,因此,这里也没有实战经验可供分享。

六、wukong引擎的局限

有了上面的内容介绍,你基本可以掌握和使用wukong引擎了。不过在选用wukong引擎之前,你务必要了解wukong引擎的一些局限:

1、开发不活跃,资料较少,社区较小
wukong引擎基本上是作者一个人的项目,社区参与度不高,资料很少。另外由于作者正在创业,忙于造轮子^_^,因此wukong项目更新的频度不高。

2、缺少计划和愿景

似乎作者并没有持续将wukong引擎持续改进和发扬光大的想法和动力。Feature上也无增加。这点和bleve比起来就要差很多。

3、查询功能简单,仅支持关键词的AND查询

如果你要支持灵活多样的全文检索的查询方式,那么当前版本的wukong很可能不适合你。

4、搜索的准确度基于dictionary.txt的规模

前面说过,wukong的索引建立和搜索精确度一定程度上取决于分词引擎的分词精确性,这样dictionary.txt文件是否全面,就会成为影响搜索精确度的重要因素。

5、缺少将索引存储于关系DB中的插件支持

当前wukong引擎只能将索引持久化存储于文件中,尚无法和MySQL这样的数据库配合索引的存储和查询。

总之,wukong绝非一个完美的全文搜索引擎,是否选用,要看你所处的context。

七、小结

选用wukong引擎和我们的项目目前所处的context情况不无关系:我们需要快速实现出一个功能简单却可用的全文搜索服务。也许在后续版本中,对查询方式、数据规模有进一步要求时,就是可能考虑更换引擎的时刻了。bleve、elasticsearch到时候就都会被我们列为考虑对象了。

本文代码在可在这里下载。

Kubernetes集群的安全配置

使用kubernetes/cluster/kube-up.sh脚本在装有Ubuntu操作系统的bare metal上搭建的Kubernetes集群并不安全,甚至可以说是“完全不设防的”,这是因为Kubernetes集群的核心组件:kube-apiserver启用了insecure-port。insecure-port背后的api server默认完全信任访问该端口的流量,内部无任何安全机制。并且监听insecure-port的api server bind的insecure-address为0.0.0.0。也就是说任何内外部请求,都可以通过insecure-port端口任意操作Kubernetes集群。我们的平台虽小,但“裸奔”的k8s集群也并不是我们想看到的,适当的安全配置是需要的。

在本文中,我将和大家一起学习一下Kubernetes提供的安全机制,并通过安全配置调整,实现K8s集群的“有限”安全。

一、集群现状

我们先来“回顾”一下集群现状,为后续配置调整提供一个可回溯和可比对的“基线”。

1、Nodes

集群基本信息:

# kubectl cluster-info
Kubernetes master is running at http://10.47.136.60:8080
KubeDNS is running at http://10.47.136.60:8080/api/v1/proxy/namespaces/kube-system/services/kube-dns

To further debug and diagnose cluster problems, use 'kubectl cluster-info dump'.

当前集群逻辑上由一个master node和两个worker nodes组成:

单master: 10.47.136.60
worker nodes: 10.47.136.60和10.46.181.146

# kubectl get node --show-labels=true
NAME            STATUS    AGE       LABELS
10.46.181.146   Ready     41d       beta.kubernetes.io/arch=amd64,beta.kubernetes.io/os=linux,kubernetes.io/hostname=10.46.181.146
10.47.136.60    Ready     41d       beta.kubernetes.io/arch=amd64,beta.kubernetes.io/os=linux,kubernetes.io/hostname=10.47.136.60
2、kubernetes核心组件的启动参数

我们再来明确一下当前集群中各k8s核心组件的启动参数,这些参数决定着组件背后的行为:

master node & worker node1 – 10.47.136.60上:

root       22000       1  0 Oct17 ?        03:52:55 /opt/bin/kube-controller-manager --master=127.0.0.1:8080 --root-ca-file=/srv/kubernetes/ca.crt --service-account-private-key-file=/srv/kubernetes/server.key --logtostderr=true

root       22021       1  1 Oct17 ?        17:11:15 /opt/bin/kube-apiserver --insecure-bind-address=0.0.0.0 --insecure-port=8080 --etcd-servers=http://127.0.0.1:4001 --logtostderr=true --service-cluster-ip-range=192.168.3.0/24 --admission-control=NamespaceLifecycle,LimitRanger,ServiceAccount,SecurityContextDeny,ResourceQuota --service-node-port-range=30000-32767 --advertise-address=10.47.136.60 --client-ca-file=/srv/kubernetes/ca.crt --tls-cert-file=/srv/kubernetes/server.cert --tls-private-key-file=/srv/kubernetes/server.key

root       22121       1  0 Oct17 ?        00:22:30 /opt/bin/kube-scheduler --logtostderr=true --master=127.0.0.1:8080

root     2140405       1  0 Nov15 ?        00:05:26 /opt/bin/kube-proxy --hostname-override=10.47.136.60 --master=http://10.47.136.60:8080 --logtostderr=true

root     1912455       1  1 Nov15 ?        03:43:09 /opt/bin/kubelet --hostname-override=10.47.136.60 --api-servers=http://10.47.136.60:8080 --logtostderr=true --cluster-dns=192.168.3.10 --cluster-domain=cluster.local --config=

worker node2 – 10.46.181.146上:

root      7934     1  1 Nov15 ?        03:06:00 /opt/bin/kubelet --hostname-override=10.46.181.146 --api-servers=http://10.47.136.60:8080 --logtostderr=true --cluster-dns=192.168.3.10 --cluster-domain=cluster.local --config=
root     23026     1  0 Nov15 ?        00:04:49 /opt/bin/kube-proxy --hostname-override=10.46.181.146 --master=http://10.47.136.60:8080 --logtostderr=true

从master node的核心组件kube-apiserver 的启动命令行参数也可以看出我们在开篇处所提到的那样:apiserver insecure-port开启,且bind 0.0.0.0:8080,可以任意访问,连basic_auth都没有。当然api server不只是监听这一个端口,在api server源码中,我们可以看到默认情况下,apiserver还监听了另外一个secure port,该端口的默认值是6443,通过lsof命令查看6443端口的监听进程也可以印证这一点:

//master node上

# lsof -i tcp:6443
COMMAND     PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
kube-apis 22021 root   46u  IPv6 921529      0t0  TCP *:6443 (LISTEN)
3、私钥文件和公钥证书

通过安装脚本在bare-metal上安装的k8s集群,在master node上你会发现如下文件:

root@node1:/srv/kubernetes# ls
ca.crt  kubecfg.crt  kubecfg.key  server.cert  server.key

这些私钥文件和公钥证书是在k8s(1.3.7)集群安装过程由安装脚本创建的,在kubernetes/cluster/common.sh中你可以发现function create-certs这样一个函数,这些文件就是它创建的。

# Create certificate pairs for the cluster.
# $1: The public IP for the master.
#
# These are used for static cert distribution (e.g. static clustering) at
# cluster creation time. This will be obsoleted once we implement dynamic
# clustering.
#
# The following certificate pairs are created:
#
#  - ca (the cluster's certificate authority)
#  - server
#  - kubelet
#  - kubecfg (for kubectl)
#
# TODO(roberthbailey): Replace easyrsa with a simple Go program to generate
# the certs that we need.
#
# Assumed vars
#   KUBE_TEMP
#
# Vars set:
#   CERT_DIR
#   CA_CERT_BASE64
#   MASTER_CERT_BASE64
#   MASTER_KEY_BASE64
#   KUBELET_CERT_BASE64
#   KUBELET_KEY_BASE64
#   KUBECFG_CERT_BASE64
#   KUBECFG_KEY_BASE64
function create-certs {
  local -r primary_cn="${1}"
  ... ...

}

简单描述一下这些文件的用途:

- ca.crt:the cluster's certificate authority,CA证书,即根证书,内置CA公钥,用于验证某.crt文件,是否是CA签发的证书;
- server.cert:kube-apiserver服务端公钥数字证书;
- server.key:kube-apiserver服务端私钥文件;
- kubecfg.crt 和kubecfg.key:按照 create-certs函数注释中的说法:这两个文件是为kubectl访问apiserver[双向证书验证](http://tonybai.com/2015/04/30/go-and-https/)时使用的。

不过,这里我们没有CA的key,无法签发新证书,如果要用这几个文件,那么就仅能限于这几个文件。我们可以利用kubecfg.crt 和kubecfg.key 作为访问api server的client端的key和crt使用。我们来查看一下这几个文件:

查看ca.crt:

#openssl x509 -noout -text -in ca.crt
... ...
Certificate:
    Data:
        Version: 3 (0x2)
        Serial Number: 16946557986148168970 (0xeb2e44b3a1ebb50a)
    Signature Algorithm: sha256WithRSAEncryption
        Issuer: CN=10.47.136.60@1476362758
        Validity
            Not Before: Oct 13 12:45:58 2016 GMT
            Not After : Oct 11 12:45:58 2026 GMT
        Subject: CN=10.47.136.60@1476362758
... ..

查看server.cert:

...
 Data:
        Version: 3 (0x2)
        Serial Number: 1 (0x1)
    Signature Algorithm: sha256WithRSAEncryption
        Issuer: CN=10.47.136.60@1476362758
        Validity
            Not Before: Oct 13 12:45:59 2016 GMT
            Not After : Oct 11 12:45:59 2026 GMT
        Subject: CN=kubernetes-master
...

查看kubecfg.crt:

...
Certificate:
    Data:
        Version: 3 (0x2)
        Serial Number: 2 (0x2)
    Signature Algorithm: sha256WithRSAEncryption
        Issuer: CN=10.47.136.60@1476362758
        Validity
            Not Before: Oct 13 12:45:59 2016 GMT
            Not After : Oct 11 12:45:59 2026 GMT
        Subject: CN=kubecfg
...

再来验证一下server.cert和kubecfg.crt是否是ca.crt签发的:

# openssl verify -CAfile ca.crt kubecfg.crt
kubecfg.crt: OK

# openssl verify -CAfile ca.crt server.cert
server.cert: OK

在前面的apiserver的启动参数展示中,我们已经看到kube-apiserver使用了ca.crt, server.cert和server.key:

/opt/bin/kube-apiserver --insecure-bind-address=0.0.0.0 --insecure-port=8080 --etcd-servers=http://127.0.0.1:4001 --logtostderr=true --service-cluster-ip-range=192.168.3.0/24 --admission-control=NamespaceLifecycle,LimitRanger,ServiceAccount,SecurityContextDeny,ResourceQuota --service-node-port-range=30000-32767 --advertise-address=10.47.136.60 --client-ca-file=/srv/kubernetes/ca.crt --tls-cert-file=/srv/kubernetes/server.cert --tls-private-key-file=/srv/kubernetes/server.key

在后续章节中,我们还会详细说明这些密钥和公钥证书在K8s集群安全中所起到的作用。

二、集群环境

还是那句话,Kubernetes在active development中,老版本和新版本的安全机制可能有较大变动,本篇中的配置方案和步骤都是针对一定环境有效的,我们的环境如下:

OS:
Ubuntu 14.04.4 LTS Kernel:3.19.0-70-generic #78~14.04.1-Ubuntu SMP Fri Sep 23 17:39:18 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux

Docker:
# docker version
Client:
 Version:      1.12.2
 API version:  1.24
 Go version:   go1.6.3
 Git commit:   bb80604
 Built:        Tue Oct 11 17:00:50 2016
 OS/Arch:      linux/amd64

Server:
 Version:      1.12.2
 API version:  1.24
 Go version:   go1.6.3
 Git commit:   bb80604
 Built:        Tue Oct 11 17:00:50 2016
 OS/Arch:      linux/amd64

Kubernetes集群:1.3.7

私有镜像仓库:阿里云镜像仓库

三、目标

目前,我们尚不具备一步迈向“绝对安全”的能力,在目标设定时,我们的一致想法是在当前阶段“有限安全”的K8s集群更适合我们。在这一原则下,我们针对不同情况提出不同的目标设定。

前面说过,k8s针对insecure port(–insecure-bind-address=0.0.0.0 –insecure-port=8080)的流量没有任何安全机制限制,相当于k8s“裸奔”。但是走k8s apiserver secure port(–bind-address=0.0.0.0 –secure-port=6443)的流量,将会遇到验证、授权等安全机制的限制。具体使用哪个端口与API server的交互方式,要视情况而定。

在分情况说明之前,将api server的insecure port的bind address由0.0.0.0改为local address是必须要做的。

1、Cluster -> Master(apiserver)

从集群到Apiserver的流量也可以细分为几种情况:

a) kubernetes component on master node -> apiserver

由于master node上的components与apiserver运行在一台机器上,因此可以通过local address的insecure-port访问apiserver,无需走insecure port。从现状中当前master上的component组件的启动参数来看,目前已经符合要求,于是针对这些components,我们无需再做配置上的调整。

b) kubernetes component on worker node -> apiserver

目标是实现kubernetes components on worker node和运行于master上的apiserver之间的基于https的双向认证。kubernetes的各个组件均支持在命令行参数中传入tls相关参数,比如ca文件路径,比如client端的cert文件和key等。

c) componet in pod for kubernetes -> apiserver

像kube dns和kube dashboard这些运行于pod中的k8s 组件也是在k8s cluster范围内调度的,它们可能运行在任何一个worker node上。理想情况下,它们与master上api server的通信也应该是基于一定安全机制的。不过在本篇中,我们暂时不动它们的设置,以免对其他目标的实现造成一定障碍和更多的工作量,在后续文章中,可能会专门将dns和dashboard拿出来做安全加固说明。因此,dns和dashboard在这里仍然使用的是insecure-port:

root     10531 10515  0 Nov15 ?        00:03:02 /dashboard --port=9090 --apiserver-host=http://10.47.136.60:8080
root     2018255 2018240  0 Nov15 ?        00:03:50 /kube-dns --domain=cluster.local. --dns-port=10053 --kube-master-url=http://10.47.136.60:8080
d) user service in pod -> apiserver

我们的集群管理程序也是以service的形式运行在k8s cluster中的,这些程序如何访问apiserver才是我们关心的重点,我们希望管理程序通过secure-port,在一定的安全机制下与apiserver交互。

2、Master(apiserver) -> Cluster

apiserver作为client端访问Cluster,在k8s文档中,这个访问路径主要包含两种情况:

a) apiserver与各个node上kubelet交互,采集Pod的log;
b) apiserver通过自身的proxy功能访问node、pod以及集群中的各种service。

在“有限安全”的原则下,我们暂不考虑这种情况下的安全机制。

四、Kubernetes的安全机制

kube-apiserver是整个kubernetes集群的核心,无论是kubectl还是通过api管理集群,最终都会落到与kube-apiserver的交互,apiserver是集群管理命令的入口。kube-apiserver同时监听两个端口:insecure-port和secure-port。之前提到过:通过insecure-port进入apiserver的流量可以有控制整个集群的全部权限;而通过secure-port的流量将经过k8s的安全机制的重重考验,这也是这一节我们重要要说明的。insecure-port的存在一般是为了集群bootstrap或集群开发调试使用的。官方文档建议:集群外部流量都应该走secure port。insecure-port可通过firewall rule使外部流量unreachable。

下面这幅官方图示准确解释了通过secure port的流量将要通过的“安全关卡”:

img{512x368}

我们可以看到外界到APIServer的请求先后经过了:

安全通道(tls) -> Authentication(身份验证) -> Authorization(授权)-> Admission Control(入口条件控制)
  • 安全通道:即基于tls的https的安全通道建立,对流量进行加密,防止嗅探、身份冒充和篡改;

  • Authentication:即身份验证,这个环节它面对的输入是整个http request。它负责对来自client的请求进行身份校验,支持的方法包括:client证书验证(https双向验证)、basic auth、普通token以及jwt token(用于serviceaccount)。APIServer启动时,可以指定一种Authentication方法,也可以指定多种方法。如果指定了多种方法,那么APIServer将会逐个使用这些方法对客户端请求进行验证,只要请求数据通过其中一种方法的验证,APIServer就会认为Authentication成功;

  • Authorization:授权。这个阶段面对的输入是http request context中的各种属性,包括:user、group、request path(比如:/api/v1、/healthz、/version等)、request verb(比如:get、list、create等)。APIServer会将这些属性值与事先配置好的访问策略(access policy)相比较。APIServer支持多种authorization mode,包括AlwaysAllow、AlwaysDeny、ABAC、RBAC和Webhook。APIServer启动时,可以指定一种authorization mode,也可以指定多种authorization mode,如果是后者,只要Request通过了其中一种mode的授权,那么该环节的最终结果就是授权成功。

  • Admission Control:从技术的角度看,Admission control就像a chain of interceptors(拦截器链模式),它拦截那些已经顺利通过authentication和authorization的http请求。http请求沿着APIServer启动时配置的admission control chain顺序逐一被拦截和处理,如果某个interceptor拒绝了该http请求,那么request将会被直接reject掉,而不是像authentication或authorization那样有继续尝试其他interceptor的机会。

五、实现安全传输通道(https)与身份校验(authentication)

在建立安全传输通道、身份校验环节,我们根据”目标“设定一节中的分类,也分为三种情况:

a) 运行于master上的核心k8s components走insecure port,这个暂不用修改配置;
b) worker node上的k8s组件配置通过insecure-port访问,并采用https双向认证的身份验证机制;
c) pod in k8s访问apiserver,通过https+ basic auth的方式进行身份验证。

APIServer直接使用了集群创建时创建的ca.crt、server.cert和server.key,由于没有ca.key,所以我们只能直接利用其它两个文件: kubecfg.key和kubecfg.crt作为客户端的私钥文件和公钥证书。当然你也可以手动重新创建ca,并将apiserver使用的.key、.crt以及各个components的client.key和client.crt都生成一份,并用你生成的Ca签发。这里我们就偷个懒儿了。

在开始之前,我们再来看看apiserver的启动参数:

root       22021       1  1 Oct17 ?        17:11:15 /opt/bin/kube-apiserver --insecure-bind-address=0.0.0.0 --insecure-port=8080 --etcd-servers=http://127.0.0.1:4001 --logtostderr=true --service-cluster-ip-range=192.168.3.0/24 --admission-control=NamespaceLifecycle,LimitRanger,ServiceAccount,SecurityContextDeny,ResourceQuota --service-node-port-range=30000-32767 --advertise-address=10.47.136.60 --client-ca-file=/srv/kubernetes/ca.crt --tls-cert-file=/srv/kubernetes/server.cert --tls-private-key-file=/srv/kubernetes/server.key

由于之前简述了Kubernetes的安全机制,于是我们对这些参数又有了进一步认识

https安全通道建立阶段:端口6443(通过 /opt/bin/kube-apiserver --help查看options说明可以得到),公钥证书server.cert ,私钥文件:server.key。
Authentication阶段:从当前启动参数中,我们仅能看到一种机制:--client-ca-file=/srv/kubernetes/ca.crt,也就是client证书校验机制。apiserver会用/srv/kubernetes/ca.crt对client端发过来的client.crt进行验证。
Authorization阶段:通过 /opt/bin/kube-apiserver --help查看options说明可以得到:--authorization-mode="AlwaysAllow",也就是说在这一环节,所有Request都可以顺利通过。
Admission Control阶段:apiserver指定了“NamespaceLifecycle,LimitRanger,ServiceAccount,SecurityContextDeny,ResourceQuota”这样一个interceptor链。

我们首先来测试一下通过kubecfg.key和kubecfg.crt访问APIServer的insecure-port,验证一下kubecfg.key和kubecfg.crt作为client端私钥文件和公钥证书的可行性:

# curl https://10.47.136.60:6443/version --cert /srv/kubernetes/kubecfg.crt --key /srv/kubernetes/kubecfg.key --cacert /srv/kubernetes/ca.crt
{
  "major": "1",
  "minor": "3",
  "gitVersion": "v1.3.7",
  "gitCommit": "a2cba278cba1f6881bb0a7704d9cac6fca6ed435",
  "gitTreeState": "clean",
  "buildDate": "2016-09-12T23:08:43Z",
  "goVersion": "go1.6.2",
  "compiler": "gc",
  "platform": "linux/amd64"
}

接下来,我们就来开始调整k8s配置。

第一个场景:components on worker node -> master

worker node上有两个k8s components:kubelet和kube-proxy,当前它们的启动参数为:

root      7934     1  1 Nov15 ?        03:33:35 /opt/bin/kubelet --hostname-override=10.46.181.146 --api-servers=http://10.47.136.60:8080 --logtostderr=true --cluster-dns=192.168.3.10 --cluster-domain=cluster.local --config=
root      8140     1  0 14:59 ?        00:00:00 /opt/bin/kube-proxy --hostname-override=10.46.181.146 --master=http://10.47.136.60:8080 --logtostderr=true

我们将ca.crt、kubecfg.key和kubecfg.crt scp到其他各个Worker node的/srv/kubernetes目录下:

root@node1:/srv/kubernetes# scp ca.crt root@10.46.181.146:/srv/kubernetes
ca.crt                                                                                                                                        100% 1220     1.2KB/s   00:00
root@node1:/srv/kubernetes# scp kubecfg.crt root@10.46.181.146:/srv/kubernetes
kubecfg.crt                                                                                                                                   100% 4417     4.3KB/s   00:00
root@node1:/srv/kubernetes# scp kubecfg.key root@10.46.181.146:/srv/kubernetes
kubecfg.key

在worker node: 10.46.181.146上:

# ls -l
total 16
-rw-r----- 1 root root 1220 Nov 25 15:51 ca.crt
-rw------- 1 root root 4417 Nov 25 15:51 kubecfg.crt
-rw------- 1 root root 1708 Nov 25 15:51 kubecfg.key

创建worker node上kubelet和kube-proxy所要使用的config文件:/root/.kube/config

/root/.kube/config

apiVersion: v1
kind: Config
preferences: {}
users:
- name: kubecfg
  user:
    client-certificate: /srv/kubernetes/kubecfg.crt
    client-key: /srv/kubernetes/kubecfg.key
clusters:
- cluster:
    certificate-authority: /srv/kubernetes/ca.crt
  name: ubuntu
contexts:
- context:
    cluster: ubuntu
    user: kubecfg
  name: ubuntu
current-context: ubuntu

这个文件参考了master node上的/root/.kube/config文件的格式,你也可以在master node上使用kubectl config view查看config文件内容:

# kubectl config view
apiVersion: v1
clusters:
- cluster:
    insecure-skip-tls-verify: true
    server: http://10.47.136.60:8080
  name: ubuntu
contexts:
- context:
    cluster: ubuntu
    user: ubuntu
  name: ubuntu
current-context: ubuntu
kind: Config
preferences: {}
users:
- name: ubuntu
  user:
    password: xxxxxA
    username: admin

Worker node上/root/.kube/config中的user.name使用的是kubecfg,这也是在前面查看kubecfg.crt时,kubecfg.crt在/CN域中使用的值。

接下来我们来修改worker node上的/etc/default/kubelet文件:

KUBELET_OPTS=" --hostname-override=10.46.181.146  --api-servers=https://10.47.136.60:6443 --logtostderr=true  --cluster-dns=192.168.3.10  --cluster-domain=cluster.local  --kubeconfig=/root/.kube/config"
#KUBELET_OPTS=" --hostname-override=10.46.181.146  --api-servers=http://10.47.136.60:8080  --logtostderr=true  --cluster-dns=192.168.3.10  --cluster-domain=cluster.local  --config=  "

在worker node上重启kubelet并查看/var/log/upstart/kubelet.log:

# service kubelet restart
kubelet stop/waiting
kubelet start/running, process 9716

///var/log/upstart/kubelet.log
... ...
I1125 16:12:26.332652    9716 server.go:784] Watching apiserver
W1125 16:12:26.338581    9716 kubelet.go:572] Hairpin mode set to "promiscuous-bridge" but configureCBR0 is false, falling back to "hairpin-veth"
I1125 16:12:26.338641    9716 kubelet.go:393] Hairpin mode set to "hairpin-veth"
I1125 16:12:26.366600    9716 docker_manager.go:235] Setting dockerRoot to /var/lib/docker
I1125 16:12:26.367067    9716 server.go:746] Started kubelet v1.3.7
E1125 16:12:26.369508    9716 kubelet.go:954] Image garbage collection failed: unable to find data for container /
I1125 16:12:26.370534    9716 fs_resource_analyzer.go:66] Starting FS ResourceAnalyzer
I1125 16:12:26.370567    9716 status_manager.go:123] Starting to sync pod status with apiserver
I1125 16:12:26.370601    9716 kubelet.go:2501] Starting kubelet main sync loop.
I1125 16:12:26.370632    9716 kubelet.go:2510] skipping pod synchronization - [network state unknown container runtime is down]
I1125 16:12:26.370981    9716 server.go:117] Starting to listen on 0.0.0.0:10250
I1125 16:12:26.384336    9716 volume_manager.go:227] Starting Kubelet Volume Manager
I1125 16:12:26.480387    9716 factory.go:295] Registering Docker factory
I1125 16:12:26.480483    9716 factory.go:54] Registering systemd factory
I1125 16:12:26.481446    9716 factory.go:86] Registering Raw factory
I1125 16:12:26.482888    9716 manager.go:1072] Started watching for new ooms in manager
I1125 16:12:26.484242    9716 oomparser.go:200] OOM parser using kernel log file: "/var/log/kern.log"
I1125 16:12:26.485330    9716 manager.go:281] Starting recovery of all containers
I1125 16:12:26.562959    9716 kubelet.go:1213] Node 10.46.181.146 was previously registered
I1125 16:12:26.712150    9716 manager.go:286] Recovery completed

一次点亮!

再来修改worker node上kube-proxy的配置:/etc/default/kube-proxy:

// /etc/default/kube-proxy
KUBE_PROXY_OPTS=" --hostname-override=10.46.181.146  --master=https://10.47.136.60:6443  --logtostderr=true --kubeconfig=/root/.kube/config"
#KUBE_PROXY_OPTS=" --hostname-override=10.46.181.146  --master=http://10.47.136.60:8080  --logtostderr=true  "

在worker node上重启kube-proxy并查看/var/log/upstart/kube-proxy.log:

# service kube-proxy restart
kube-proxy stop/waiting
kube-proxy start/running, process 26185

// /var/log/upstart/kube-proxy.log
I1125 16:30:28.224491   26185 server.go:202] Using iptables Proxier.
I1125 16:30:28.228067   26185 server.go:214] Tearing down userspace rules.
I1125 16:30:28.245634   26185 conntrack.go:40] Setting nf_conntrack_max to 65536
I1125 16:30:28.247422   26185 conntrack.go:57] Setting conntrack hashsize to 16384
I1125 16:30:28.249456   26185 conntrack.go:62] Setting nf_conntrack_tcp_timeout_established to 86400

从日志上看不出有啥异常,算是成功!:)

第二个场景:pod in cluster -> master

通过阅读K8s的官方文档“Accessing the api from a pod”,我们知道K8s cluster为Pod访问API Server做了很多“预备”工作,最重要的一点就是在Pod被创建的时候,一个serviceaccount 被自动mount到/var/run/secrets/kubernetes.io/serviceaccount路径下:

#kubectl describe pod/my-golang-1147314274-0qms5

Name:        my-golang-1147314274-0qms5
Namespace:    default
Node:        10.47.136.60/10.47.136.60
Start Time:    Thu, 24 Nov 2016 14:59:52 +0800
Labels:        pod-template-hash=1147314274
        run=my-golang
Status:        Running
IP:        172.16.99.9
... ...

Containers:
  my-golang:
    ... ...
    Volume Mounts:
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-40z0x (ro)
    Environment Variables:    <none>
... ...
Volumes:
  default-token-40z0x:
    Type:    Secret (a volume populated by a Secret)
    SecretName:    default-token-40z0x
QoS Class:    BestEffort
Tolerations:    <none>

serviceaccount顾名思义,是Pod中程序访问APIServer所要使用的账户信息,我们来看看都有啥:

# kubectl get serviceaccount
NAME      SECRETS   AGE
default   1         43d

# kubectl describe serviceaccount/default
Name:        default
Namespace:    default
Labels:        <none>

Image pull secrets:    <none>

Mountable secrets:     default-token-40z0x

Tokens:                default-token-40z0x

# kubectl describe secret/default-token-40z0x
Name:        default-token-40z0x
Namespace:    default
Labels:        <none>
Annotations:    kubernetes.io/service-account.name=default
        kubernetes.io/service-account.uid=90de59ad-9120-11e6-a0a6-00163e1625a9

Type:    kubernetes.io/service-account-token

Data
====
ca.crt:        1220 bytes
namespace:    7 bytes
token:        {Token data}

mount到Pod中/var/run/secrets/kubernetes.io/serviceaccount路径下的default-token-40z0x volume包含三个文件:

  • ca.crt:CA的公钥证书
  • namspace文件:里面的内容为:”default”
  • token:用在Pod访问APIServer时候的身份验证。

理论上,使用这些信息Pod可以成功访问APIServer,我们来测试一下。注意在Pod的世界中,APIServer也是一个Service,通过kubectl get service可以看到:

# kubectl get services
NAME           CLUSTER-IP      EXTERNAL-IP   PORT(S)    AGE
kubernetes     192.168.3.1     <none>        443/TCP    43d

kubernetes这个Service监听的端口是443,也就是说在Pod的视角中,APIServer暴露的仅仅是insecure-port。并且使用”kubernetes”这个名字,我们可以通过kube-dns获得APIServer的ClusterIP。

启动一个基于golang:latest的pod,pod.yaml如下:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: my-golang
spec:
  replicas: 1
  template:
    metadata:
      labels:
        run: my-golang
    spec:
      containers:
      - name: my-golang
        image: golang:latest
        command: ["tail", "-f", "/var/log/bootstrap.log"]

Pod启动后,docker exec -it container-id /bin/bash切入container,并执行如下命令:

# TOKEN="$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)"
# curl --cacert /var/run/secrets/kubernetes.io/serviceaccount/ca.crt https://kubernetes:443/version -H "Authorization: Bearer $TOKEN"
Unauthorized

查看API Server的log:

E1125 17:30:22.504059 2743425 handlers.go:54] Unable to authenticate the request due to an error: crypto/rsa: verification error

似乎是验证token失败。这个问题在kubernetes的github issue中也有被提及,目前尚未解决。

不过仔细想了想,如果每个Pod都默认可以访问APIServer,显然也是不安全的,虽然我们可以通过authority和admission control对默认的token访问做出限制,但总感觉不那么“安全”。

我们来试试basic auth方式(这种方式的弊端是API Server运行中,无法在运行时动态更新auth文件,对于auth文件的修改,必须重启APIServer后生效)。

我们首先在APIServer侧为APIServer创建一个basic auth file:

// /srv/kubernetes/basic_auth_file
admin123,admin,admin

basic_auth_file中每一行的格式:password,username,useruid

修改APIServer的启动参数,将basic_auth_file传入并重启apiserver:

KUBE_APISERVER_OPTS=" --insecure-bind-address=10.47.136.60 --insecure-port=8080 --etcd-servers=http://127.0.0.1:4001 --logtostderr=true --service-cluster-ip-range=192.168.3.0/24 --admission-control=NamespaceLifecycle,LimitRanger,ServiceAccount,SecurityContextDeny,ResourceQuota --service-node-port-range=30000-32767 --advertise-address=10.47.136.60 --basic-auth-file=/srv/kubernetes/basic_auth_file --client-ca-file=/srv/kubernetes/ca.crt --tls-cert-file=/srv/kubernetes/server.cert --tls-private-key-file=/srv/kubernetes/server.key"

我们在Pod中使用basic auth访问API Server:

# curl --cacert /var/run/secrets/kubernetes.io/serviceaccount/ca.crt https://kubernetes:443/version -basic -u admin:admin123
{
  "major": "1",
  "minor": "3",
  "gitVersion": "v1.3.7",
  "gitCommit": "a2cba278cba1f6881bb0a7704d9cac6fca6ed435",
  "gitTreeState": "clean",
  "buildDate": "2016-09-12T23:08:43Z",
  "goVersion": "go1.6.2",
  "compiler": "gc",
  "platform": "linux/amd64"
}

Pod to APIServer authentication成功了。

六、小结

再重申一次:上述配置不是绝对安全的理想配置方案,只是阶段性满足我目前项目需求的一个“有限安全”方案,大家谨慎参考。

到目前为止,我们的“有限安全”也仅仅做到Authentication这一步,至于Authority和Admission Control,目前尚未有相关实践,可能会在后续的文章中做单独说明。

七、参考资料

  • Master <-> Node Communication – http://kubernetes.io/docs/admin/master-node-communication/
  • Authentication – http://kubernetes.io/docs/admin/authentication/
  • Using Authorization Plugins – http://kubernetes.io/docs/admin/authorization/
  • Accessing the API – http://kubernetes.io/docs/admin/accessing-the-api/
  • Managing Service Accounts – http://kubernetes.io/docs/admin/service-accounts-admin/
  • Authenticating Across Clusters with kubeconfig — http://kubernetes.io/docs/user-guide/kubeconfig-file/
  • Service Accounts — https://docs.openshift.com/enterprise/3.1/dev_guide/service_accounts.html
  • 4S: SERVICES ACCOUNT, SECRET, SECURITY CONTEXT AND SECURITY IN KUBERNETES — http://www.sel.zju.edu.cn/?p=588
  • KUBERNETES APISERVER源码分析——API请求的认证过程 – http://www.sel.zju.edu.cn/?p=609
  • Kubernetes安全配置案例 – http://www.cnblogs.com/breg/p/5923604.html




这里是Tony Bai的个人Blog,欢迎访问、订阅和留言!订阅Feed请点击上面图片

如果您觉得这里的文章对您有帮助,请扫描上方二维码进行捐赠,加油后的Tony Bai将会为您呈现更多精彩的文章,谢谢!

如果您喜欢通过微信App浏览本站内容,可以扫描下方二维码,订阅本站官方微信订阅号“iamtonybai”;点击二维码,可直达本人官方微博主页^_^:



本站Powered by Digital Ocean VPS。

选择Digital Ocean VPS主机,即可获得10美元现金充值,可免费使用两个月哟!

著名主机提供商Linode 10$优惠码:linode10,在这里注册即可免费获得。

阿里云推荐码:1WFZ0V立享9折!

View Tony Bai's profile on LinkedIn


文章

评论

  • 正在加载...

分类

标签

归档











更多