标签 Java 下的文章

Gopher的Rust第一课:Rust的那些事儿

本文永久链接 – https://tonybai.com/2024/04/22/gopher-rust-first-lesson-all-about-rust

要说这两年后端编程语言谁最火,Rust说自己第二,没人敢说第一。Rust连续8年霸榜stackoverflow最受推崇的编程语言,甚至被推特之父Jack Dorsey称为“完美的编程语言”:

注:最火:仅代表人气最旺,最受欢迎,但并不代表使用者最多。

如果你经常读我的博客,你可能会问:“你不是Go语言布道师吗?怎么也要转Rust了?”其实不然,学习Rust不是要蹭热度,而是实际开发的需要。这些年在汽车行业这个赛道上,云端和车端都要兼顾。目前车端基础软件的开发语言主要是C/C++,但内存安全、性能不输C且高可靠的Rust日益受到车载软件开发的青睐,AUTOSAR组织在2022年成立了Rust工作组就是一个重要的信号。并且据我所知,一些国内造车新势力已经或正在将一些Rust开发的中间件或应用放到了量产车或即将量产的车上。

注:AUTOSAR (Automotive Open System Architecture) 是一个面向汽车电子系统的开放式软件架构标准,由汽车制造商、零部件供应商和电子供应商共同发起并持续维护的一个全球性标准化组织。

不过,Rust语言在某些领域的崛起确实引发了其他编程语言社区的一些不满和争议。特别是Rust社区的一些人提出“Rewrite Everything in Rust”的观点,让很多编程语言社区,尤其是C++社区十分不安。Go社区则相对更加开放和友好的,主流观点是Go与Rust是可以互补的,两种语言在各自的优势领域发挥作用,通过合作而非对抗的方式,能为开发者提供更好的选择。更多细节,可以参考几年前我曾翻译过的前Go团队产品经理、gohugo的作者Steve Francia联合创作的一篇文章《Rust vs. Go:为什么强强联合会更好》。

也就是说Go依然是我的主力语言,但考虑工作上的需要,我要系统学学Rust了。为了避免“从入门到放弃”,我打算采用边学习边输出的方式,一方面可以督促自己学习,另一方面也希望能和读者及时互动,纠正学习中的错误理解。

我的Go语言第一课专栏广受欢迎,其知识结构想必也是大家认可的,这里我就仿照其形式,写一下学习Rust的第一课这个入门系列。

正如我在Go语言第一课专栏所说的那样:我一直认为,当你开始接触一门新语言的时候,你一定要去了解它的历史和现状。因为这样,你才能建立起对这门语言的整体认知,了解它未来的走向。而且,也能建立起学习的“安全感”,即相信它能够给你带来足够的价值和收益,更加坚定地学习下去。

在这篇文章中,我就先来了解一下Rust的诞生历史和现状发展,以及它独特的设计哲学。并与Go语言做个简单对比,希望能够让自己和读者对Rust有一个初步的认识。

1.1 Rust的历史与现状

1.1.1 Rust的诞生与演进

Rust诞生于2006年,这比Google三巨头“密谋”创建Go语言还要早上一年。不过和Go的三位创始人:图灵奖获得者、C语法联合发明人、Unix之父肯·汤普森(Ken Thompson),Plan 9操作系统领导者、UTF-8编码的最初设计者罗伯·派克(Rob Pike),以及Java的HotSpot虚拟机和Chrome浏览器的JavaScript V8引擎的设计者之一罗伯特·格瑞史莫(Robert Griesemer)相比,Rust之父格雷登·霍尔(Graydon Hoare)的身份和江湖地位却没有那么“显赫”。彼时的他只是Mozilla Research的一位加拿大籍的、不到30岁的开发人员:

注:Graydon Hoare这个人非常低调,极少在公开场合露面,因此在网络上很难找到他的肖像,上面图中的肖像来自https://www.crunchbase.com/person/graydon-hoare,我这里甚至不能保证这个肖像就是Hoare本人的。

新生代编程语言的诞生都伴随着一段轶事,比如Go语言的创始人们在Google内部经常遇到C++项目漫长的编译时间问题,每当他们启动一个C++项目的编译,都要等很长时间,期间都能喝上几杯咖啡。这让他们深有感触并意识到需要设计一门编译速度更快的新语言,于是Go语言就这样诞生了。和Go语言“喝咖啡,等C++项目编译”类似,Rust的诞生也有一段轶事:

2006年,29岁的Hoare有一天回到温哥华的家中,但他发现电梯坏了,电梯软件崩溃了!他不得不爬楼梯回到位于21层的家中。当他爬上楼梯时,他感到很恼火。他想:“我们做计算机编程的人居然无法制造出一部可以正常工作而不崩溃的电梯!” Hoare知道,许多此类崩溃都是由于程序使用内存的问题造成的。电梯等设备内部的软件通常是用C++或C语言编写的,这些语言以允许程序员编写运行速度非常快且相当紧凑的代码而闻名。问题是这些语言也很容易意外引入内存错误,这些错误会导致崩溃。Hoare决定对此做点什么。于是他打开笔记本电脑,开始设计一种新的计算机语言,他希望这种语言能够编写小而快速的代码,而不会出现内存错误,他将其命名为Rust

这段轶事显然不可考证了。但可以确定的是从2006年开始的若干年里,Hoare创建的个人语言项目Rust并没有真正的用于改善电梯系统的程序,而是在得到了Mozilla的赞助下,用在了持续开发Mozilla的浏览器引擎Servo上了,Mozilla在2010年官宣了该项目,Hoare在2010年的一次演讲中也第一次介绍了Rust语言:

Rust开源的第一行代码也是在2010年完成的:

此外,最初的Rust编译器是由OCaml实现的,2011年Rust团队使用Rust基于LLVM重新实现了编译器并实现了自举。同年,Rust也有了自己的LOGO,其设计灵感来自于自行车齿盘

2012年,Graydon Hoare接受InfoQ专访,谈及他带领Rust team在Mozilla开发的系统编程语言Rust,包括Rust的特性、Rust相对于C/C++/Java/Go的优势与不同以及Rust的1.0版本发布计划。

但是,就在下一年,即2013年,Graydon Hoare就因为精力耗尽而辞去了Rust team的领导职务,离开了自己的Rust team,并从此远离了Rust开发。Hoare的离开对Rust team和语言本身来说是一次重大的损失,但Rust社区和团队采取了积极的措施来确保Rust的持续发展和演进。

2014年11月,Rust官宣了cargo和crates.io,前者是Rust项目构建管理器,后者则是Rust官方维护的Rust代码的中央包存储库,通过cargo可以轻松构建和发布包到crates.io,或从crates.io上拉取Rust代码的依赖。

2015年5月15日,Rust迎来了一个里程碑的时刻:Rust 1.0正式发布! ,这要比Go发布1.0版本迟了3年。但正如官博所说:“1.0版本的发布标志着混乱的结束。此版本是我们对稳定性承诺的正式开始,因此它为构建应用程序和库提供了坚实的基础。从现在开始,重大更改基本上超出了范围(一些小的警告适用,例如编译器错误)”。

Rust 1.0发布后,Rust的版本发布周期与节奏也得以确定下来,即每6周发布一个稳定版本,按照这个节奏,与Rust 1.0同时发布的还有Rust 1.1 Beta版本。经过六周的测试后,Rust 1.1 Beta转为Rust 1.1稳定版本,同时发布Rust 1.2 Beta版本,依次类推。当然,Rust还有一个nightly build版本,这个版本包含了最新但不稳定的特性。和Go社区和开发人员每年只能high两次相比,Rust开发者和社区更加幸运,每六周就能high一次!

Rust的演进是基于RFC(Request For Comments)驱动的,并且这一措施是早于Rust 1.0发布前就基于RFC确定下来的。这与Go的Proposal process类似,但感觉比Go的流程更规范和严谨,当然这与两种语言的治理结构的组成和规则有关。

然而,Rust 1.0的发布只是Rust语言发展的一个新起点,这件事并没有像Go语言在2009年宣布开源那样获得足够的曝光度并赢得TIOBE年度最佳编程语言的称号。

Rust之后的发展依旧是一波三折,这主要也是缘于Rust当时没有一个“好爹”:


TIOBE Rust曲线(2012~2024.3)

2020年,Rust语言迎来了自己诞生以来的至暗时刻。因新冠疫情全球流行导致的业绩下滑,2020年8月,Mozilla解雇了全球1000名员工中的250名,这其中就包含Servo引擎背后的团队。该事件引起了人们对Rust未来的担忧,因为团队的一些成员是Rust的主要贡献者。

但塞翁失马焉知非福,2021年2月8日,由五家创始公司(AWS、华为、谷歌、微软和Mozilla)共同赞助的独立非营利组织Rust基金会宣布成立!Rust团队终于有了新家,并且这次除了亲爹Mozilla外,还有四个财大气粗、执IT牛耳的干爹,Rust语言的未来一下变得光明了。

实际上Rust的发展也是如此,从2021年基金会成立至今(2024.4),Rust取得了长足的发展:语言特性不断增强,编译器性能持续优化,生态系统日渐壮大和完善,增加和完善了对WebAssembly、嵌入式、大数据、区块链、人工智能等领域的支持。下面我们就来说说Rust语言的现状。

1.1.2 Rust的现状发展

1.1.2.1 语言排名

虽然Rust热度很高,但在语言排名方面与几乎同期的Go还有一定差距,在2024.3月的TIOBE排名中,Go稳居第8位,而Rust虽然刷新了自己的历史最高排名,但也仅仅排在第17位:


TIOBE Rust 2024.3排名

Redmonk 2024.1月排名中,Rust位列19位,Go位列12位:


Redmonk Rust 2024.1排名

不过,Rust的热度和社区活跃度甚至要高于Go,究其根源,我认为还是与两个开源语言的治理结构有关,下面是Go和Rust在Reddit论坛上的拥趸数量与在线人数对比(2024.4.6 21:39北京时间):

如果能持续保持住这样的热度和发展势头,Rust可能在未来几年迅速接近Go的位置,甚至超越也是有很大可能的。

和Go开发人员自称Gopher类似,Rust开发人员自称Rustacean,这是一个结合了“Rust”和“Crustacean”(甲壳类)两个词语的组合词。此外,Rust社区还设计了Rust的非官方吉祥物(mascot):Ferris,一只可爱的红色螃蟹,它是由设计师Karen Rustad Tölva在2010年创作的。Ferris象征着Rust语言的安全性、并发性和生产力,同时也代表着Rust社区的活跃和友好。

crates.io上还有一个名为ferris-says的crate,可以用来打印Ferris吉祥物相关的文字,可以输出像下面这样的ASCII字符拼接出的Ferris形象:

1.1.2.2 语言采纳

从上面TIOBE的Rust排名曲线来看,Rust在2018 edition和2021 edition前后到达过两个“尖峰”。各大公司以及初创也基本都是在2018 edition之后开始逐渐采纳Rust的。

注:关于Rust edition,感兴趣的读者可以先参考Rust官方文档,在后续学习cargo和Rust项目编译构建的时候,我们还会深入学习和理解edition。

接下来,我们列举一下Rust基金会创始公司以及其他一些知名IT公司和组织对Rust的采纳情况。

  • AWS

除了成为Rust基金会创始成员,让大家真正知道了AWS对Rust投入的决心外,真正让大家看到AWS内部大量使用Rust的文章是2022年2月AWS在官博发表的一篇名为Sustainability with Rust的文章,这篇文章介绍了Rust在AWS内部基础设施构建上发挥的关键作用,包括用Rust进行Firecracker、AWS Lambda、Amazon S3、 Bottlerocket等开发。这篇文章还引用了一篇2017年发表的论文Energy Efficiency across Programming Languages中的结论,认为Rust在能耗方面的优势是其他语言如Go、Java不能匹敌的,这一定程度上引发了争议,记得Russ Cox在Twitter上海批驳了这篇文章中引用的数据不准确。

  • 华为

作为国内以一己之力力抗美帝的通信、IT、手机、汽车等多赛道公司,同样也是拥有处理器、OS、编译器等全技术栈的研发型公司,华为对Rust这一的系统级编程语言尤为青睐。但从公开资料上能看到的东西不多,从华为可信编程实验室的主页上,我们看到了Rust在华为应用的一些情况。

华为的目标是在全球最大的电信行业设计值得信赖的软件系统。华为正在努力将部分代码库迁移到Rust,它比C/C++更安全且性能更高。为了帮助开发人员完成这个过程,华为利用开源C2Rust翻译器直接从C生成Rust代码。

huawei还在内部用Rust开发了一组丰富的内部库,这些库围绕基于actor的并发范式而构建,这样利用Rust语言功能(例如async、await等)简化了异步编程。

  • Google

Google已将Rust应用到Chromium、Android和FuchsiaOS中,其中Chromium对Rust的支持处于实验阶段。开发者可以使用Rust来开发适用于Android和FuchsiaOS的组件,并且Rust在Android和FuchsiaOS的内部代码中使用的比例相当大,特别是FuchsiaOS,Rust代码已经超过50%。由于内部Cpp代码量较大,2022年10月,谷歌推出了基于开源RISC-V芯片的新型安全操作系统KataOS。Sparrow是KataOS的参考实现,运行在seL4上,几乎完全用Rust编写。该操作系统不是为台式电脑或智能手机设计的,而是为物联网设计的,可用于智能家居设备。目标是为嵌入式硬件或边缘设备构建可验证的安全操作系统,例如捕获图像的网络连接摄像头,这些图像在设备上或云中处理以进行机器学习。在2022年发布的Android 13版本中,谷歌还宣布Android版本13中大约21%的新原生代码(C/C++/Rust)是Rust。AOSP拥有约150万行Rust代码,涵盖了新功能和组件。此外,Android的Rust代码中已发现零内存安全漏洞。为了实现提高Android内部安全性、稳定性和质量的目标,Android团队还表示,Rust应该用在代码库中需要原生代码的任何地方。

  • 微软

Microsoft拥有世界上最大的用C/C++编写的代码集合之一,其所有核心产品(例如Windows、Office和Azure云)均使用该代码。2019年,微软开始探索内存安全的编程语言,并试用了Rust。随后,Rust for Windows Library在GitHub上开源,使Rust开发人员能够顺利使用Windows API。

2022年,微软Azure CTO Mark Russinovich表示,新项目不应再使用C和C++。他建议,Rust应该用于需要非GC语言的项目,以提高安全性和可靠性。

2023年7月,微软宣布在Windows 11 Insider Preview Build 25905版本中发布了Rust参与编写的Windows内核模块。其中包含了一个 GDI 引擎的实现。

  • Meta(前身为Facebook)

虽然不是创始成员,但财大气粗的Meta目前已经是唯一非创始成员的铂金赞助商了。Meta历史上以C++为主,但从2021年开始,Rust便开始大量使用Rust了,并成为Meta支持的服务器端语言列表中的最新成员。

Meta在2021和2022年先后发表过A brief history of Rust at FacebookProgramming languages endorsed for server-side use at Meta详细说明了Rust在Meta内部的应用,感兴趣的读者可以去看看。

  • Linux基金会

炒得沸沸扬扬的在Linux Kernel中支持Rust语言终于尘埃落定,Linux Kernel 6.1版本对Rust提供了支持。Rust同时进入Windows、Linux内核,这让Rust的江湖地位得到进一步提升。相信未来,Rust在两大操作系统内核中的代码比例会逐步提升的。

  • 其他一些公司对Rust的应用

2024年初,cloudflare公司开源了其内部替代nginx的Rust库pingora,作为业界一家提供互联网基础设施和网络服务的公司,其采用Rust的示范效应也是非常明显的。

influxdb的母公司influxdata在2023年发布了influxdb 3.0版本,该版本采用Rust全面重写。不光是influxdb,诸多新兴时序数据库都采用了Rust技术栈(+Arrow+Parquet+DataFusion),比如greptimedb、cnosdb、CeresDB等。

字节跳动内部服务大量使用Go,但这几年也有一些Rust爱好者在字节内部布道Rust,并开源了诸如Rust RPC框架volo、基于io-uring的Rust async runtime monoio等。

埃隆马斯克的xAI在2024年发布的grok-1大模型中,Rust开发的Qdrant向量数据库也发挥了重要作用,也是Rust在AI领域应用迈出的重要一步。

1.1.2.3 应用领域

在Rust官网,我们能看到官方列出的Rust应用的四大领域:

在这四个领域中,Rust都有非常活跃的发展和应用,每个领域都有大量的优秀开源项目,这里无法穷尽,大家可以参考与awesome-go类似的awesome-rust项目查看自己关于领域的开源项目。

1.1.2.4 工作机会与薪酬

devjobsscanner统计的2023年的各个编程语言的工作需求来看,Rust目前依旧比较小众!

stackoverflow 2023薪酬统计来看,Rust薪资位于中游:

另外4 day week的工作数量和薪酬分析也印证了上面两点:Rust小众(工作数量相对较少),薪酬位于中游:

国内Rust的工作数量与国际相同,都处于较少的位置,但国内Rust薪酬数据可能并不低,因为这些Rust岗位基本都在一线大厂,或是拿了较多融资的初创,待遇可能都比较不错。

了解了Rust的诞生和演化历史以及Rust的不错的现状后,我们再来看看Rust的设计哲学。

1.2 Rust的设计哲学

设计哲学之于编程语言,就好比一个人的价值观之于这个人的行为。因为如果你不认同一个人的价值观,那你其实很难与之持续交往下去,即所谓道不同不相为谋。类似的,如果你不认同一门编程语言的设计哲学,那么大概率你在后续的语言学习中,就会遇到上面提到的这些问题,而且可能会让你失去继续学习的精神动力。因此,在真正开始学习Rust语法和编码之前,我们还需要先来了解一下Rust的设计哲学,等了解完这些之后,你就能更深刻地认识到自己学习Rust的原因了。

1.2.1 Rust核心价值观

2019年6月,Rust核心组成员Stephen Klabnik在QCon London发表了一次名为How Rust Views Tradeoffs的演讲,在这次演讲中,他阐述了他个人理解的Rust的核心价值观,这些价值观是Rust team在做设计取舍时拒绝妥协的点,它们包括内存安全、执行速度和生产力:

按照Stephen Klabnik的说法,这三个核心价值观也是有序的,首先是内存安全,这是Rust最为在乎的立身之本,其次是高性能,最后是生产力。当它们之间出现冲突时,按最高价值观决策!

这其实与Rust官方对Rust的介绍也是一样的:

官方的Reliable对应的就是内存安全(memory safety),而efficient则有两层含义,一是运行时的高效,另外一个方面则是构建时的生产力也要保持高水准。

这三个价值观是Rust语言的设计目标,也是Rust语言的特色和优势所在。在失去了Graydon Hoare这个语言之父后,这些价值观也成为了Rust核心团队在判定语言演进方向的根本依据。

  1. 内存安全

内存安全是Rust最重要的价值观,它意味着Rust程序在运行时不会出现内存泄漏(不使用unsafe代码的前提下)、缓冲区溢出、野指针等内存相关的错误。这些错误不仅会导致程序崩溃,还可能导致安全漏洞的产生。Rust通过所有权(ownership)、生命周期(lifetime)和借用(borrowing)等特性,在编译时最大程度地检查出这些错误,从而保证程序的内存安全。

Rust的内存安全机制不仅能够提高程序的稳定性和可靠性,还能够降低开发和维护的难度。由于Rust能够在编译时就检查出内存错误,开发者就不必再花费大量时间和精力去寻找和修复这些错误了。

  1. 高性能

高性能是Rust的仅次于内存安全的一个核心价值观,Rust语言的设计目标之一就是要成为一种高性能的系统编程语言。Rust通过零成本抽象、移动语义、泛型编程等特性,使得程序能够在运行时达到与C、C++等传统系统编程语言相当的性能。

Rust的高性能机制不仅能够提高程序的运行速度,还能够降低硬件成本。由于Rust能够更好地利用硬件资源,因此在相同的硬件条件和资源开销下,Rust程序的性能通常比其他语言的程序更高。

  1. 生产力

生产力是Rust的第三个核心价值观,Rust语言的设计目标之一就是要成为一种能够提高开发者生产力的语言。Rust通过包管理器Cargo、智能编辑器支持、丰富的库生态、详实系统的文档等特性,使得开发者能够更轻松地编写、调试和维护Rust程序。

1.2.2 Rust的次要价值观

Stephen Klabnik还总结了三条Rust的次要价值观(secondary values):

我们看到:Rust的次要价值观包括ergonomics、compile times和correctness,这三个价值观也是Rust语言的设计目标之一,但和上面的第一级核心价值观相比,它们是可以被妥协掉的。

Ergonomics是指Rust语言的易用性,它是Rust语言的一个重要设计目标。Rust希望通过简单易用的语法和丰富的库生态,使得开发者能够更轻松地编写Rust程序。

Compile Times是指Rust编译器的编译时间。Rust编译器很慢,这是一个问题,Rust team也正在努力优化,但Rust team更关心二进制文件的最终执行速度,而不是让编译器变得更快,这就是Compile Time作为次要价值观的原因。

Correctness是指Rust语言的正确性,Rust真的很在乎你的程序是否正确,Rust希望通过强大的类型系统和静态检查,来尽可能地保证Rust程序的正确性。但Rust不愿意使用完全依赖类型以及证明助手来证明你的代码是正确的。

1.2.3 与Go的价值观的对比

我们来对比一下Go官方的对Go的介绍,看一下其隐含的Go价值观(设计哲学):

在官方对Go的介绍中有三个关键词:Simple、Secure和Scalable。

Simple是Go语言的首要设计原则,Go语言的设计者希望Go语言能够简单易用,使得开发者能够更快地学习和使用Go语言,以快速形成生产能力。Go语言的语法简单易懂,并且去掉了许多其他编程语言中复杂的特性,如类型层次与继承等,使得Go语言更加简洁易学、易读、易用和易维护。

至于Secure,Go语言的设计者希望Go语言能够更加安全可靠,避免许多其他编程语言中常见的安全漏洞。Go语言通过垃圾回收机制来自动管理内存,避免了许多其他编程语言中常见的内存泄漏和缓冲区溢出等问题。同时,Go语言提供了轻量级的goroutine和通道机制,使得开发者能够更加方便地实现并发编程,并且通过数据竞争检测工具,避免了并发编程中常见的数据竞争问题。同时Go语言提供了简单易用的显式错误处理机制,让开发者不遗漏任一处错误处理。

Scalable则体现在Go面向工程、原生内置并发以及崇尚组合的设计哲学上了。 Go语言的设计者希望Go语言能够更好地支持可扩展性,使得Go程序能够更好地适应不同的组织规模、不同的工作负载和硬件环境。Go语言通过简单的语法、基于module的可重现的构建管理、极高的编译速度、高质量的标准库、实用的工具链、强大的内置并发机制以及面向接口编程等特性,使得Go程序更加可扩展,生产力更为高效。

总的来说,Rust更注重安全、底层控制和极致性能,而Go则更加关注简单、安全、扩展性与工程效率。两者在定位和设计哲学上存在区别,但也有一些共同特点,比如都拥有现代的工具链、活跃的社区等。

1.3 本章小结

在这篇博文中,我们了解了Rust语言的诞生历程、现状发展,以及它独特的设计哲学。通过与Go语言进行对比,我们可以看出两者在出身、目标和设计理念上的一些差异。

随着软件系统的复杂度不断提高,对安全性、性能和并发的需求也越来越高。作为一门专注于底层系统编程、性能极致化的新语言,Rust正在吸引越来越多开发者的关注。相信通过后面对Rust的全方面的系统学习,我和大家都能够更深入地理解和掌握Rust。

如果你认为Rust的价值观与你的十分匹配,你也认同Rust未来的发展。那就期待下一篇吧,在下一篇中,我们将开始动手学习Rust了!

1.4 参考资料


Gopher部落知识星球在2024年将继续致力于打造一个高品质的Go语言学习和交流平台。我们将继续提供优质的Go技术文章首发和阅读体验。同时,我们也会加强代码质量和最佳实践的分享,包括如何编写简洁、可读、可测试的Go代码。此外,我们还会加强星友之间的交流和互动。欢迎大家踊跃提问,分享心得,讨论技术。我会在第一时间进行解答和交流。我衷心希望Gopher部落可以成为大家学习、进步、交流的港湾。让我相聚在Gopher部落,享受coding的快乐! 欢迎大家踊跃加入!

img{512x368}
img{512x368}

img{512x368}
img{512x368}

著名云主机服务厂商DigitalOcean发布最新的主机计划,入门级Droplet配置升级为:1 core CPU、1G内存、25G高速SSD,价格5$/月。有使用DigitalOcean需求的朋友,可以打开这个链接地址:https://m.do.co/c/bff6eed92687 开启你的DO主机之路。

Gopher Daily(Gopher每日新闻) – https://gopherdaily.tonybai.com

我的联系方式:

  • 微博(暂不可用):https://weibo.com/bigwhite20xx
  • 微博2:https://weibo.com/u/6484441286
  • 博客:tonybai.com
  • github: https://github.com/bigwhite
  • Gopher Daily归档 – https://github.com/bigwhite/gopherdaily

商务合作方式:撰稿、出书、培训、在线课程、合伙创业、咨询、广告合作。

依赖Kafka的Go单元测试例解

本文永久链接 – https://tonybai.com/2024/01/08/go-unit-testing-deps-on-kafka

Kafka是Apache基金会开源的一个分布式事件流处理平台,是Java阵营(最初为Scala)中的一款杀手级应用,其提供的高可靠性、高吞吐量和低延迟的数据传输能力,让其到目前为止依旧是现代企业级应用系统以及云原生应用系统中使用的重要中间件。

在日常开发Go程序时,我们经常会遇到一些依赖Kafka的代码,如何对这些代码进行测试,尤其是单测是摆在Go开发者前面的一个现实问题!

有人说用mock,是个路子。但看过我的《单测时尽量用fake object》一文的童鞋估计已经走在了寻找kafka fake object的路上了!Kafka虽好,但身形硕大,不那么灵巧。找到一个合适的fake object不容易。在这篇文章中,我们就来聊聊如何测试那些依赖kafka的代码,再往本质一点说,就是和大家以找找那些合适的kafka fake object。

1. 寻找fake object的策略

在《单测时尽量用fake object》一文中,我们提到过,如果测试的依赖提供了tiny版本或某些简化版,我们可以直接使用这些版本作为fake object的候选,就像etcd提供了用于测试的自身简化版的实现(embed)那样。

但Kafka并没有提供tiny版本,我们也只能选择《单测时尽量用fake object》一文提到的另外一个策略,那就是利用容器来充当fake object,这是目前能搞到任意依赖的fake object的最简单路径了。也许以后WASI(WebAssembly System Interface)成熟了,让wasm脱离浏览器并可以在本地系统上飞起,到时候换用wasm也不迟。

下面我们就按照使用容器的策略来找一找适合的kafka container。

2. testcontainers-go

我们第一站就来到了testcontainers-go。testcontainers-go是一个Go语言开源项目,专门用于简化创建和清理基于容器的依赖项,常用于Go项目的单元测试、自动化集成或冒烟测试中。通过testcontainers-go提供的易于使用的API,开发人员能够以编程方式定义作为测试的一部分而运行的容器,并在测试完成时清理这些资源。

注:testcontainers不仅提供Go API,它还覆盖了主流的编程语言,包括:Java、.NET、Python、Node.js、Rust等。

在几个月之前,testcontainers-go项目还没有提供对Kafka的直接支持,我们需要自己使用testcontainers.GenericContainer来自定义并启动kafka容器。2023年9月,以KRaft模式运行的Kafka容器才被首次引入testcontainers-go项目

目前testcontainers-go使用的kafka镜像版本是confluentinc/confluent-local:7.5.0Confluent是在kafka背后的那家公司,基于kafka提供商业化支持。今年初,Confluent还收购了Immerok,将apache的另外一个明星项目Flink招致麾下。

confluent-local并不是一个流行的kafka镜像,它只是一个使用KRaft模式的零配置的、包含Confluent Community RestProxy的Apache Kafka,并且镜像是实验性的,仅应用于本地开发工作流,不应该用在支持生产工作负载。

生产中最常用的开源kafka镜像是confluentinc/cp-kafka镜像,它是基于开源Kafka项目构建的,但在此基础上添加了一些额外的功能和工具,以提供更丰富的功能和更易于部署和管理的体验。cp-kafka镜像的版本号并非kafka的版本号,其对应关系需要cp-kafka镜像官网查询。

另外一个开发领域常用的kafka镜像是bitnami的kafka镜像。Bitnami是一个提供各种开源软件的预打包镜像和应用程序栈的公司。Bitnami Kafka镜像是基于开源Kafka项目构建的,是一个可用于快速部署和运行Kafka的Docker镜像。Bitnami Kafka镜像与其内部的Kakfa的版本号保持一致。

下面我们就来看看如何使用testcontainers-go的kafka来作为依赖kafka的Go单元测试用例的fake object。

这第一个测试示例改编自testcontainers-go/kafka module的example_test.go:

// testcontainers/kafka_setup/kafka_test.go

package main

import (
    "context"
    "fmt"
    "testing"

    "github.com/testcontainers/testcontainers-go/modules/kafka"
)

func TestKafkaSetup(t *testing.T) {
    ctx := context.Background()

    kafkaContainer, err := kafka.RunContainer(ctx, kafka.WithClusterID("test-cluster"))
    if err != nil {
        panic(err)
    }

    // Clean up the container
    defer func() {
        if err := kafkaContainer.Terminate(ctx); err != nil {
            panic(err)
        }
    }()

    state, err := kafkaContainer.State(ctx)
    if err != nil {
        panic(err)
    }

    if kafkaContainer.ClusterID != "test-cluster" {
        t.Errorf("want test-cluster, actual %s", kafkaContainer.ClusterID)
    }
    if state.Running != true {
        t.Errorf("want true, actual %t", state.Running)
    }
    brokers, _ := kafkaContainer.Brokers(ctx)
    fmt.Printf("%q\n", brokers)
}

在这个例子中,我们直接调用kafka.RunContainer创建了一个名为test-cluster的kafka实例,如果没有通过WithImage向RunContainer传入自定义镜像,那么默认我们将启动一个confluentinc/confluent-local:7.5.0的容器(注意:随着时间变化,该默认容器镜像的版本也会随之改变)。

通过RunContainer返回的kafka.KafkaContainer我们可以获取到关于kafka容器的各种信息,比如上述代码中的ClusterID、kafka Broker地址信息等。有了这些信息,我们后续便可以与以容器形式启动的kafka建立连接并做数据的写入和读取操作了。

我们先来看这个测试的运行结果,与预期一致:

$ go test
2023/12/16 21:45:52 github.com/testcontainers/testcontainers-go - Connected to docker:
  ... ...
  Resolved Docker Host: unix:///var/run/docker.sock
  Resolved Docker Socket Path: /var/run/docker.sock
  Test SessionID: 19e47867b733f4da4f430d78961771ae3a1cc66c5deca083b4f6359c6d4b2468
  Test ProcessID: 41b9ef62-2617-4189-b23a-1bfa4c06dfec
2023/12/16 21:45:52 Creating container for image docker.io/testcontainers/ryuk:0.5.1
2023/12/16 21:45:53 Container created: 8f2240042c27
2023/12/16 21:45:53 Starting container: 8f2240042c27
2023/12/16 21:45:53 Container started: 8f2240042c27
2023/12/16 21:45:53 Waiting for container id 8f2240042c27 image: docker.io/testcontainers/ryuk:0.5.1. Waiting for: &{Port:8080/tcp timeout:<nil> PollInterval:100ms}
2023/12/16 21:45:53 Creating container for image confluentinc/confluent-local:7.5.0
2023/12/16 21:45:53 Container created: a39a495aed0b
2023/12/16 21:45:53 Starting container: a39a495aed0b
2023/12/16 21:45:53 Container started: a39a495aed0b
["localhost:1037"]
2023/12/16 21:45:58 Terminating container: a39a495aed0b
2023/12/16 21:45:58 Container terminated: a39a495aed0b
PASS
ok      demo    6.236s

接下来,在上面用例的基础上,我们再来做一个Kafka连接以及数据读写测试:

// testcontainers/kafka_consumer_and_producer/kafka_test.go

package main

import (
    "bytes"
    "context"
    "errors"
    "net"
    "strconv"
    "testing"
    "time"

    "github.com/testcontainers/testcontainers-go/modules/kafka"

    kc "github.com/segmentio/kafka-go" // kafka client
)

func createTopics(brokers []string, topics ...string) error {
    // to create topics when auto.create.topics.enable='false'
    conn, err := kc.Dial("tcp", brokers[0])
    if err != nil {
        return err
    }
    defer conn.Close()

    controller, err := conn.Controller()
    if err != nil {
        return err
    }
    var controllerConn *kc.Conn
    controllerConn, err = kc.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
    if err != nil {
        return err
    }
    defer controllerConn.Close()

    var topicConfigs []kc.TopicConfig
    for _, topic := range topics {
        topicConfig := kc.TopicConfig{
            Topic:             topic,
            NumPartitions:     1,
            ReplicationFactor: 1,
        }
        topicConfigs = append(topicConfigs, topicConfig)
    }

    err = controllerConn.CreateTopics(topicConfigs...)
    if err != nil {
        return err
    }

    return nil
}

func newWriter(brokers []string, topic string) *kc.Writer {
    return &kc.Writer{
        Addr:                   kc.TCP(brokers...),
        Topic:                  topic,
        Balancer:               &kc.LeastBytes{},
        AllowAutoTopicCreation: true,
        RequiredAcks:           0,
    }
}

func newReader(brokers []string, topic string) *kc.Reader {
    return kc.NewReader(kc.ReaderConfig{
        Brokers:  brokers,
        Topic:    topic,
        GroupID:  "test-group",
        MaxBytes: 10e6, // 10MB
    })
}

func TestProducerAndConsumer(t *testing.T) {
    ctx := context.Background()

    kafkaContainer, err := kafka.RunContainer(ctx, kafka.WithClusterID("test-cluster"))
    if err != nil {
        t.Fatalf("want nil, actual %v\n", err)
    }

    // Clean up the container
    defer func() {
        if err := kafkaContainer.Terminate(ctx); err != nil {
            t.Fatalf("want nil, actual %v\n", err)
        }
    }()

    state, err := kafkaContainer.State(ctx)
    if err != nil {
        t.Fatalf("want nil, actual %v\n", err)
    }

    if state.Running != true {
        t.Errorf("want true, actual %t", state.Running)
    }

    brokers, err := kafkaContainer.Brokers(ctx)
    if err != nil {
        t.Fatalf("want nil, actual %v\n", err)
    }

    topic := "test-topic"
    w := newWriter(brokers, topic)
    defer w.Close()
    r := newReader(brokers, topic)
    defer r.Close()

    err = createTopics(brokers, topic)
    if err != nil {
        t.Fatalf("want nil, actual %v\n", err)
    }
    time.Sleep(5 * time.Second)

    messages := []kc.Message{
        {
            Key:   []byte("Key-A"),
            Value: []byte("Value-A"),
        },
        {
            Key:   []byte("Key-B"),
            Value: []byte("Value-B"),
        },
        {
            Key:   []byte("Key-C"),
            Value: []byte("Value-C"),
        },
        {
            Key:   []byte("Key-D"),
            Value: []byte("Value-D!"),
        },
    }

    const retries = 3
    for i := 0; i < retries; i++ {
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()

        // attempt to create topic prior to publishing the message
        err = w.WriteMessages(ctx, messages...)
        if errors.Is(err, kc.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
            time.Sleep(time.Millisecond * 250)
            continue
        }

        if err != nil {
            t.Fatalf("want nil, actual %v\n", err)
        }
        break
    }

    var getMessages []kc.Message
    for i := 0; i < len(messages); i++ {
        m, err := r.ReadMessage(context.Background())
        if err != nil {
            t.Fatalf("want nil, actual %v\n", err)
        }
        getMessages = append(getMessages, m)
    }

    for i := 0; i < len(messages); i++ {
        if !bytes.Equal(getMessages[i].Key, messages[i].Key) {
            t.Errorf("want %s, actual %s\n", string(messages[i].Key), string(getMessages[i].Key))
        }
        if !bytes.Equal(getMessages[i].Value, messages[i].Value) {
            t.Errorf("want %s, actual %s\n", string(messages[i].Value), string(getMessages[i].Value))
        }
    }
}

我们使用segmentio/kafka-go这个客户端来实现kafka的读写。关于如何使用segmentio/kafka-go这个客户端,可以参考我之前写的《Go社区主流Kafka客户端简要对比》。

这里我们在TestProducerAndConsumer这个用例中,先通过testcontainers-go的kafka.RunContainer启动一个Kakfa实例,然后创建了一个topic: “test-topic”。我们在写入消息前也可以不单独创建这个“test-topic”,Kafka默认启用topic自动创建,并且segmentio/kafka-go的高级API:Writer也支持AllowAutoTopicCreation的设置。不过topic的创建需要一些时间,如果要在首次写入消息时创建topic,此次写入可能会失败,需要retry。

向topic写入一条消息(实际上是一个批量Message,包括四个key-value pair)后,我们调用ReadMessage从上述topic中读取消息,并将读取的消息与写入的消息做比较。

注:近期发现kafka-go的一个可能导致内存暴涨的问题,在kafka ack返回延迟变大的时候,可能触发该问题。

下面是执行该用例的输出结果:

$ go test
2023/12/17 17:43:54 github.com/testcontainers/testcontainers-go - Connected to docker:
  Server Version: 24.0.7
  API Version: 1.43
  Operating System: CentOS Linux 7 (Core)
  Total Memory: 30984 MB
  Resolved Docker Host: unix:///var/run/docker.sock
  Resolved Docker Socket Path: /var/run/docker.sock
  Test SessionID: f76fe611c753aa4ef1456285503b0935a29795e7c0fab2ea2588029929215a08
  Test ProcessID: 27f531ee-9b5f-4e4f-b5f0-468143871004
2023/12/17 17:43:54 Creating container for image docker.io/testcontainers/ryuk:0.5.1
2023/12/17 17:43:54 Container created: 577309098f4c
2023/12/17 17:43:54 Starting container: 577309098f4c
2023/12/17 17:43:54 Container started: 577309098f4c
2023/12/17 17:43:54 Waiting for container id 577309098f4c image: docker.io/testcontainers/ryuk:0.5.1. Waiting for: &{Port:8080/tcp timeout:<nil> PollInterval:100ms}
2023/12/17 17:43:54 Creating container for image confluentinc/confluent-local:7.5.0
2023/12/17 17:43:55 Container created: 1ee11e11742b
2023/12/17 17:43:55 Starting container: 1ee11e11742b
2023/12/17 17:43:55 Container started: 1ee11e11742b
2023/12/17 17:44:15 Terminating container: 1ee11e11742b
2023/12/17 17:44:15 Container terminated: 1ee11e11742b
PASS
ok      demo    21.505s

我们看到默认情况下,testcontainer能满足与kafka交互的基本需求,并且testcontainer提供了一系列Option(WithXXX)可以对container进行定制,以满足一些扩展性的要求,但是这需要你对testcontainer提供的API有更全面的了解。

除了开箱即用的testcontainer之外,我们还可以使用另外一种方便的基于容器的技术:docker-compose来定制和启停我们需要的kafka image。接下来,我们就来看看如何使用docker-compose建立fake kafka object。

3. 使用docker-compose建立fake kafka

3.1 一个基础的基于docker-compose的fake kafka实例模板

这次我们使用bitnami提供的kafka镜像,我们先建立一个“等价”于上面“testcontainers-go”提供的kafka module的kafka实例,下面是docker-compose.yml:

// docker-compose/bitnami/plaintext/docker-compose.yml

version: "2"

services:
  kafka:
    image: docker.io/bitnami/kafka:3.6
    network_mode: "host"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      # borrow from testcontainer
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_CFG_OFFSETS_TOPIC_NUM_PARTITIONS=1
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1
      - KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS=0
      - KAFKA_CFG_LOG_FLUSH_INTERVAL_MESSAGES=9223372036854775807
volumes:
  kafka_data:
    driver: local

我们看到其中一些配置“借鉴”了testcontainers-go的kafka module,我们启动一下该容器:

$ docker-compose up -d
[+] Running 2/2
 ✔ Volume "plaintext_kafka_data"  Created                                                                                    0.0s
 ✔ Container plaintext-kafka-1    Started                                                                                    0.1s

依赖该容器的go测试代码与前面的TestProducerAndConsumer差不多,只是在开始处去掉了container的创建过程:

// docker-compose/bitnami/plaintext/kafka_test.go

func TestProducerAndConsumer(t *testing.T) {
    brokers := []string{"localhost:9092"}
    topic := "test-topic"
    w := newWriter(brokers, topic)
    defer w.Close()
    r := newReader(brokers, topic)
    defer r.Close()

    err := createTopics(brokers, topic)
    if err != nil {
        t.Fatalf("want nil, actual %v\n", err)
    }
    time.Sleep(5 * time.Second)
    ... ...
}

运行该测试用例,我们看到预期的结果:

go test
write message ok  Value-A
write message ok  Value-B
write message ok  Value-C
write message ok  Value-D!
PASS
ok      demo    15.143s

不过对于单元测试来说,显然我们不能手动来启动和停止kafka container,我们需要为每个用例填上setup和teardown,这样也能保证用例间的相互隔离,于是我们增加了一个docker_compose_helper.go文件,在这个文件中我们提供了一些帮助testcase启停kafka的helper函数:

// docker-compose/bitnami/plaintext/docker_compose_helper.go

package main

import (
    "fmt"
    "os/exec"
    "strings"
    "time"
)

// helpler function for operating docker container through docker-compose command

const (
    defaultCmd     = "docker-compose"
    defaultCfgFile = "docker-compose.yml"
)

func execCliCommand(cmd string, opts ...string) ([]byte, error) {
    cmds := cmd + " " + strings.Join(opts, " ")
    fmt.Println("exec command:", cmds)
    return exec.Command(cmd, opts...).CombinedOutput()
}

func execDockerComposeCommand(cmd string, cfgFile string, opts ...string) ([]byte, error) {
    var allOpts = []string{"-f", cfgFile}
    allOpts = append(allOpts, opts...)
    return execCliCommand(cmd, allOpts...)
}

func UpKakfa(composeCfgFile string) ([]byte, error) {
    b, err := execDockerComposeCommand(defaultCmd, composeCfgFile, "up", "-d")
    if err != nil {
        return nil, err
    }
    time.Sleep(10 * time.Second)
    return b, nil
}

func UpDefaultKakfa() ([]byte, error) {
    return UpKakfa(defaultCfgFile)
}

func DownKakfa(composeCfgFile string) ([]byte, error) {
    b, err := execDockerComposeCommand(defaultCmd, composeCfgFile, "down", "-v")
    if err != nil {
        return nil, err
    }
    time.Sleep(10 * time.Second)
    return b, nil
}

func DownDefaultKakfa() ([]byte, error) {
    return DownKakfa(defaultCfgFile)
}

眼尖的童鞋可能看到:在UpKakfa和DownKafka函数中我们使用了硬编码的“time.Sleep”来等待10s,通常在镜像已经pull到本地后这是有效的,但却不是最精确地等待方式,testcontainers-go/wait中提供了等待容器内程序启动完毕的多种策略,如果你想用更精确的等待方式,可以了解一下wait包。

基于helper函数,我们改造一下TestProducerAndConsumer用例:

// docker-compose/bitnami/plaintext/kafka_test.go
func TestProducerAndConsumer(t *testing.T) {
    _, err := UpDefaultKakfa()
    if err != nil {
        t.Fatalf("want nil, actual %v\n", err)
    }

    t.Cleanup(func() {
        DownDefaultKakfa()
    })
    ... ...
}

我们在用例开始处通过UpDefaultKakfa使用docker-compose将kafka实例启动起来,然后注册了Cleanup函数,用于在test case执行结束后销毁kafka实例。

下面是新版用例的执行结果:

$ go test
exec command: docker-compose -f docker-compose.yml up -d
write message ok  Value-A
write message ok  Value-B
write message ok  Value-C
write message ok  Value-D!
exec command: docker-compose -f docker-compose.yml down -v
PASS
ok      demo    36.402s

使用docker-compose的最大好处就是可以通过docker-compose.yml文件对要fake的object进行灵活的定制,这种定制与testcontainers-go的差别就是你无需去研究testcontiners-go的API。

下面是使用tls连接与kafka建立连接并实现读写的示例。

3.2 建立一个基于TLS连接的fake kafka实例

Kafka的配置复杂是有目共睹的,为了建立一个基于TLS连接,我也是花了不少时间做“试验”,尤其是listeners以及证书的配置,不下点苦功夫读文档还真是配不出来。

下面是一个基于bitnami/kafka镜像配置出来的基于TLS安全通道上的kafka实例:

// docker-compose/bitnami/tls/docker-compose.yml

# config doc:  https://github.com/bitnami/containers/blob/main/bitnami/kafka/README.md

version: "2"

services:
  kafka:
    image: docker.io/bitnami/kafka:3.6
    network_mode: "host"
    #ports:
      #- "9092:9092"
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9094
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,SECURED://:9093,CONTROLLER://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=SECURED://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,SECURED:SSL,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SECURED
      # SSL settings
      - KAFKA_TLS_TYPE=PEM
      - KAFKA_TLS_CLIENT_AUTH=none
      - KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
      # borrow from testcontainer
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_CFG_OFFSETS_TOPIC_NUM_PARTITIONS=1
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1
      - KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS=0
      - KAFKA_CFG_LOG_FLUSH_INTERVAL_MESSAGES=9223372036854775807
    volumes:
      # server.cert, server.key and ca.crt
      - "kafka_data:/bitnami"
      - "./kafka.keystore.pem:/opt/bitnami/kafka/config/certs/kafka.keystore.pem:ro"
      - "./kafka.keystore.key:/opt/bitnami/kafka/config/certs/kafka.keystore.key:ro"
      - "./kafka.truststore.pem:/opt/bitnami/kafka/config/certs/kafka.truststore.pem:ro"
volumes:
  kafka_data:
    driver: local

这里我们使用pem格式的证书和key,在上面配置中,volumes下面挂载的kafka.keystore.pem、kafka.keystore.key和kafka.truststore.pem分别对应了以前在Go中常用的名字:server-cert.pem(服务端证书), server-key.pem(服务端私钥)和ca-cert.pem(CA证书)。

这里整理了一个一键生成的脚本docker-compose/bitnami/tls/kafka-generate-cert.sh,我们执行该脚本生成所有需要的证书并放到指定位置(遇到命令行提示,只需要一路回车即可):

$bash kafka-generate-cert.sh
.........++++++
.............................++++++
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [XX]:
State or Province Name (full name) []:
Locality Name (eg, city) [Default City]:
Organization Name (eg, company) [Default Company Ltd]:
Organizational Unit Name (eg, section) []:
Common Name (eg, your name or your server's hostname) []:
Email Address []:

Please enter the following 'extra' attributes
to be sent with your certificate request
A challenge password []:
An optional company name []:
Signature ok
subject=/C=XX/L=Default City/O=Default Company Ltd
Getting Private key
.....................++++++
.........++++++
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [XX]:
State or Province Name (full name) []:
Locality Name (eg, city) [Default City]:
Organization Name (eg, company) [Default Company Ltd]:
Organizational Unit Name (eg, section) []:
Common Name (eg, your name or your server's hostname) []:
Email Address []:

Please enter the following 'extra' attributes
to be sent with your certificate request
A challenge password []:
An optional company name []:
Signature ok
subject=/C=XX/L=Default City/O=Default Company Ltd
Getting CA Private Key

接下来,我们来改造用例,使之支持以tls方式建立到kakfa的连接:

//docker-compose/bitnami/tls/kafka_test.go

func createTopics(brokers []string, tlsConfig *tls.Config, topics ...string) error {
    dialer := &kc.Dialer{
        Timeout:   10 * time.Second,
        DualStack: true,
        TLS:       tlsConfig,
    }

    conn, err := dialer.DialContext(context.Background(), "tcp", brokers[0])
    if err != nil {
        fmt.Println("creating topic: dialer dial error:", err)
        return err
    }
    defer conn.Close()
    fmt.Println("creating topic: dialer dial ok")
    ... ...
}

func newWriter(brokers []string, tlsConfig *tls.Config, topic string) *kc.Writer {
    w := &kc.Writer{
        Addr:                   kc.TCP(brokers...),
        Topic:                  topic,
        Balancer:               &kc.LeastBytes{},
        AllowAutoTopicCreation: true,
        Async:                  true,
        //RequiredAcks:           0,
        Completion: func(messages []kc.Message, err error) {
            for _, message := range messages {
                if err != nil {
                    fmt.Println("write message fail", err)
                } else {
                    fmt.Println("write message ok", string(message.Topic), string(message.Value))
                }
            }
        },
    }

    if tlsConfig != nil {
        w.Transport = &kc.Transport{
            TLS: tlsConfig,
        }
    }
    return w
}

func newReader(brokers []string, tlsConfig *tls.Config, topic string) *kc.Reader {
    dialer := &kc.Dialer{
        Timeout:   10 * time.Second,
        DualStack: true,
        TLS:       tlsConfig,
    }

    return kc.NewReader(kc.ReaderConfig{
        Dialer:   dialer,
        Brokers:  brokers,
        Topic:    topic,
        GroupID:  "test-group",
        MaxBytes: 10e6, // 10MB
    })
}

func TestProducerAndConsumer(t *testing.T) {
    var err error
    _, err = UpDefaultKakfa()
    if err != nil {
        t.Fatalf("want nil, actual %v\n", err)
    }

    t.Cleanup(func() {
        DownDefaultKakfa()
    })

    brokers := []string{"localhost:9093"}
    topic := "test-topic"

    tlsConfig, _ := newTLSConfig()
    w := newWriter(brokers, tlsConfig, topic)
    defer w.Close()
    r := newReader(brokers, tlsConfig, topic)
    defer r.Close()
    err = createTopics(brokers, tlsConfig, topic)
    if err != nil {
        fmt.Printf("create topic error: %v, but it may not affect the later action, just ignore it\n", err)
    }
    time.Sleep(5 * time.Second)
    ... ...
}

func newTLSConfig() (*tls.Config, error) {
    /*
       // 加载 CA 证书
       caCert, err := ioutil.ReadFile("/path/to/ca.crt")
       if err != nil {
               return nil, err
       }

       // 加载客户端证书和私钥
       cert, err := tls.LoadX509KeyPair("/path/to/client.crt", "/path/to/client.key")
       if err != nil {
               return nil, err
       }

       // 创建 CertPool 并添加 CA 证书
       caCertPool := x509.NewCertPool()
       caCertPool.AppendCertsFromPEM(caCert)
    */
    // 创建并返回 TLS 配置
    return &tls.Config{
        //RootCAs:      caCertPool,
        //Certificates: []tls.Certificate{cert},
        InsecureSkipVerify: true,
    }, nil
}

在上述代码中,我们按照segmentio/kafka-go为createTopics、newWriter和newReader都加上了tls.Config参数,此外在测试用例中,我们用newTLSConfig创建一个tls.Config的实例,在这里我们一切简化处理,采用InsecureSkipVerify=true的方式与kafka broker服务端进行握手,既不验证服务端证书,也不做双向认证(mutual TLS)。

下面是修改代码后的测试用例执行结果:

$ go test
exec command: docker-compose -f docker-compose.yml up -d
creating topic: dialer dial ok
creating topic: get controller ok
creating topic: dial control listener ok
create topic error: EOF, but it may not affect the later action, just ignore it
write message error: [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker
write message ok  Value-A
write message ok  Value-B
write message ok  Value-C
write message ok  Value-D!
exec command: docker-compose -f docker-compose.yml down -v
PASS
ok      demo    38.473s

这里我们看到:createTopics虽然连接kafka的各个listener都ok,但调用topic创建时,返回EOF,但这的确不影响后续action的执行,不确定这是segmentio/kafka-go的问题,还是kafka实例的问题。另外首次写入消息时,也因为topic或partition未建立而失败,retry后消息正常写入。

通过这个例子我们看到,基于docker-compose建立fake object有着更广泛的灵活性,如果做好容器启动和停止的精准wait机制的话,我可能会更多选择这种方式。

4. 小结

本文介绍了如何在Go编程中进行依赖Kafka的单元测试,并探讨了寻找适合的Kafka fake object的策略。

对于Kafka这样的复杂系统来说,找到合适的fake object并不容易。因此,本文推荐使用容器作为fake object的策略,并分别介绍了使用testcontainers-go项目和使用docker-compose作为简化创建和清理基于容器的依赖项的工具。相对于刚刚加入testcontainers-go项目没多久的kafka module而言,使用docker-compose自定义fake object更加灵活一些。但无论哪种方法,开发人员都需要对kafka的配置有一个较为整体和深入的理解。

文中主要聚焦使用testcontainers-go和docker-compose建立fake kafka的过程,而用例并没有建立明确的sut(被测目标),比如针对某个函数的白盒单元测试。

文本涉及的源码可以在这里下载。


“Gopher部落”知识星球旨在打造一个精品Go学习和进阶社群!高品质首发Go技术文章,“三天”首发阅读权,每年两期Go语言发展现状分析,每天提前1小时阅读到新鲜的Gopher日报,网课、技术专栏、图书内容前瞻,六小时内必答保证等满足你关于Go语言生态的所有需求!2024年,Gopher部落将进一步聚焦于如何编写雅、地道、可读、可测试的Go代码,关注代码质量并深入理解Go核心技术,并继续加强与星友的互动。欢迎大家加入!

img{512x368}
img{512x368}

img{512x368}
img{512x368}

著名云主机服务厂商DigitalOcean发布最新的主机计划,入门级Droplet配置升级为:1 core CPU、1G内存、25G高速SSD,价格5$/月。有使用DigitalOcean需求的朋友,可以打开这个链接地址:https://m.do.co/c/bff6eed92687 开启你的DO主机之路。

Gopher Daily(Gopher每日新闻) – https://gopherdaily.tonybai.com

我的联系方式:

  • 微博(暂不可用):https://weibo.com/bigwhite20xx
  • 微博2:https://weibo.com/u/6484441286
  • 博客:tonybai.com
  • github: https://github.com/bigwhite
  • Gopher Daily归档 – https://github.com/bigwhite/gopherdaily

商务合作方式:撰稿、出书、培训、在线课程、合伙创业、咨询、广告合作。

如发现本站页面被黑,比如:挂载广告、挖矿等恶意代码,请朋友们及时联系我。十分感谢! Go语言第一课 Go语言进阶课 Go语言精进之路1 Go语言精进之路2 Go语言第一课 Go语言编程指南
商务合作请联系bigwhite.cn AT aliyun.com

欢迎使用邮件订阅我的博客

输入邮箱订阅本站,只要有新文章发布,就会第一时间发送邮件通知你哦!

这里是 Tony Bai的个人Blog,欢迎访问、订阅和留言! 订阅Feed请点击上面图片

如果您觉得这里的文章对您有帮助,请扫描上方二维码进行捐赠 ,加油后的Tony Bai将会为您呈现更多精彩的文章,谢谢!

如果您希望通过微信捐赠,请用微信客户端扫描下方赞赏码:

如果您希望通过比特币或以太币捐赠,可以扫描下方二维码:

比特币:

以太币:

如果您喜欢通过微信浏览本站内容,可以扫描下方二维码,订阅本站官方微信订阅号“iamtonybai”;点击二维码,可直达本人官方微博主页^_^:
本站Powered by Digital Ocean VPS。
选择Digital Ocean VPS主机,即可获得10美元现金充值,可 免费使用两个月哟! 著名主机提供商Linode 10$优惠码:linode10,在 这里注册即可免费获 得。阿里云推荐码: 1WFZ0V立享9折!


View Tony Bai's profile on LinkedIn
DigitalOcean Referral Badge

文章

评论

  • 正在加载...

分类

标签

归档



View My Stats