标签 Cgo 下的文章

Go社区主流Kafka客户端简要对比

本文永久链接 – https://tonybai.com/2022/03/28/the-comparison-of-the-go-community-leading-kakfa-clients

一. 背景

众所周知,Kafka是Apache开源基金会下的明星级开源项目,作为一个开源的分布式事件流平台,它被成千上万的公司用于高性能数据管道、流分析、数据集成和关键任务应用。在国内,无论大厂小厂,无论是自己部署还是用像阿里云提供的Kafka云服务,很多互联网应用已经离不开Kafka了。

互联网不拘泥于某种编程语言,但很多人不喜欢Kafka是由Scala/Java开发的。尤其是对于那些对某种语言有着“宗教般”虔诚、有着“手里拿着锤子,眼中满世界都是钉子”的程序员来说,总是有想重写Kafka的冲动。但就像很多新语言的拥趸想重写Kubernetes一样,Kafka已经建立起了巨大的起步和生态优势,短期很难建立起同样规格的巨型项目和对应的生态了(近两年同样火热的类Kafka的Apache pulsar创建时间与Kafka是先后脚的,只是纳入Apache基金会托管的时间较晚)。

Kafka生态很强大,各种编程语言都有对应的Kafka client。Kafka背后的那个公司confluent.inc也维护了各大主流语言的client:

其他主流语言的开发人员只需要利用好这些client端,做好与Kafka集群的连接就好了。好了做了这么多铺垫,下面说说为啥要写下这篇文章。

目前业务线生产环境的日志方案是这样的:

从图中我们看到:业务系统将日志写入Kafka,然后通过logstash工具消费日志并汇聚到后面的Elastic Search Cluster中供查询使用。 业务系统主要是由Java实现的,考虑到Kafka写失败的情况,为了防止log阻塞业务流程,业务系统使用了支持fallback appender的logback进行日志写入:这样当Kafka写入失败时,日志还可以写入备用的文件中,尽可能保证日志不丢失

考虑到复用已有的IT设施与方案,我们用Go实现的新系统也向这种不落盘的log汇聚方案靠拢,这就要求我们的logger也要支持向Kafka写入并且支持fallback机制。

我们的log包是基于uber zap封装而来的uber的zap日志包是目前Go社区使用最为广泛的、高性能的log包之一,第25期thoughtworks技术雷达也将zap列为试验阶段的工具推荐给大家,并且thoughtworks团队已经在大规模使用它:

不过,zap原生不支持写Kafka,但zap是可扩展的,我们需要为其增加写Kafka的扩展功能。而要写Kakfa,我们就离不开Kakfa Client包。目前Go社区主流的Kafka client有Shopify的sarama、Kafka背后公司confluent.inc维护的confluent-kafka-go以及segmentio/kafka-go

在这篇文章中,我就根据我的使用历程逐一说说我对这三个客户端的使用感受。

下面,我们首先先来看看star最多的Shopify/sarama。

二. Shopify/sarama:星多不一定代表优秀

目前在Go社区星星最多,应用最广的Kafka client包是Shopify的sarama。Shopify是一家国外的电商平台,我总是混淆Shopify、Shopee(虾皮)以及传闻中要赞助巴萨的Spotify(瑞典流媒体音乐平台),傻傻分不清^_^。

下面我就基于sarama演示一下如何扩展zap,让其支持写kafka。在《一文告诉你如何用好uber开源的zap日志库》一文中,我介绍过zap建构在zapcore之上,而zapcore由Encoder、WriteSyncer和LevelEnabler三部分组成,对于我们这个写Kafka的功能需求来说,我们只需要定义一个给一个WriteSyncer接口的实现,即可组装成一个支持向Kafka写入的logger

我们自顶向下先来看看创建logger的函数:

// https://github.com/bigwhite/experiments/blob/master/kafka-clients/zapkafka/log.go

type Logger struct {
    l     *zap.Logger // zap ensure that zap.Logger is safe for concurrent use
    cfg   zap.Config
    level zap.AtomicLevel
}

func (l *Logger) Info(msg string, fields ...zap.Field) {
    l.l.Info(msg, fields...)
}

func New(writer io.Writer, level int8, opts ...zap.Option) *Logger {
    if writer == nil {
        panic("the writer is nil")
    }
    atomicLevel := zap.NewAtomicLevelAt(zapcore.Level(level))

    logger := &Logger{
        cfg:   zap.NewProductionConfig(),
        level: atomicLevel,
    }

    logger.cfg.EncoderConfig.EncodeTime = func(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
        enc.AppendString(t.Format(time.RFC3339)) // 2021-11-19 10:11:30.777
    }
    logger.cfg.EncoderConfig.TimeKey = "logtime"

    core := zapcore.NewCore(
        zapcore.NewJSONEncoder(logger.cfg.EncoderConfig),
        zapcore.AddSync(writer),
        atomicLevel,
    )
    logger.l = zap.New(core, opts...)
    return logger
}

// SetLevel alters the logging level on runtime
// it is concurrent-safe
func (l *Logger) SetLevel(level int8) error {
    l.level.SetLevel(zapcore.Level(level))
    return nil
}

这段代码中没有与kafka client相关的内容,New函数用来创建一个*Logger实例,它接受的第一个参数为io.Writer接口类型,用于指示日志的写入位置。这里要注意一点的是:我们使用zap.AtomicLevel类型存储logger的level信息,基于zap.AtomicLevel的level支持热更新,我们可以在程序运行时动态修改logger的log level。这个也是在《一文告诉你如何用好uber开源的zap日志库》遗留问题的答案。

接下来,我们就基于sarama的AsyncProducer来实现一个满足zapcore.WriteSyncer接口的类型:

// https://github.com/bigwhite/experiments/blob/master/kafka-clients/zapkafka/kafka_syncer.go

type kafkaWriteSyncer struct {
    topic          string
    producer       sarama.AsyncProducer
    fallbackSyncer zapcore.WriteSyncer
}

func NewKafkaAsyncProducer(addrs []string) (sarama.AsyncProducer, error) {
    config := sarama.NewConfig()
    config.Producer.Return.Errors = true
    return sarama.NewAsyncProducer(addrs, config)
}

func NewKafkaSyncer(producer sarama.AsyncProducer, topic string, fallbackWs zapcore.WriteSyncer) zapcore.WriteSyncer {
    w := &kafkaWriteSyncer{
        producer:       producer,
        topic:          topic,
        fallbackSyncer: zapcore.AddSync(fallbackWs),
    }

    go func() {
        for e := range producer.Errors() {
            val, err := e.Msg.Value.Encode()
            if err != nil {
                continue
            }

            fallbackWs.Write(val)
        }
    }()

    return w
}

NewKafkaSyncer是创建zapcore.WriteSyncer的那个函数,它的第一个参数使用了sarama.AsyncProducer接口类型,目的是为了可以利用sarama提供的mock测试包。最后一个参数为fallback时使用的WriteSyncer参数。

NewKafkaAsyncProducer函数是用于方便用户快速创建sarama.AsyncProducer的,其中的config使用的是默认的config值。在config默认值中,Return.Successes的默认值都false,即表示客户端不关心向Kafka写入消息的成功状态,我们也无需单独建立一个goroutine来消费AsyncProducer.Successes()。但我们需要关注写入失败的消息,因此我们将Return.Errors置为true的同时在NewKafkaSyncer中启动了一个goroutine专门处理写入失败的日志数据,将这些数据写入fallback syncer中。

接下来,我们看看kafkaWriteSyncer的Write与Sync方法:

// https://github.com/bigwhite/experiments/blob/master/kafka-clients/zapkafka/kafka_syncer.go

func (ws *kafkaWriteSyncer) Write(b []byte) (n int, err error) {
    b1 := make([]byte, len(b))
    copy(b1, b) // b is reused, we must pass its copy b1 to sarama
    msg := &sarama.ProducerMessage{
        Topic: ws.topic,
        Value: sarama.ByteEncoder(b1),
    }

    select {
    case ws.producer.Input() <- msg:
    default:
        // if producer block on input channel, write log entry to default fallbackSyncer
        return ws.fallbackSyncer.Write(b1)
    }
    return len(b1), nil
}

func (ws *kafkaWriteSyncer) Sync() error {
    ws.producer.AsyncClose()
    return ws.fallbackSyncer.Sync()
}

注意:上面代码中的b会被zap重用,因此我们在扔给sarama channel之前需要将b copy一份,将副本发送给sarama。

从上面代码看,这里我们将要写入的数据包装成一个sarama.ProducerMessage,然后发送到producer的Input channel中。这里有一个特殊处理,那就是当如果msg阻塞在Input channel上时,我们将日志写入fallbackSyncer。这种情况是出于何种考虑呢?这主要是因为基于sarama v1.30.0版本的kafka logger在我们的验证环境下出现过hang住的情况,当时的网络可能出现过波动,导致logger与kafka之间的连接出现过异常,我们初步怀疑就是这个位置阻塞,导致业务被阻塞住了。在sarama v1.32.0版本中有一个fix,和我们这个hang的现象很类似。

但这么做也有一个严重的问题,那就是在压测中,我们发现大量日志都无法写入到kafka,而是都写到了fallback syncer中。究其原因,我们在sarama的async_producer.go中看到:input channel是一个unbuffered channel,而从input channel读取消息的dispatcher goroutine也仅仅有一个,考虑到goroutine的调度,大量日志写入fallback syncer就不足为奇了:

// github.com/Shopify/sarama@v1.32.0/async_producer.go
func newAsyncProducer(client Client) (AsyncProducer, error) {
    // Check that we are not dealing with a closed Client before processing any other arguments
    if client.Closed() {
        return nil, ErrClosedClient
    }

    txnmgr, err := newTransactionManager(client.Config(), client)
    if err != nil {
        return nil, err
    }

    p := &asyncProducer{
        client:     client,
        conf:       client.Config(),
        errors:     make(chan *ProducerError),
        input:      make(chan *ProducerMessage), // 笔者注:这是一个unbuffer channel
        successes:  make(chan *ProducerMessage),
        retries:    make(chan *ProducerMessage),
        brokers:    make(map[*Broker]*brokerProducer),
        brokerRefs: make(map[*brokerProducer]int),
        txnmgr:     txnmgr,
    }
    ... ...
}

有人说这里可以加定时器(Timer)做超时,要知道日志都是在程序执行的关键路径上,每写一条log就启动一个Timer感觉太耗了(即便是Reset重用Timer)。如果sarama在任何时候都不会hang住input channel,那么在Write方法中我们还是不要使用select-default这样的trick

sarama的一个不错的地方是提供了mocks测试工具包,该包既可用于sarama的自测,也可以用作依赖sarama的go包的自测,以上面的实现为例,我们可以编写基于mocks测试包的一些test:

// https://github.com/bigwhite/experiments/blob/master/kafka-clients/zapkafka/log_test.go

func TestWriteFailWithKafkaSyncer(t *testing.T) {
    config := sarama.NewConfig()
    p := mocks.NewAsyncProducer(t, config)

    var buf = make([]byte, 0, 256)
    w := bytes.NewBuffer(buf)
    w.Write([]byte("hello"))
    logger := New(NewKafkaSyncer(p, "test", NewFileSyncer(w)), 0)

    p.ExpectInputAndFail(errors.New("produce error"))
    p.ExpectInputAndFail(errors.New("produce error"))

    // all below will be written to the fallback sycner
    logger.Info("demo1", zap.String("status", "ok")) // write to the kafka syncer
    logger.Info("demo2", zap.String("status", "ok")) // write to the kafka syncer

    // make sure the goroutine which handles the error writes the log to the fallback syncer
    time.Sleep(2 * time.Second)

    s := string(w.Bytes())
    if !strings.Contains(s, "demo1") {
        t.Errorf("want true, actual false")
    }
    if !strings.Contains(s, "demo2") {
        t.Errorf("want true, actual false")
    }

    if err := p.Close(); err != nil {
        t.Error(err)
    }
}

测试通过mocks.NewAsyncProducer返回满足sarama.AsyncProducer接口的实现。然后设置expect,针对每条消息都要设置expect,这里写入两条日志,所以设置了两次。注意:由于我们是在一个单独的goroutine中处理的Errors channel,因此这里存在一些竞态条件。在并发程序中,Fallback syncer也一定要支持并发写,zapcore提供了zapcore.Lock可以用于将一个普通的zapcore.WriteSyncer包装成并发安全的WriteSyncer。

不过,使用sarama的过程中还遇到过一个“严重”的问题,那就是有些时候数据并没有完全写入到kafka。我们去掉针对input channel的select-default操作,然后创建一个concurrent-write小程序,用于并发的向kafka写入log:

// https://github.com/bigwhite/experiments/blob/master/kafka-clients/zapkafka/cmd/concurrent_write/main.go

func SaramaProducer() {
    p, err := log.NewKafkaAsyncProducer([]string{"localhost:29092"})
    if err != nil {
        panic(err)
    }
    logger := log.New(log.NewKafkaSyncer(p, "test", zapcore.AddSync(os.Stderr)), int8(0))
    var wg sync.WaitGroup
    var cnt int64

    for j := 0; j < 10; j++ {
        wg.Add(1)
        go func(j int) {
            var value string
            for i := 0; i < 10000; i++ {
                now := time.Now()
                value = fmt.Sprintf("%02d-%04d-%s", j, i, now.Format("15:04:05"))
                logger.Info("log message:", zap.String("value", value))
                atomic.AddInt64(&cnt, 1)
            }
            wg.Done()
        }(j)
    }

    wg.Wait()
    logger.Sync()
    println("cnt =", atomic.LoadInt64(&cnt))
    time.Sleep(10 * time.Second)
}

func main() {
    SaramaProducer()
}

我们用kafka官方提供的docker-compose.yml在本地启动一个kafka服务:

$cd benchmark
$docker-compose up -d

然后我们使用kafka容器中自带的consumer工具从名为test的topic中消费数据,消费的数据重定向到1.log中:

$docker exec benchmark_kafka_1 /bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning > 1.log 2>&1

然后我们运行concurrent_write:

$ make
$./concurrent_write > 1.log 2>&1

concurrent_write程序启动了10个goroutine,每个goroutine向kafka写入1w条日志,多数情况下在benchmark目录下的1.log都能看到10w条日志记录,但在使用sarama v1.30.0版本时有些时候看到的是少于10w条的记录,至于那些“丢失”的记录则不知在何处了。使用sarama v1.32.0时,这种情况还尚未出现过。

好了,是时候看看下一个kafka client包了!

三. confluent-kafka-go:需要开启cgo的包还是有点烦

confluent-kafka-go包是kafka背后的技术公司confluent.inc维护的Go客户端,也可以算是Kafka官方Go客户端了。不过这个包唯一的“问题”在于它是基于kafka c/c++库librdkafka构建而成,这意味着一旦你的Go程序依赖confluent-kafka-go,你就很难实现Go应用的静态编译,也无法实现跨平台编译。由于所有业务系统都依赖log包,一旦依赖confluent-kafka-go只能动态链接,我们的构建工具链全需要更改,代价略大。

不过confluent-kafka-go使用起来也很简单,写入性能也不错,并且不存在前面sarama那样的“丢消息”的情况,下面是一个基于confluent-kafka-go的producer示例:

// https://github.com/bigwhite/experiments/blob/master/kafka-clients/confluent-kafka-go-static-build/producer.go

func ReadConfig(configFile string) kafka.ConfigMap {
    m := make(map[string]kafka.ConfigValue)
    file, err := os.Open(configFile)
    if err != nil {
        fmt.Fprintf(os.Stderr, "Failed to open file: %s", err)
        os.Exit(1)
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        line := strings.TrimSpace(scanner.Text())
        if !strings.HasPrefix(line, "#") && len(line) != 0 {
            kv := strings.Split(line, "=")
            parameter := strings.TrimSpace(kv[0])
            value := strings.TrimSpace(kv[1])
            m[parameter] = value
        }
    }

    if err := scanner.Err(); err != nil {
        fmt.Printf("Failed to read file: %s", err)
        os.Exit(1)
    }
    return m
}

func main() {
    conf := ReadConfig("./producer.conf")

    topic := "test"
    p, err := kafka.NewProducer(&conf)
    var mu sync.Mutex

    if err != nil {
        fmt.Printf("Failed to create producer: %s", err)
        os.Exit(1)
    }
    var wg sync.WaitGroup
    var cnt int64

    // Go-routine to handle message delivery reports and
    // possibly other event types (errors, stats, etc)
    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition)
                } else {
                    fmt.Printf("Produced event to topic %s: key = %-10s value = %s\n",
                        *ev.TopicPartition.Topic, string(ev.Key), string(ev.Value))
                }
            }
        }
    }()

    for j := 0; j < 10; j++ {
        wg.Add(1)
        go func(j int) {
            var value string
            for i := 0; i < 10000; i++ {
                key := ""
                now := time.Now()
                value = fmt.Sprintf("%02d-%04d-%s", j, i, now.Format("15:04:05"))
                mu.Lock()
                p.Produce(&kafka.Message{
                    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
                    Key:            []byte(key),
                    Value:          []byte(value),
                }, nil)
                mu.Unlock()
                atomic.AddInt64(&cnt, 1)
            }
            wg.Done()
        }(j)
    }

    wg.Wait()
    // Wait for all messages to be delivered
    time.Sleep(10 * time.Second)
    p.Close()
}

这里我们还是使用10个goroutine向kafka各写入1w消息,注意:默认使用kafka.NewProducer创建的Producer实例不是并发安全的,所以这里用一个sync.Mutex对其Produce调用进行同步管理。我们可以像sarama中的例子那样,在本地启动一个kafka服务,验证一下confluent-kafka-go的运行情况。

由于confluent-kafka-go包基于kafka c库而实现,所以我们没法关闭CGO,如果关闭CGO,将遇到下面编译问题:

$CGO_ENABLED=0 go build
# producer
./producer.go:15:42: undefined: kafka.ConfigMap
./producer.go:17:29: undefined: kafka.ConfigValue
./producer.go:50:18: undefined: kafka.NewProducer
./producer.go:85:22: undefined: kafka.Message
./producer.go:86:28: undefined: kafka.TopicPartition
./producer.go:86:75: undefined: kafka.PartitionAny

因此,默认情况依赖confluent-kafka-go包的Go程序会采用动态链接,通过ldd查看编译后的程序结果如下(on CentOS):

$make build
$ldd producer
    linux-vdso.so.1 =>  (0x00007ffcf87ec000)
    libm.so.6 => /lib64/libm.so.6 (0x00007f473d014000)
    libdl.so.2 => /lib64/libdl.so.2 (0x00007f473ce10000)
    libpthread.so.0 => /lib64/libpthread.so.0 (0x00007f473cbf4000)
    librt.so.1 => /lib64/librt.so.1 (0x00007f473c9ec000)
    libc.so.6 => /lib64/libc.so.6 (0x00007f473c61e000)
    /lib64/ld-linux-x86-64.so.2 (0x00007f473d316000)

那么在CGO开启的情况下是否可以静态编译呢?理论上是可以的。这个在我的《Go语言精进之路》中关于CGO一节有详细说明。

不过confluent-kafka-go包官方目前确认还不支持静态编译。我们来试试在CGO开启的情况下,对其进行静态编译:

// on CentOS
$ go build -buildvcs=false -o producer-static -ldflags '-linkmode "external" -extldflags "-static"'
$ producer
/root/.bin/go1.18beta2/pkg/tool/linux_amd64/link: running gcc failed: exit status 1
/usr/bin/ld: 找不到 -lm
/usr/bin/ld: 找不到 -ldl
/usr/bin/ld: 找不到 -lpthread
/usr/bin/ld: 找不到 -lrt
/usr/bin/ld: 找不到 -lpthread
/usr/bin/ld: 找不到 -lc
collect2: 错误:ld 返回 1

静态链接会将confluent-kafka-go的c语言部分的符号进行静态链接,这些符号可能在libc、libpthread等c运行时库或系统库中,但默认情况下,CentOS是没有安装这些库的.a(archive)版本的。我们需要手动安装:

$yum install glibc-static

安装后,我们再执行上面的静态编译命令:

$go build -buildvcs=false -o producer-static -ldflags '-linkmode "external" -extldflags "-static"'
$ producer
/root/go/pkg/mod/github.com/confluentinc/confluent-kafka-go@v1.8.2/kafka/librdkafka_vendor/librdkafka_glibc_linux.a(rddl.o):在函数‘rd_dl_open’中:
(.text+0x1d): 警告:Using 'dlopen' in statically linked applications requires at runtime the shared libraries from the glibc version used for linking
/root/go/pkg/mod/github.com/confluentinc/confluent-kafka-go@v1.8.2/kafka/librdkafka_vendor/librdkafka_glibc_linux.a(rdaddr.o):在函数‘rd_getaddrinfo’中:
(.text+0x440): 警告:Using 'getaddrinfo' in statically linked applications requires at runtime the shared libraries from the glibc version used for linking

这回我们的静态编译成功了!

$ ldd producer-static
    不是动态可执行文件

但有一些警告!我们先不理这些警告,试试编译出来的producer-static是否可用。使用docker-compose启动本地kafka服务,执行producer-static,我们发现程序可以正常将10w消息写入kafka,中间没有错误发生。至少在producer场景下,应用并没有执行包含dlopen、getaddrinfo的代码。

不过这不代表在其他场景下上面的静态编译方式没有问题,因此还是等官方方案出炉吧。或者使用builder容器构建你的基于confluent-kafka-go的程序。

我们继续往下看segmentio/kafka-go。

四. segmentio/kafka-go:sync很慢,async很快!

和sarama一样,segmentio/kafka-go也是一个纯go实现的kafka client,并且在很多公司的生产环境经历过考验,segmentio/kafka-go提供低级conn api和高级api(reader和writer),以writer为例,相对低级api,它是并发safe的,还提供连接保持和重试,无需开发者自己实现,另外writer还支持sync和async写、带context.Context的超时写等。

不过Writer的sync模式写十分慢,1秒钟才几十条,但async模式就飞快了!

不过和confluent-kafka-go一样,segmentio/kafka-go也没有像sarama那样提供mock测试包,我们需要自己建立环境测试。kafka-go官方的建议时:在本地启动一个kafka服务,然后运行测试。在轻量级容器十分流行的时代,是否需要mock还真是一件值得思考的事情

segmentio/kafka-go的使用体验非常棒,至今没有遇到过什么大问题,这里不举例了,例子见下面benchmark章节。

五. 写入性能

即便是简要对比,也不能少了benchmark。这里针对上面三个包分别建立了顺序benchmark和并发benchmark的测试用例:

// https://github.com/bigwhite/experiments/blob/master/kafka-clients/benchmark/kafka_clients_test.go

var m = []byte("this is benchmark for three mainstream kafka client")

func BenchmarkSaramaAsync(b *testing.B) {
    b.ReportAllocs()
    config := sarama.NewConfig()
    producer, err := sarama.NewAsyncProducer([]string{"localhost:29092"}, config)
    if err != nil {
        panic(err)
    }

    message := &sarama.ProducerMessage{Topic: "test", Value: sarama.ByteEncoder(m)}

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        producer.Input() <- message
    }
}

func BenchmarkSaramaAsyncInParalell(b *testing.B) {
    b.ReportAllocs()
    config := sarama.NewConfig()
    producer, err := sarama.NewAsyncProducer([]string{"localhost:29092"}, config)
    if err != nil {
        panic(err)
    }

    message := &sarama.ProducerMessage{Topic: "test", Value: sarama.ByteEncoder(m)}

    b.ResetTimer()

    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            producer.Input() <- message
        }
    })
}

func BenchmarkKafkaGoAsync(b *testing.B) {
    b.ReportAllocs()
    w := &kafkago.Writer{
        Addr:     kafkago.TCP("localhost:29092"),
        Topic:    "test",
        Balancer: &kafkago.LeastBytes{},
        Async:    true,
    }

    c := context.Background()
    b.ResetTimer()

    for i := 0; i < b.N; i++ {
        w.WriteMessages(c, kafkago.Message{Value: []byte(m)})
    }
}

func BenchmarkKafkaGoAsyncInParalell(b *testing.B) {
    b.ReportAllocs()
    w := &kafkago.Writer{
        Addr:     kafkago.TCP("localhost:29092"),
        Topic:    "test",
        Balancer: &kafkago.LeastBytes{},
        Async:    true,
    }

    c := context.Background()
    b.ResetTimer()

    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            w.WriteMessages(c, kafkago.Message{Value: []byte(m)})
        }
    })
}

func ReadConfig(configFile string) ckafkago.ConfigMap {
    m := make(map[string]ckafkago.ConfigValue)

    file, err := os.Open(configFile)
    if err != nil {
        fmt.Fprintf(os.Stderr, "Failed to open file: %s", err)
        os.Exit(1)
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        line := strings.TrimSpace(scanner.Text())
        if !strings.HasPrefix(line, "#") && len(line) != 0 {
            kv := strings.Split(line, "=")
            parameter := strings.TrimSpace(kv[0])
            value := strings.TrimSpace(kv[1])
            m[parameter] = value
        }
    }

    if err := scanner.Err(); err != nil {
        fmt.Printf("Failed to read file: %s", err)
        os.Exit(1)
    }

    return m

}

func BenchmarkConfluentKafkaGoAsync(b *testing.B) {
    b.ReportAllocs()
    conf := ReadConfig("./confluent-kafka-go.conf")

    topic := "test"
    p, _ := ckafkago.NewProducer(&conf)

    go func() {
        for _ = range p.Events() {
        }
    }()

    key := []byte("")
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        p.Produce(&ckafkago.Message{
            TopicPartition: ckafkago.TopicPartition{Topic: &topic, Partition: ckafkago.PartitionAny},
            Key:            key,
            Value:          m,
        }, nil)
    }
}

func BenchmarkConfluentKafkaGoAsyncInParalell(b *testing.B) {
    b.ReportAllocs()
    conf := ReadConfig("./confluent-kafka-go.conf")

    topic := "test"
    p, _ := ckafkago.NewProducer(&conf)

    go func() {
        for range p.Events() {
        }
    }()

    var mu sync.Mutex
    key := []byte("")
    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            mu.Lock()
            p.Produce(&ckafkago.Message{
                TopicPartition: ckafkago.TopicPartition{Topic: &topic, Partition: ckafkago.PartitionAny},
                Key:            key,
                Value:          m,
            }, nil)
            mu.Unlock()
        }
    })
}

本地启动一个kafka服务,运行该benchmark:

$go test -bench .
goos: linux
goarch: amd64
pkg: kafka_clients
cpu: Intel(R) Core(TM) i7-9700 CPU @ 3.00GHz
BenchmarkSaramaAsync-4                            802070          2267 ns/op         294 B/op          1 allocs/op
BenchmarkSaramaAsyncInParalell-4                 1000000          1913 ns/op         294 B/op          1 allocs/op
BenchmarkKafkaGoAsync-4                          1000000          1208 ns/op         376 B/op          5 allocs/op
BenchmarkKafkaGoAsyncInParalell-4                1768538          703.4 ns/op        368 B/op          5 allocs/op
BenchmarkConfluentKafkaGoAsync-4                 1000000          3154 ns/op         389 B/op         10 allocs/op
BenchmarkConfluentKafkaGoAsyncInParalell-4        742476          1863 ns/op         390 B/op         10 allocs/op

我们看到,虽然sarama在内存分配上有优势,但综合性能上还是segmentio/kafka-go最优。

六. 小结

本文对比了Go社区的三个主流kafka客户端包:Shopify/sarama、confluent-kafka-go和segmentio/kafka-go。sarama应用最广,也是我研究时间最长的一个包,但坑也是最多的,放弃;confluent-kafka-go虽然是官方的,但是基于cgo,无奈放弃;最后,我们选择了segmentio/kafka-go,已经在线上运行了一段时间,至今尚未发现重大问题。

不过,本文的对比仅限于作为Producer这块的场景,是一个“不完全”的介绍。后续如有更多场景的实践经验,还会再补充。

本文中的源码可以在这里下载。


“Gopher部落”知识星球旨在打造一个精品Go学习和进阶社群!高品质首发Go技术文章,“三天”首发阅读权,每年两期Go语言发展现状分析,每天提前1小时阅读到新鲜的Gopher日报,网课、技术专栏、图书内容前瞻,六小时内必答保证等满足你关于Go语言生态的所有需求!2022年,Gopher部落全面改版,将持续分享Go语言与Go应用领域的知识、技巧与实践,并增加诸多互动形式。欢迎大家加入!

img{512x368}
img{512x368}
img{512x368}
img{512x368}
img{512x368}

我爱发短信:企业级短信平台定制开发专家 https://tonybai.com/。smspush : 可部署在企业内部的定制化短信平台,三网覆盖,不惧大并发接入,可定制扩展; 短信内容你来定,不再受约束, 接口丰富,支持长短信,签名可选。2020年4月8日,中国三大电信运营商联合发布《5G消息白皮书》,51短信平台也会全新升级到“51商用消息平台”,全面支持5G RCS消息。

著名云主机服务厂商DigitalOcean发布最新的主机计划,入门级Droplet配置升级为:1 core CPU、1G内存、25G高速SSD,价格5$/月。有使用DigitalOcean需求的朋友,可以打开这个链接地址:https://m.do.co/c/bff6eed92687 开启你的DO主机之路。

Gopher Daily(Gopher每日新闻)归档仓库 – https://github.com/bigwhite/gopherdaily

我的联系方式:

  • 微博:https://weibo.com/bigwhite20xx
  • 微信公众号:iamtonybai
  • 博客:tonybai.com
  • github: https://github.com/bigwhite
  • “Gopher部落”知识星球:https://public.zsxq.com/groups/51284458844544

商务合作方式:撰稿、出书、培训、在线课程、合伙创业、咨询、广告合作。

一文搞懂Go语言的plugin

本文永久链接 – https://tonybai.com/2021/07/19/understand-go-plugin

要历数Go语言中还有哪些我还没用过的特性,在Go 1.8版本中引入的go plugin算一个。近期想给一个网关类平台设计一个插件系统,于是想起了go plugin^_^。

Go plugin支持将Go包编译为共享库(.so)的形式单独发布,主程序可以在运行时动态加载这些编译为动态共享库文件的go plugin,从中提取导出(exported)变量或函数的符号并在主程序的包中使用。Go plugin的这种特性为Go开发人员提供更多的灵活性,我们可以用之实现支持热插拔的插件系统。

但不得不提到的一个事实是:go plugin自诞生以来已有4年多了,但它依旧没有被广泛地应用起来。究其原因,(我猜)一方面Go自身支持静态编译,可以将应用编译为一个完全不需要依赖操作系统运行时库(一般为libc)的可执行文件,这是Go的优势,而支持go plugin则意味着你只能对主程序进行动态编译,与静态编译的优势相悖;而另外一方面原因占比更大,那就是Go plugin自身有太多的对使用者的约束,这让很多Go开发人员望而却步。

只有亲历,才能体会到其中的滋味。在这篇文章中,我们就一起来看看go plugin究竟是何许东东,它对使用者究竟有着怎样的约束,我们究竟要不要使用它。

1. go plugin的基本使用方法

截至Go 1.16版本,Go官方文档明确说明go plugin只支持Linux, FreeBSD和macOS,这算是go plugin的第一个约束。在处理器层面,go plugin以支持amd64(x86-64)为主,对arm系列芯片的支持似乎没有明确说明(我翻看各个Go版本release notes也没看到,也许是我漏掉了),但我在华为的泰山服务器(鲲鹏arm64芯片)上使用Go 1.16.2(for arm64)版本构建plugin包以及加载动态共享库.so文件的主程序都顺利通过编译,运行也一切正常。

主程序通过plugin包加载.so并提取.so文件中的符号的过程与C语言应用运行时加载动态链接库并调用库中函数的过程如出一辙。下面我们就来看一个直观的例子。

下面是这个例子的结构布局:

// github.com/bigwhite/experiments/tree/master/go-plugin

├── demo1
│   ├── go.mod
│   ├── main.go
│   └── pkg
│       └── pkg1
│           └── pkg1.go
└── demo1-plugins
    ├── Makefile
    ├── go.mod
    └── plugin1.go

其中demo1代表主程序工程,demo1-plugins是主程序的plugins工程。下面是插件工程的代码:

// github.com/bigwhite/experiments/tree/master/go-plugin/demo1-plugins/plugin1.go

package main

import (
    "fmt"
    "log"
)

func init() {
    log.Println("plugin1 init")
}

var V int

func F() {
    fmt.Printf("plugin1: public integer variable V=%d\n", V)
}

type foo struct{}

func (foo) M1() {
    fmt.Println("plugin1: invoke foo.M1")
}

var Foo foo

plugin包和普通的Go包没太多区别,只是plugin包有一个约束:其包名必须为main,我们使用下面命令编译该plugin:

$go build -buildmode=plugin -o plugin1.so plugin1.go

如果plugin源代码没有放置在main包下面,我们在编译plugin时会遭遇如下编译器错误:

-buildmode=plugin requires exactly one main package

接下来,我们来看主程序(demo1):

package main

import (
    "fmt"

    "github.com/bigwhite/demo1/pkg/pkg1"
)

func main() {
    err := pkg1.LoadAndInvokeSomethingFromPlugin("../demo1-plugins/plugin1.so")
    if err != nil {
        fmt.Println("LoadAndInvokeSomethingFromPlugin error:", err)
        return
    }
    fmt.Println("LoadAndInvokeSomethingFromPlugin ok")
}

下面是主程序demo1工程中的关键代码:

// github.com/bigwhite/experiments/tree/master/go-plugin/demo1/main.go
package main

import (
    "fmt"

    "github.com/bigwhite/demo1/pkg/pkg1"
)

func main() {
    err := pkg1.LoadAndInvokeSomethingFromPlugin("../demo1-plugins/plugin1.so")
    if err != nil {
        fmt.Println("LoadAndInvokeSomethingFromPlugin error:", err)
        return
    }
    fmt.Println("LoadAndInvokeSomethingFromPlugin ok")
}

我们在main函数中调用pkg1包的LoadAndInvokeSomethingFromPlugin函数,该函数会加载main函数传入的go plugin、查找plugin中相应符号并通过这些符号使用plugin中的导出变量、函数等。下面是LoadAndInvokeSomethingFromPlugin函数的实现:

// github.com/bigwhite/experiments/tree/master/go-plugin/demo1/pkg/pkg1/pkg1.go

package pkg1

import (
    "errors"
    "plugin"
    "log"
)

func init() {
    log.Println("pkg1 init")
}

type MyInterface interface {
    M1()
}

func LoadAndInvokeSomethingFromPlugin(pluginPath string) error {
    p, err := plugin.Open(pluginPath)
    if err != nil {
        return err
    }

    // 导出整型变量
    v, err := p.Lookup("V")
    if err != nil {
        return err
    }
    *v.(*int) = 15

    // 导出函数变量
    f, err := p.Lookup("F")
    if err != nil {
        return err
    }
    f.(func())()

    // 导出自定义类型变量
    f1, err := p.Lookup("Foo")
    if err != nil {
        return err
    }
    i, ok := f1.(MyInterface)
    if !ok {
        return errors.New("f1 does not implement MyInterface")
    }
    i.M1()

    return nil
}

在LoadAndInvokeSomethingFromPlugin函数中,我们通过plugin包提供的Plugin类型提供的Lookup方法在加载的.so中查找相应的导出符号,比如上面的V、F和Foo等。Lookup方法返回plugin.Symbol类型,而Symbol类型定义如下:

// $GOROOT/src/plugin/plugin.go
type Symbol interface{}

我们看到Symbol的底层类型(underlying type)是interface{},因此它可以承载从plugin中找到的任何类型的变量、函数(得益于函数是一等公民)的符号。而plugin中定义的类型则是不能被主程序查找的,通常主程序也不会依赖plugin中定义的类型。

一旦Lookup成功,我们便可以将符号通过类型断言(type assert)获取到其真实类型的实例,通过这些实例(变量或函数),我们可以调用plugin中实现的逻辑。编译plugin后,运行上述主程序,我们可以看到如下结果:

$go run main.go
2021/06/15 10:05:22 pkg1 init
try to LoadAndInvokeSomethingFromPlugin...
2021/06/15 10:05:22 plugin1 init
plugin1: public integer variable V=15
plugin1: invoke foo.M1
LoadAndInvokeSomethingFromPlugin ok

那么,主程序是如何知道导出的符号究竟是函数还是变量呢?这取决于主程序插件系统的设计,因为主程序与plugin间必然要有着某种“契约”或“约定”。就像上面主程序定义的MyInterface接口类型,它就是一个主程序与plugin之间的约定,plugin中只要暴露实现了该接口的类型实例,主程序便可以通过MyInterface接口类型实例与其建立关联并调用plugin中的实现 。

2. plugin中包的初始化

在上面的例子中我们看到,插件的初始化(plugin1 init)发生在主程序open .so文件时。按照官方文档的说法:“当一个插件第一次被open时,plugin中所有不属于主程序的包的init函数将被调用,但一个插件只被初始化一次,而且不能被关闭”。

我们来验证一下在主程序中多次加载同一个plugin的情况,这次我们将程序升级为demo2和demo2-plugins:

主程序代码如下:

// github.com/bigwhite/experiments/tree/master/go-plugin/demo2/main.go

package main

import (
    "fmt"

    "github.com/bigwhite/demo2/pkg/pkg1"
)

func main() {
    fmt.Println("try to LoadPlugin...")
    err := pkg1.LoadPlugin("../demo2-plugins/plugin1.so")
    if err != nil {
        fmt.Println("LoadPlugin error:", err)
        return
    }
    fmt.Println("LoadPlugin ok")
    err = pkg1.LoadPlugin("../demo2-plugins/plugin1.so")
    if err != nil {
        fmt.Println("Re-LoadPlugin error:", err)
        return
    }
    fmt.Println("Re-LoadPlugin ok")
}

package pkg1

import (
    "log"
    "plugin"
)

func init() {
    log.Println("pkg1 init")
}

func LoadPlugin(pluginPath string) error {
    _, err := plugin.Open(pluginPath)
    if err != nil {
        return err
    }
    return nil
}

由于仅是验证初始化,我们去掉了查找符号和调用的环节。plugin的代码如下:

// github.com/bigwhite/experiments/tree/master/go-plugin/demo2-plugins/plugin1.go
package main

import (
    "log"

    _ "github.com/bigwhite/common"
)

func init() {
    log.Println("plugin1 init")
}

在demo2的plugin中,我们同样仅保留初始化相关的代码,这里我们在demo2的plugin1中还增加了一个外部依赖:github.com/bigwhite/common。

运行上述代码:

$go run main.go
2021/06/15 10:50:47 pkg1 init
try to LoadPlugin...
2021/06/15 10:50:47 common init
2021/06/15 10:50:47 plugin1 init
LoadPlugin ok
Re-LoadPlugin ok

通过这个输出结果,我们验证了两点说法:

  • 重复加载同一个plugin,不会触发多次plugin包的初始化,上述结果中仅输出一次:“plugin1 init”;
  • plugin中依赖的包,但主程序中没有的包,在加载plugin时,这些包会被初始化,如:“commin init”。

如果主程序也依赖github.com/bigwhite/common包,我们在主程序的main包中增加一行:

// github.com/bigwhite/experiments/tree/master/go-plugin/demo2/main.go
import (
    "fmt"   

    _ "github.com/bigwhite/common"    // 增加这一行
    "github.com/bigwhite/demo2/pkg/pkg1"
)

那么我们再执行demo2,输出如下结果:

2021/06/15 11:00:00 common init
2021/06/15 11:00:00 pkg1 init
try to LoadPlugin...
2021/06/15 11:00:00 plugin1 init
LoadPlugin ok
Re-LoadPlugin ok

我们看到common包在demo2主程序中已经做了初始化,这样当加载plugin时,common包不会再进行初始化了。

3. go plugin的使用约束

开篇我们就提到了,go plugin应用不甚广泛的一个主因是其约束较多,这里我们来看一下究竟go plugin都有哪些约束:

1) 主程序与plugin的共同依赖包的版本必须一致

在上面demo2中,主程序和plugin依赖的github.com/bigwhite/common包是一个本地module,我们在go.mod中使用replace指向本地路径:

// github.com/bigwhite/experiments/tree/master/go-plugin/demo2/go.mod

module github.com/bigwhite/demo2

replace github.com/bigwhite/common => /Users/tonybai/go/src/github.com/bigwhite/experiments/go-plugin/common

require github.com/bigwhite/common v0.0.0-20180202201655-eb2c6b5be1b6 // 这个版本号是自行“伪造”的

go 1.16

如果我clone一份common包,将其放在common1目录下,并在plugin的go.mod中将replace github.com/bigwhite/common语句指向common1目录,我们重新编译主程序和plugin后,运行主程序,我们将得到如下结果:

$go run main.go
2021/06/15 14:09:07 common init
2021/06/15 14:09:07 pkg1 init
try to LoadPlugin...
LoadPlugin error: plugin.Open("../demo2-plugins/plugin1"): plugin was built with a different version of package github.com/bigwhite/common

我们看到因common的版本不同,plugin加载失败,这是plugin使用的一个约束:主程序与plugin的共同依赖包的版本必须一致

我们再来看一个主程序与plugin有共同以来包的例子。我们建立demo3,在这个版本中,主程序和plugin都依赖了logrus日志包,但主程序使用的是logrus 1.8.1版本,而plugin使用的是logrus 1.8.0版本,分别编译后,我们运行主程序:

// github.com/bigwhite/experiments/tree/master/go-plugin/demo3

2021/06/15 14:18:35 pkg1 init
try to LoadPlugin...
LoadPlugin error: plugin.Open("../demo3-plugins/plugin1"): plugin was built with a different version of package github.com/sirupsen/logrus

我们看到主程序运行报错,和前面的例子提示一样,都是因为使用了版本不一致的第三方包。要想解决这个问题,我们只需让两者使用的logrus包版本保持一致即可,比如将主程序的logrus从v1.8.1降级为v1.8.0:

$go get github.com/sirupsen/logrus@v1.8.0
go get: downgraded github.com/sirupsen/logrus v1.8.1 => v1.8.0
$go run main.go
2021/06/15 14:19:09 pkg1 init
try to LoadPlugin...
2021/06/15 14:19:09 plugin1 init
LoadPlugin ok

我们看到降级logrus版本后,主程序便可以正常加载plugin了。

还有一种情况,那就是主程序与plugin使用了同一个module的不同major版本的包,由于major版本不同,虽然是同一module,但实则是两个不同的包,这不会影响主程序对plugin的加载。但问题在于这个被共同依赖的module也会有自己的依赖包,当其不同major版本所依赖的某个包的版本不同时,同样会导致主程序加载plugin出现问题。 比如:主程序依赖go-redis/redis的v6.15.9+incompatible版本,而plugin依赖的是go-redis/redis/v8版本,当我们使用这样的主程序去加载plugin时,我们会遇到如下错误:

// github.com/bigwhite/experiments/tree/master/go-plugin/demo3

$go run main.go
2021/06/15 14:32:11 pkg1 init
try to LoadPlugin...
LoadPlugin error: plugin.Open("../demo3-plugins/plugin1"): plugin was built with a different version of package golang.org/x/sys/unix

我们看到redis版本并未出错,但问题出在redis与redis/v8所依赖的golang.org/x/sys的版本不同,这种间接依赖的module的版本的不一致同样会导致go plugin加载失败,这同样是go plugin的使用约束之一。

2) 如果采用mod=vendor构建,那么主程序和plugin必须基于同一个vendor目录构建

基于vendor构建是go 1.5版本引入的特性,go 1.11版本引入go module构建模式后,vendor构建的方式得以保留。那么问题来了,如果主程序或plugin采用vendor构建或同时采用vendor构建,那么主程序是否可以正常加载plugin呢?我们来用示例demo4验证一下。(demo4和demo3大同小异,这里就不列出具体代码了)。

首先我们分别为主程序(demo4)和plugin(demo4-plugins)生成vendor目录:

// github.com/bigwhite/experiments/tree/master/go-plugin/demo4
$go mod vendor

// github.com/bigwhite/experiments/tree/master/go-plugin/demo4-plugins
$go mod vendor

我们测试如下三种情况(go 1.16版本默认在有vendor的情况下,优先使用vendor构建。所以要基于mod构建需要显式的传入-mod=mod):

  • 主程序基于mod构建,插件基于vendor构建
// github.com/bigwhite/experiments/tree/master/go-plugin/demo4-plugins
$go build -mod=vendor -buildmode=plugin -o plugin1.so plugin1.go

// github.com/bigwhite/experiments/tree/master/go-plugin/demo4
$go build -mod=mod -o main.mod main.go

$main.mod
2021/06/15 15:41:21 pkg1 init
try to LoadPlugin...
LoadPlugin error: plugin.Open("../demo4-plugins/plugin1"): plugin was built with a different version of package golang.org/x/sys/unix
  • 主程序基于vendor构建,插件基于mod构建
// github.com/bigwhite/experiments/tree/master/go-plugin/demo4-plugins
$go build -mod=mod -buildmode=plugin -o plugin1.so plugin1.go

// github.com/bigwhite/experiments/tree/master/go-plugin/demo4
$go build -mod=vendor -o main.vendor main.go

$./main.vendor
2021/06/15 15:44:15 pkg1 init
try to LoadPlugin...
LoadPlugin error: plugin.Open("../demo4-plugins/plugin1"): plugin was built with a different version of package golang.org/x/sys/unix
  • 主程序和插件分别基于各自的vendor构建
// github.com/bigwhite/experiments/tree/master/go-plugin/demo4-plugins
$go build -mod=vendor -buildmode=plugin -o plugin1.so plugin1.go

// github.com/bigwhite/experiments/tree/master/go-plugin/demo4
$go build -mod=vendor -o main.vendor main.go

$./main.vendor
2021/06/15 15:45:11 pkg1 init
try to LoadPlugin...
LoadPlugin error: plugin.Open("../demo4-plugins/plugin1"): plugin was built with a different version of package golang.org/x/sys/unix

从上面的测试,我们看到无论是哪一方采用vendor构建,或者双方都基于各自vendor构建,主程序加载plugin都会失败。如何解决这一问题呢?让主程序和plugin基于同一个vendor构建!

我们将plugin1.go拷贝到demo4中,然后分别用vendor构建构建主程序和plugin1.go:

// github.com/bigwhite/experiments/tree/master/go-plugin/demo4
$go build -mod=vendor -o main.vendor main.go

// github.com/bigwhite/experiments/tree/master/go-plugin/demo4
$go build -mod=vendor -buildmode=plugin -o plugin1.so plugin1.go

将编译生成的plugin1.so拷贝到demo4-plugins中,然后运行main.vendor:

// github.com/bigwhite/experiments/tree/master/go-plugin/demo4
$cp plugin1.so ../demo4-plugins
$main.vendor
2021/06/15 15:48:56 pkg1 init
try to LoadPlugin...
2021/06/15 15:48:56 plugin1 init
LoadPlugin ok

我们看到基于同一vendor的主程序与plugin是可以相容的。下面的表格总结了主程序与plugin采用不同构建模式时是否相容:

插件构建方式\主程序构建方式 基于mod 基于自己的vendor
基于mod 加载成功 加载失败
基于基于自己的vendor 加载失败 加载失败

在vendor构建模式下,只有基于同一个vendor目录构建时,plugin才能被主程序加载成功

3) 主程序与plugin使用的编译器版本必须一致

如果我们使用不同版本的Go编译器分别编译主程序以及plugin,那么这两者是否能相容呢?我们还拿demo4来验证一下。我在主机上准备了go 1.16.5和go 1.16两个版本的Go编译器,go 1.16.5是go 1.16的patch维护版本,其区别与go 1.16与go 1.15相比则不是一个量级的,我们用go 1.16编译主程序,用go 1.16.5编译plugin:

// github.com/bigwhite/experiments/tree/master/go-plugin/demo4-plugins
$go version
go version go1.16.5 darwin/amd64
$go build -buildmode=plugin -o plugin1.so plugin1.go

// github.com/bigwhite/experiments/tree/master/go-plugin/demo4
$go version
go version go1.16 darwin/amd64

$go run main.go
2021/06/15 15:58:44 pkg1 init
try to LoadPlugin...
LoadPlugin error: plugin.Open("../demo4-plugins/plugin1"): plugin was built with a different version of package runtime/internal/sys

我们看到即便用patch版本编译,plugin与主程序也是不兼容的。我们将demo4升级到用go 1.16.5版本编译:

$go version
go version go1.16.5 darwin/amd64
$go run main.go
2021/06/15 15:59:05 pkg1 init
try to LoadPlugin...
2021/06/15 15:59:05 plugin1 init
LoadPlugin ok

我们看到只有主程序与plugin采用完全相同的版本(patch版本也要相同)编译时,它们才是相容的,主程序才能正常加载plugin。

那么操作系统版本是否影响主程序和plugin的相容性呢?这个没有官方说明,我亲测了一下。我在centos 7.6(amd64, go 1.16.5)上构建了demo4-plugin(基于mod=mod),然后将其拷贝到一台ubuntu 18.04(amd64, go1.16.5)的主机上,ubuntu主机上的demo4主程序可以与centos上编译出来的plugin相容。

4) 使用plugin的主程序仅能使用动态链接

Go以静态编译便于分发和部署著称,但使用plugin的主程序仅能使用动态链接。不信?那我们来挑战一下静态编译demo4中的主程序。

先来看看默认编译的情况:

// github.com/bigwhite/experiments/tree/master/go-plugin/demo4
$go build main.go
$ldd main
    linux-vdso.so.1 (0x00007ffc05b73000)
    libdl.so.2 => /lib/x86_64-linux-gnu/libdl.so.2 (0x00007f6a9fa3f000)
    libpthread.so.0 => /lib/x86_64-linux-gnu/libpthread.so.0 (0x00007f6a9f820000)
    libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007f6a9f42f000)
    /lib64/ld-linux-x86-64.so.2 (0x00007f6a9fc43000)

我们看到默认编译的情况下,demo4主程序被编译为一个需要在运行时动态链接的可执行文件,它依赖诸多linux系统运行时库,比如:libc等。

这一切的原因都是我们在demo4中使用了一些通过cgo实现的标准库,比如plugin包:

// $GOROOT/src/plugin/plugin_dlopen.go

// +build linux,cgo darwin,cgo freebsd,cgo

package plugin

/*
#cgo linux LDFLAGS: -ldl
#include <dlfcn.h>
#include <limits.h>
#include <stdlib.h>
#include <stdint.h>

#include <stdio.h>

static uintptr_t pluginOpen(const char* path, char** err) {
    void* h = dlopen(path, RTLD_NOW|RTLD_GLOBAL);
    if (h == NULL) {
        *err = (char*)dlerror();
    }
    return (uintptr_t)h;
}
... ...
*/

我们看到plugin_dlopen.go的头部有build指示符,它仅在cgo开启的前提下才会被编译,如果我们去掉cgo,比如利用下面这行命令:

// github.com/bigwhite/experiments/tree/master/go-plugin/demo4
$ CGO_ENABLED=0 go build main.go
$ ldd main
    not a dynamic executable

我们确实编译出一个静态链接的可执行文件,但当我们执行该文件时,我们得到如下结果:

$ ./main
2021/06/15 17:01:51 pkg1 init
try to LoadPlugin...
LoadPlugin error: plugin: not implemented

我们看到由于cgo被关闭,plugin包的一些函数并没有被编译到最终可执行文件中,于是报了”not implemented”的错误!

在CGO开启的情况下,我们依旧可以让外部链接器使用静态链接,我们再来试一下:

// github.com/bigwhite/experiments/tree/master/go-plugin/demo4

$ go build -o main-static -ldflags '-linkmode "external" -extldflags "-static"' main.go
# command-line-arguments
/tmp/go-link-638385712/000001.o: In function `pluginOpen':
/usr/local/go/src/plugin/plugin_dlopen.go:19: warning: Using 'dlopen' in statically linked applications requires at runtime the shared libraries from the glibc version used for linking
$ ldd main-static
    not a dynamic executable

我们的确得到了一个静态编译的二进制文件,但编译器也给出了warning。

执行这个文件:

$ ./main-static
2021/06/15 17:02:35 pkg1 init
try to LoadPlugin...
fatal error: runtime: no plugin module data

goroutine 1 [running]:
runtime.throw(0x5d380a, 0x1e)
    /usr/local/go/src/runtime/panic.go:1117 +0x72 fp=0xc000091b50 sp=0xc000091b20 pc=0x435712
plugin.lastmoduleinit(0xc000076210, 0x1001, 0x1001, 0xc000010040, 0x24db1f0)
    /usr/local/go/src/runtime/plugin.go:20 +0xb50 fp=0xc000091c48 sp=0xc000091b50 pc=0x466750
plugin.open(0x5d284c, 0x18, 0xc0000788f0, 0x0, 0x0)
    /usr/local/go/src/plugin/plugin_dlopen.go:77 +0x4ef fp=0xc000091ec0 sp=0xc000091c48 pc=0x4dad8f
plugin.Open(...)
    /usr/local/go/src/plugin/plugin.go:32
github.com/bigwhite/demo4/pkg/pkg1.LoadPlugin(0x5d284c, 0x1b, 0xc000091f48, 0x1)
    /root/test/go/plugin/demo4/pkg/pkg1/pkg1.go:13 +0x35 fp=0xc000091ef8 sp=0xc000091ec0 pc=0x4dbbb5
main.main()
    /root/test/go/plugin/demo4/main.go:12 +0xa5 fp=0xc000091f88 sp=0xc000091ef8 pc=0x4ee805
runtime.main()
    /usr/local/go/src/runtime/proc.go:225 +0x256 fp=0xc000091fe0 sp=0xc000091f88 pc=0x438196
runtime.goexit()
    /usr/local/go/src/runtime/asm_amd64.s:1371 +0x1 fp=0xc000091fe8 sp=0xc000091fe0 pc=0x46a841

warning最终演变为运行时的panic,看来使用plugin的主程序只能编译为动态链接的可执行程序了。目前go项目有多个issue与此有关:

  • https://github.com/golang/go/issues/33072
  • https://github.com/golang/go/issues/17150
  • https://github.com/golang/go/issues/18123

4. plugin版本管理

使用动态链接实现插件系统,一个更大的问题就是插件的版本管理问题。

linux上的动态链接库采用soname的方式进行版本管理。soname的关键功能是它提供了兼容性的标准,当要升级系统中的一个库时,并且新库的soname和老库的soname一样,用旧库链接生成的程序使用新库依然能正常运行。这个特性使得在Linux下,升级使得共享库的程序和定位错误变得十分容易。

什么是soname呢? 在/lib和/usr/lib等集中放置共享库的目录下,你总是会看到诸如下面的情况:

2019-12-10 12:28 libfoo.so -> libfoo.so.0.0.0*
2019-12-10 12:28 libfoo.so.0 -> libfoo.so.0.0.0*
2019-12-10 12:28 libfoo.so.0.0.0*

关于libfoo.so居然有三个文件入口,其中libfoo.so.0.0.0是真正的共享库文件,而其他两个文件入口则是指向libfoo.so.0.0.0的符号链接。为何会出现这个情况呢?这与共享库的命名惯例和版本管理不无关系。

共享库的惯例中每个共享库都有多个名字属性,包括real name、soname和linker name:

  • real name

real name指的是实际包含共享库代码的那个文件的名字(如上面例子中的libfoo.so.0.0.0),也是在共享库编译命令行中-o后面的那个参数;

  • soname

soname则是shared object name的缩写,也是这三个名字中最重要的一个,无论是在编译阶段还是在运行阶段,系统链接器都是通过共享库的soname(如上面例子中的libfoo.so.0)来唯一识别共享库的。即使real name相同但soname不同,也会被链接器认为是两个不同的库。共享库的soname可在编译期间通过传给链接器的参数来指定,如我们可以通过”gcc -shared -Wl,-soname -Wl,libfoo.so.0 -o libfoo.so.0.0.0 libfoo.o”来指定libfoo.so.0.0.0的soname为libfoo.so.0。ldconfig -n directory_with_shared_libraries命令会根据共享库的soname自动生成一个名为soname的符号链接指向real name文件,当然你也可以通过ln命令自己来创建这个符号链接。另外在linux下我们可通过readelf -d查看共享库的soname,ldd输出的ELF文件依赖的共享库列表中显示的也是共享库的soname及所在路径。

  • linker name

linker name是编译阶段提供给编译器的名字(如上面例子中的libfoo.so)。如果你构建的共享库的real name是类似于上例中libfoo.so.0.0.0那样的带有版本号的样子,那么你在编译器命令中直接使用-L path -lfoo是无法让链接器找到对应的共享库文件的,除非你为libfoo.so.0.0.0提供了一个linker name(如libfoo.so,一个指向libfoo.so.0.0.0的符号链接)。linker name一般在共享库安装时手工创建。

那么go plugin是否可以用soname的方式来做版本管理呢?基于demo1我们创建demo5,并来做一下试验。

在demo5-plugins中,我们为构建出的.so增加版本信息:

// github.com/bigwhite/experiments/tree/master/go-plugin/demo5-plugins

$go build -buildmode=plugin -o plugin1.so.1.1 plugin1.go
$ln -s plugin1.so.1.1 plugin1.so.1
$ls -l
lrwxr-xr-x  1 tonybai  staff       14  7 16 05:42 plugin1.so.1@ -> plugin1.so.1.1
-rw-r--r--  1 tonybai  staff  2888408  7 16 05:42 plugin1.so.1.1

我们通过ln命令为构建出的plugin1.so.1.1创建了一个符号链接plugin1.so.1,plugin1.so.1作为我们插件的soname传给demo5:

// github.com/bigwhite/experiments/tree/master/go-plugin/demo5/main.go

func main() {
    fmt.Println("try to LoadAndInvokeSomethingFromPlugin...")
    err := pkg1.LoadAndInvokeSomethingFromPlugin("../demo5-plugins/plugin1.so.1")
    if err != nil {
        fmt.Println("LoadAndInvokeSomethingFromPlugin error:", err)
        return
    }
    fmt.Println("LoadAndInvokeSomethingFromPlugin ok")
}

运行demo5:

// github.com/bigwhite/experiments/tree/master/go-plugin/demo5

$go run main.go
2021/07/16 05:58:33 pkg1 init
try to LoadAndInvokeSomethingFromPlugin...
2021/07/16 05:58:33 plugin1 init
plugin1: public integer variable V=15
plugin1: invoke foo.M1
LoadAndInvokeSomethingFromPlugin ok

我们看到以soname传入的插件被顺利加载并提取符号。

后续如果plugin发生变更,比如打了patch,我们只需要升级plugin为plugin1.so.1.2,然后soname依旧保持不变,主程序也无需变动。

注意:如果插件名相同,内容相同,主程序多次加载不会出现问题;但插件名相同,但内容不同,主程序运行时多次load会导致runtime panic,并且是无法恢复的panic。所以务必做好插件的版本管理

5. 小结

go plugin是go语言原生提供的一种go插件方案(非go插件方案,可以使用c shared library等)。但经过上面的实验和学习,我们我们看到了plugin使用的诸多约束,这的确给go plugin的推广使用造成的很大障碍,导致目前go plugin应用不甚广泛。

根据上面看到的种种约束,如果要应用go plugin,必须要做到:

  • 构建环境一致
  • 对第三方包的版本一致。

因此,业内在使用go plugin时多利用builder container(用来构建程序的容器)来保证主程序和plugin使用相同的构建环境。

在go plugin为数不多的用户中,有三个比较知名的开源项目值得后续认真研究:

  • gosh: https://github.com/vladimirvivien/gosh
  • tyk api gateway: https://github.com/TykTechnologies/tyk
  • tidb : https://github.com/pingcap/tidb

尤其是tidb,还给出了其插件系统使用go plugin的完整设计方案:https://github.com/pingcap/tidb/blob/master/docs/design/2018-12-10-plugin-framework.md,值得大家细致品读。

本文涉及的所有源码可以在这里下载:https://github.com/bigwhite/experiments/tree/master/go-plugin 。

6. 参考资料

  • https://golang.org/pkg/plugin/
  • https://golang.org/cmd/go/#hdr-Build_modes
  • https://golang.org/doc/go1.8
  • https://www.reddit.com/r/golang/comments/b6h8qq/is_anyone_actually_using_go_plugins/
  • https://medium.com/@alperkose/things-to-avoid-while-using-golang-plugins-f34c0a636e8
  • https://medium.com/learning-the-go-programming-language/writing-modular-go-programs-with-plugins-ec46381ee1a9

“Gopher部落”知识星球正式转正(从试运营星球变成了正式星球)!“gopher部落”旨在打造一个精品Go学习和进阶社群!高品质首发Go技术文章,“三天”首发阅读权,每年两期Go语言发展现状分析,每天提前1小时阅读到新鲜的Gopher日报,网课、技术专栏、图书内容前瞻,六小时内必答保证等满足你关于Go语言生态的所有需求!部落目前虽小,但持续力很强。在2021年上半年,部落将策划两个专题系列分享,并且是部落独享哦:

  • Go技术书籍的书摘和读书体会系列
  • Go与eBPF系列

欢迎大家加入!

Go技术专栏“改善Go语⾔编程质量的50个有效实践”正在慕课网火热热销中!本专栏主要满足广大gopher关于Go语言进阶的需求,围绕如何写出地道且高质量Go代码给出50条有效实践建议,上线后收到一致好评!欢迎大家订
阅!

img{512x368}

我的网课“Kubernetes实战:高可用集群搭建、配置、运维与应用”在慕课网热卖中,欢迎小伙伴们订阅学习!

img{512x368}

我爱发短信:企业级短信平台定制开发专家 https://tonybai.com/。smspush : 可部署在企业内部的定制化短信平台,三网覆盖,不惧大并发接入,可定制扩展; 短信内容你来定,不再受约束, 接口丰富,支持长短信,签名可选。2020年4月8日,中国三大电信运营商联合发布《5G消息白皮书》,51短信平台也会全新升级到“51商用消息平台”,全面支持5G RCS消息。

著名云主机服务厂商DigitalOcean发布最新的主机计划,入门级Droplet配置升级为:1 core CPU、1G内存、25G高速SSD,价格5$/月。有使用DigitalOcean需求的朋友,可以打开这个链接地址:https://m.do.co/c/bff6eed92687 开启你的DO主机之路。

Gopher Daily(Gopher每日新闻)归档仓库 – https://github.com/bigwhite/gopherdaily

我的联系方式:

  • 微博:https://weibo.com/bigwhite20xx
  • 微信公众号:iamtonybai
  • 博客:tonybai.com
  • github: https://github.com/bigwhite
  • “Gopher部落”知识星球:https://public.zsxq.com/groups/51284458844544

微信赞赏:
img{512x368}

商务合作方式:撰稿、出书、培训、在线课程、合伙创业、咨询、广告合作。

如发现本站页面被黑,比如:挂载广告、挖矿等恶意代码,请朋友们及时联系我。十分感谢! Go语言第一课 Go语言精进之路1 Go语言精进之路2 Go语言编程指南
商务合作请联系bigwhite.cn AT aliyun.com

欢迎使用邮件订阅我的博客

输入邮箱订阅本站,只要有新文章发布,就会第一时间发送邮件通知你哦!

这里是 Tony Bai的个人Blog,欢迎访问、订阅和留言! 订阅Feed请点击上面图片

如果您觉得这里的文章对您有帮助,请扫描上方二维码进行捐赠 ,加油后的Tony Bai将会为您呈现更多精彩的文章,谢谢!

如果您希望通过微信捐赠,请用微信客户端扫描下方赞赏码:

如果您希望通过比特币或以太币捐赠,可以扫描下方二维码:

比特币:

以太币:

如果您喜欢通过微信浏览本站内容,可以扫描下方二维码,订阅本站官方微信订阅号“iamtonybai”;点击二维码,可直达本人官方微博主页^_^:
本站Powered by Digital Ocean VPS。
选择Digital Ocean VPS主机,即可获得10美元现金充值,可 免费使用两个月哟! 著名主机提供商Linode 10$优惠码:linode10,在 这里注册即可免费获 得。阿里云推荐码: 1WFZ0V立享9折!


View Tony Bai's profile on LinkedIn
DigitalOcean Referral Badge

文章

评论

  • 正在加载...

分类

标签

归档



View My Stats