标签 Arrow 下的文章

2023年Go语言盘点:稳中求新,稳中求变

本文永久链接 – https://tonybai.com/2023/12/31/the-2023-review-of-go-programming-language

时光荏苒,转眼间已经是2023年的最后一天了。《2022年Go语言盘点:泛型落地,无趣很好,稳定为王》仿佛就写在昨天。

回首这一年,全球彻底从新冠大流行中得以复苏,Go语言也不例外,最直观的表现就是全球各地的GopherCon技术大会或小型Meetup都纷纷从停办/线上的状态来到了线下,并获得Gopher们的热烈欢迎和踊跃参与,比如下图中的GopherConGopherCon UKGopherCon EuropeGopherCon AustraliaGolab等。

尤其值得一提的是我们本土最大的Gopher技术大会GopherChina 2023,今年为了满足不同地域Gopher的需求,GoCN社区在6月和11月分别在北京和上海举办了两次GopherChina大会,这也是历史首次。

Go语言团队的大神们也开始重新“乐此不疲”地参与到上述这些大会中,以推进全球Go社区与生态的建设。就连已经退居二线的Go语言之父Rob Pike也亲自“现身说法”,在年底的GopherCon Australia 2023上发表了“What We Got Right, What We Got Wrong”的主题演讲来回顾Go诞生以来的得与失。

大神回顾一生,我们盘点一年。在这篇文章中,我就和大家一起聊聊Go在2023年的状态、所处的位置以及Go未来演进的机制与策略。

1. Go的2023

1.1 稳

一如往年,Go在2023年发布了两个大版本,分别是2023年2月份的Go 1.20和8月份的Go 1.21

在这两个版本中,Go语法特性一如既往的求稳,除了支持切片类型到数组类型(或数组类型的指针)的类型转换,其余更是像语法的修修补补,比如:comparable“放宽”了对泛型实参的限制、unsafe包继续添加“语法糖”、增加min、max和clear预定义函数、增强type inference能力等。

这些并不会让Gopher感到“意外”,因为这与Russ Cox在2022年宣称的“Go is boring”的精神是一脉相承的。

不过,除了Go语法特性变化方面的“寡淡”之外,Go在其他方面还是求新和求变的,接下来我们先来看看Go是如何求新的。

注:求新与求变可能存在交集的地方,边界可能也有一定模糊性,也存在相互促进的情况,希望大家阅读下面内容时不要吹毛求疵:)。

1.2 求新

Go在语法特性求稳的同时,在编译器、工具链、运行时以及标准库等方面都在努力优化和打磨,旨在进一步提升Go兼具的生产力与运行时效率,其中很多优化和打磨的措施不乏新颖。

Go 1.20版本中首次引入的PGO(profile-guided optimization)技术预览版,到Go 1.21版本变为默认开启,Go官方给出的PGO优化的效果数据是:PGO优化带来的性能提升一般是2%~7%,而在最新的Go 1.22rc1中,这个数字已经变为2%~14%了。

在内存管理方面,Go 1.20引入了试验特性arena包,虽然它没能在Go 1.21中按时转正,如今处于proposal-hold状态,但这也算是一次在内存管理机制上的求新。

Go是一门面向软件工程的编程语言,在这一年中,Go在软件工程领域的求新例子也是不少。比如:可用于大幅简化Go项目创建的gonew工具,它支持基于go project template clone并创建一个属于你的Go项目;再比如对应用执行时的代码覆盖率的采集,可以帮助开发者更进一步了解最终可执行程序代码执行路径上的测试覆盖情况;而govulncheck工具则是Go在软件工程与供应链安全领域的求新尝试,该工具丰富了我们对Go项目进行安全漏洞检查的手段。

注:关于供应链安全问题,Russ Cox近期有一个专门的Talk:Open Source Supply Chain Security at Google,感兴趣的童鞋可以学习一下。

Go始终对IT界出现的新技术、新趋势以及Go社区的新想法保持open。在WASM出现早期,Go就提供了对wasm的porting支持,如今在Go 1.21中,Go还对尚未形成最终规范的WASI(WebAssembly System Interface)提供了支持。

Go社区的反馈也是Go团队求新的来源,比如一个典型例子就是log/slog加入标准库,让Go标准库原生支持了结构化日志输出,且日志性能不输像zap这样的第三方开源log包

Go社区也跟随Go团队的节奏,走在求新的道路上。2023年,IT界最大的事件就是以ChatGPT为代表的大语言模型的横空出世,这很可能是一个百年不遇的、对人类文明进步有着重要里程碑意义的事件。各行各业,言必称大模型,言必称AI。在传统机器学习、深度学习以及神经网络方面生态并不丰富的Go,也在尝试与大模型对接,比如:支持快速在本地启动和运行llama2、mistral 7B、codellama、vicuna等大模型的ollama开源项目在短短几个月就收获近30k个小星星的关注;再比如Gemini大模型推出后,Google一并开源了支持与Google各种大模型项目对接的Google AI Go SDK开源项目,并提供了详细的教程指导Gopher如何通过该SDK与大模型交互

注:Google把Gemini Pro的API免费提供给个人用户了,该模型具备GPT 3.5 级别的能力,32k 上下文,38 种语言支持以及多模态支持,唯一的约束是每分钟60个请求。

2023年第二次Go用户调查报告中,Go 开发者表示,他们对改善其编写代码的质量、可靠性和性能的人工智能/机器学习工具感兴趣,而不是编写代码的工具。一位时刻警醒、从不忙碌的专家“审阅者”可能是一种更有帮助的AI开发者辅助形式。Go官方表示了对该调查结果的重视,也许在后续的Go工具链中“AI加持”会成为常态。

1.3 求变

2023年8月,在Go 1.21版本刚刚发布后,Go官博就发布了Russ Cox编写的两篇文章:《Backward Compatibility, Go 1.21, and Go 2》和《Forward Compatibility and Toolchain Management in Go 1.21》,进一步明确了Go承诺的向后兼容的范围和方案,并第一次阐述了向前兼容性的具体方案,这两篇文章为Go语言后续的“求变”奠定了理论基础。

在向后兼容方面,从Go 1.21开始Russ Cox提出一些举措,比如:Go将扩展和规范化了GODEBUG的使用,其大致思路如下:

  • 对于每个在Go1兼容性承诺范围内的且可能会破坏(break)现有代码的新特性/新改变(比如:panic(nil)语义的改变)加入时,Go会向GODEBUG设置
    中添加一个新选项(比如GODEBUG=panicnil=1),以保留采用原语义进行编译的兼容能力;
  • GODEBUG中新增的选项将至少保留两年(4个Go release版本),对于一些影响重大的GODEBUG选项(比如http2client和http2server),保留的时间可能更长,甚至一直保留;
  • GODEBUG的选项设置与go.mod的go version是匹配的。例如,即便你现在的工具链是Go 1.21,如果go.mod中的go version为1.20,那么GODEBUG控制的新特性语义将不起作用,依旧保持Go 1.20时的行为。除非你将go.mod中的go version升级为go 1.21.0;
  • 在Go 1.21及以后版本中,除了可以使用像GODEBUG=panicnil=1的环境变量恢复原先语义外,还可以在main包中使用//go:debug指示符。

在向前兼容方面,Russ Cox提出的方案有些复杂难懂,这里就不赘述了,感兴趣的童鞋可以阅读一下我之前的文章《聊聊Go语言的向前兼容性和toolchain规则》了解更多细节。

1.3.1 语法填坑

在Go的诸多“求变”中,影响最大的还是对已有语法坑的“修正”,这些“填坑”工作或多或少都会对存量代码带去影响,甚至是break change,Go社区的反对声音也是不少。但无论怎样,这些工作已经在Go 1.21版本拉开帷幕了。比如:改变panic(nil)的语义以及对循环变量语义的变更,大家可以在《Go 1.21中值得关注的几个变化》一文中了解更多细节。

对现有语法坑的修正也进一步促进了“求新”,比如在修正loopvar语义的同时,for range支持对更多类型表达式的迭代也在进行中,比如Go 1.22中,for range将支持迭代整型表达式,并以试验特性提供了对函数迭代器的支持。

1.3.2 标准库v2示范

Go号称是“自带电池”的语言,其高质量的标准库得到了广大Gopher的欢迎。Go团队也一直努力推进Go标准库功能的丰富性,比如:Go 1.22中对http.ServeMux功能进行了增强,使其像第三方的gorilla/mux那样增加对带有通配符路由的匹配。

Go 1.22中,标准库还首次出现了v2版本包:math/rand/v2,这为后续标准库的vN方式演进提供了示范,从Go团队的官方issue、discussion中了解到,后续如sync/v2、encoding/json/v2等已经列上日程了。

2. Go所处的位置

很多人关注Go当前的状态:国内大厂用的多么?小厂是不是也在广泛采纳。这些问题我在往年的Go语言盘点时也都做过梳理,今年就不再提了。没有哪个大厂在广泛采用一门语言后,会在一年内全部推翻重写的;小厂对Go的采纳也是有惯性的。

今年先从我的两个意外“收获”开始。

2.1 两个意外的“收获”

2023年10月中旬,世界知名电动车厂商Tesla发布了新版fleet APIvehicle command SDK,鉴于本人也在智能网联汽车行业内打拼,于是对Tesla的此次发布做了一些深入了解。在Tesla的github主页上我赫然发现:Go是目前Tesla开源项目的第二大语言。

相对于传统的主机厂(车厂),Telsa算是比较开放的了。开放包含两个含义,一是将车端能力的开放,二是项目的开源。就目前了解到,国内主机厂还鲜有将车端能力开放出来的,开源就更是鲜见。但Tesla在这两方面都做到了,既开放了车端API,又做了针对性的开源,虽然目前其开源项目并不多。以前Tesla涉及到云端服务的项目多用Ruby,但从2022年开始,Go语言的使用逐渐增多,包括前面提到的Fleet API的Fleet Telemetry的参考server实现以及Tesla车辆远控SDK

我们再来看看Apache基金会。众所周知,Apache基金会的开源项目多以Java语言为主,但一次偶然的机会翻看Apache基金会的github项目主页,我发现Go语言在Apache开源项目中已经悄悄地跻身到第五名,如果仅算后端语言的话,Go排名第三,仅次于Java和Python。

并且,Apache基金会下面的Go项目实际也不少,大家可以通过https://github.com/orgs/apache/repositories?language=go&type=all查询。其中还不乏优秀之作,比如:构建Q&A知识系统的answerApache Dubbo的go实现dubbo-goCDN实现trafficcontrolKubernetes原生的轻量级企业应用集成框架Camel KApache Arrow的Go实现以及针对开发过程的聚合数据平台devlake等。

我们知道:Apache项目在企业级应用和平台方面具有广泛的应用,从Go语言在Apache基金会项目中的使用比例的提升现象来看,Go在企业应用市场中的普及度和受欢迎程度确实有所增长。

2.2 Go语言排名

编程语言之间的竞争与争议,通常被称为“编程语言战争”(programming language war),它其实反映了不同技术群体和范式之间的碰撞。这些“火药味”比较浓的语言之争通常比较主观。近10年来,业界出现了一些被广泛接受的编程语言排行榜,它们基于一些相对客观的数据来反映不同编程语言在现实开发中的真实状态。但不同编程语言排行榜都有不同的数据来源和数据模型,单一的排行榜往往是“盲人摸象”,无法反映全貌。但目前又没有一个可以让我们一窥全貌的权威排行榜。因此,要想更客观地、更全面的反映一门编程语言的实际情况,我们需要将多个排行榜参照着看。

下面我们就来看看在目前世界上著名的编程语言排行榜上,Go语言在其中的最新排名情况(请注意:各个榜单的发布时间不同,导致各榜单的数据会有一定时间差)。

2.2.1 PYPL编程语言排行榜

PYPL编程语言流行指数是通过分析语言教程在谷歌上的搜索频率而创建的。语言教程被搜索的次数越多,说明该语言越受欢迎,原始数据来自Google Trends:


PYPL编程语言排行榜,数据时间:2023.12

2.2.2 IEEE Spectrum排行榜

IEEE Spectrum排行榜是通过调查来自全球软件工程师和招聘网站的数据,统计各语言的流行度的:


IEEE Spectrum排行榜,数据时间:2023.8

2.2.3 RedMonk编程语言排行榜

RedMonk排行榜是根据GitHub和Stack Overflow这两个开发者社区上的讨论数量来推算语言的受关注度。


RedMonk编程语言排行榜,数据时间:2023.5

2.3.4 Github Octoverse

GitHub Octoverse排行榜直观反映了过去一年GitHub上各编程语言的实际使用和流行趋势,是从开源项目量的维度来衡量编程语言活跃度的。在Top 10语言榜单上,2023年Go超越Ruby第一次跻身Github Top10语言:


Github Octoverse编程语言排行榜,数据时间:2023.11


Github Octoverse编程语言排行榜,数据时间:2023.11

2.3.5 Github Language Stats(githut)

Github Language Stats是一个个人项目,它基于github公开数据,按时间、pr数量、star数量等维度对各个语言在github上的使用情况进行分析:


Githut按PR数量,数据时间:2023第三季度


Githut按Star数量,数据时间:2023第三季度

2.3.6 TIOBE编程语言排行榜

TIOBE编程语言排行榜理论上来说,是世界上最知名的编程语言排行榜,它根据各大搜索引擎编程语言相关的搜索查询量来计算一个综合指数。但这些年TIOBE榜单数据的“上蹿下跳”,让开发者对该榜单是“又爱又恨”。下面是TIOBE index 2023年12月份的榜单:

当你看到Fortran排在Go的前面,你就get到该榜单的抽风式的“不靠谱”了:)。

综合上述6个榜单,我们可以看到Go语言的2023基本处于稳定发展状态,没有“大踏步”的前进,也没有意想不到的大幅退步。

今年在国内某乎上总有一些有关“Go在国内是否已凉”的话题,从上面实际情况来看,话题中那些抹黑Go的观点可以不攻自破了。有人会说Rust的强势上升对Go会有一定冲击,这的确不可否认,就像Go当年火速蹿升给Java带去一定冲击一样,这是一门编程语言在演进阶段必会经历的过程,没有什么值得大惊小怪的。5年后,Rust可能同样也会受到来自其他语言的冲击。

Go语言未来会变得如何,关键还要看Go团队对Go未来演进方向的把握是否得当以及Go社区与生态是否给力。2023年,Go团队也明确了未来的演进机制和策略,接下来我们就来看看。

3. Go的未来演进

2023年是Go语言开源的第14个年头,Go语言早已蜕下了少年期的青涩,进入到了青年期。这意味着它拥有了越来越成熟稳定的语言特性,同时生态系统也日益丰富完善。作为一门青壮年语言,Go语言在系统设计方面展现出的高度工程化思想,使其轻松应对复杂系统的构建。以go module为主的模块化支持帮助大规模程序更加清晰化,丰富的并发控制手段使其可以处理海量请求。与此同时,Go语言生态也在蓬勃成长——各种高质量框架应运而生,无数module可复用,大量的云原生组件可供选择。这为开发者极大减轻了从零开始搭建系统的工作量。

和我们人类一样,一门语言进入青年期后的成熟特征并不能完全掩饰其未来演进的迷茫!在Ken Thompson、Rob Pike相继退休后,Russ Cox成为了Go这艘大船的“掌舵者”,Russ Cox与Go团队对编程语言的思考,对Go语言价值观的判断将直接决定Go未来的航向。

好在,在2023年的GopherCon大会上,我们得到了Russ Cox的答案:那就是基于共同目标和数据驱动的决策。这里借用Russ Cox在演讲中给出的结论来看看具体的演进驱动机制:

  • 首先,Go需要不断变化,特别是随着计算世界的变化。
  • 其次,任何改变的目标都是为了使Go在软件工程中变得更好,尤其是在规模化(scaling)方面。
  • 第三,一旦我们确定了目标,达成共识的下一个最重要的部分是拥有共享数据来做出决策。
  • 第四,Go工具链遥测是增补我们现有调查和代码分析数据的重要数据来源。

综上来看,Go团队要“拥抱变化”,但不能“无头苍蝇”一样的胡乱改变,而是严谨地基于广泛的数据反馈,包括来自用户调查、vscode插件运行的用户反馈、全年进行的研究访谈和用户体验研究等,以及来自即将加入Go工具链的可选遥测(opt-in Telemetry)功能获取到的更多真实的Go使用数据。

相信在Go工具链的可选遥测(opt-in Telemetry)功能加入后,Go团队能基于这些用户数据拿到更准确地决策依据,继续让Go这艘大船行驶在正确、光明的航向上!

4. 小结

在2023年,Go语言继续保持了其稳定性和可靠性的特点。发布了两个大版本,Go 1.20和Go 1.21,其中语法特性的改变相对较少,注重修复和优化。然而,Go语言在其他方面仍然保持着求新和求变的态势。

Go语言团队致力于优化编译器、工具链、运行时和标准库,以提升生产力和运行时效率。引入了一些新的特性和优化措施,例如PGO(profile-guided optimization)技术的引入和优化、内存管理方面的改进等。同时,Go语言在软件工程领域也进行了一些创新,如简化项目创建的gonew工具、代码覆盖率的采集工具、供应链安全领域的govulncheck工具等。

Go语言始终保持对新技术、新趋势和社区的开放姿态。在2023年,Go语言对WASM和WASI的支持得到了进一步加强。同时,Go社区也积极响应并跟随Go团队的步伐,面对IT界出现的大语言模型等新兴技术,Go社区也在不断探索和应用。

总体而言,2023年对于Go语言来说是一个稳中求新、稳中求变的年份。Go语言保持着其简洁、高效和易用的特点,同时积极适应和采纳新的技术和需求,为开发者提供更好的开发体验和工具支持。

展望未来,Go团队已经明确了更加以共识和用户数据为驱动的演进机制,保证Go的发展方向与实际需求保持同步。随着可选的工具链遥测功能加入,相信他们能基于更丰富的用户数据做出更正确、更具预见性的正确决策。

我个人依旧坚持我之前的判断:Go将进入或已处于自己的黄金5-10年


“Gopher部落”知识星球旨在打造一个精品Go学习和进阶社群!高品质首发Go技术文章,“三天”首发阅读权,每年两期Go语言发展现状分析,每天提前1小时阅读到新鲜的Gopher日报,网课、技术专栏、图书内容前瞻,六小时内必答保证等满足你关于Go语言生态的所有需求!2023年,Gopher部落将进一步聚焦于如何编写雅、地道、可读、可测试的Go代码,关注代码质量并深入理解Go核心技术,并继续加强与星友的互动。欢迎大家加入!

img{512x368}
img{512x368}

img{512x368}
img{512x368}

著名云主机服务厂商DigitalOcean发布最新的主机计划,入门级Droplet配置升级为:1 core CPU、1G内存、25G高速SSD,价格5$/月。有使用DigitalOcean需求的朋友,可以打开这个链接地址:https://m.do.co/c/bff6eed92687 开启你的DO主机之路。

Gopher Daily(Gopher每日新闻) – https://gopherdaily.tonybai.com

我的联系方式:

  • 微博(暂不可用):https://weibo.com/bigwhite20xx
  • 微博2:https://weibo.com/u/6484441286
  • 博客:tonybai.com
  • github: https://github.com/bigwhite
  • Gopher Daily归档 – https://github.com/bigwhite/gopherdaily

商务合作方式:撰稿、出书、培训、在线课程、合伙创业、咨询、广告合作。

Go语言开发者的Apache Arrow使用指南:读写Parquet文件

本文永久链接 – https://tonybai.com/2023/07/31/a-guide-of-using-apache-arrow-for-gopher-part6

Apache Arrow是一种开放的、与语言无关的列式内存格式,在本系列文章的前几篇中,我们都聚焦于内存表示内存操作

但对于一个数据库系统或大数据分析平台来说,数据不能也无法一直放在内存中,虽说目前内存很大也足够便宜了,但其易失性也决定了我们在特定时刻还是要将数据序列化后存储到磁盘或一些低成本的存储服务上(比如AWS的S3等)。

那么将Arrow序列化成什么存储格式呢?CSV、JSON?显然这些格式都不是为最大限度提高空间效率以及数据检索能力而设计的。在数据分析领域,Apache Parquet是与Arrow相似的一种开放的、面向列的数据存储格式,它被设计用于高效的数据编码和检索并最大限度提高空间效率。

和Arrow是一种内存格式不同,Parquet是一种数据文件格式。此外,Arrow和Parquet在设计上也做出了各自的一些取舍。Arrow旨在由矢量化计算内核对数据进行操作,提供对任何数组索引的 O(1) 随机访问查找能力;而Parquet为了最大限度提高空间效率,采用了可变长度编码方案和块压缩来大幅减小数据大小,这些技术都是以丧失高性能随机存取查找为代价的。

Parquet也是Apache的顶级项目,大多数实现了Arrow的编程语言也都提供了支持Arrow格式与Parquet文件相互转换的库实现,Go也不例外。在本文中,我们就来粗浅看一下如何使用Go实现Parquet文件的读写,即Arrow和Parquet的相互转换。

注:关于Parquet文件的详细格式(也蛮复杂),我可能会在后续文章中说明。

1. Parquet简介

如果不先说一说Parquet文件格式,后面的内容理解起来会略有困难的。下面是一个Parquet文件的结构示意图:


图来自https://www.uber.com/blog/cost-efficiency-big-data

我们看到Parquet格式的文件被分为多个row group,每个row group由每一列的列块(column chunk)组成。考虑到磁盘存储的特点,每个列块又分为若干个页。这个列块中的诸多同构类型的列值可以在编码和压缩后存储在各个页中。下面是Parquet官方文档中Parquet文件中数据存储的具体示意图:

我们看到Parquet按row group顺序向后排列,每个row group中column chunk也是依column次序向后排列的。

注:关于上图中repetion level和definition level这样的高级概念,不会成为理解本文内容的障碍,我们将留到后续文章中系统说明。

2. Arrow Table <-> Parquet

有了上面Parquet文件格式的初步知识后,接下来我们就来看看如何使用Go在Arrow和Parquet之间进行转换。

《高级数据结构》一文中,我们学习了Arrow Table和Record Batch两种高级结构。接下来我们就来看看如何将Table或Record与Parquet进行转换。一旦像Table、Record Batch这样的高级结构的转换搞定了,那Arrow中的那些简单数据类型)也就不在话下了。况且在实际项目中,我们面对更多的也是Arrow的高级数据结构(Table或Record)与Parquet的转换。

我们先来看看Table。

2.1 Table -> Parquet

通过在《高级数据结构》一文,我们知道了Arrow Table的每一列本质上就是Schema+Chunked Array,这和Parquet的文件格式具有较高的适配度。

Arrow Go的parquet实现提供对了Table的良好支持,我们通过一个WriteTable函数就可以将内存中的Arrow Table持久化为Parquet格式的文件,我们来看看下面这个示例:

// flat_table_to_parquet.go

package main

import (
    "os"

    "github.com/apache/arrow/go/v13/arrow"
    "github.com/apache/arrow/go/v13/arrow/array"
    "github.com/apache/arrow/go/v13/arrow/memory"
    "github.com/apache/arrow/go/v13/parquet/pqarrow"
)

func main() {
    schema := arrow.NewSchema(
        []arrow.Field{
            {Name: "col1", Type: arrow.PrimitiveTypes.Int32},
            {Name: "col2", Type: arrow.PrimitiveTypes.Float64},
            {Name: "col3", Type: arrow.BinaryTypes.String},
        },
        nil,
    )

    col1 := func() *arrow.Column {
        chunk := func() *arrow.Chunked {
            ib := array.NewInt32Builder(memory.DefaultAllocator)
            defer ib.Release()

            ib.AppendValues([]int32{1, 2, 3}, nil)
            i1 := ib.NewInt32Array()
            defer i1.Release()

            ib.AppendValues([]int32{4, 5, 6, 7, 8, 9, 10}, nil)
            i2 := ib.NewInt32Array()
            defer i2.Release()

            c := arrow.NewChunked(
                arrow.PrimitiveTypes.Int32,
                []arrow.Array{i1, i2},
            )
            return c
        }()
        defer chunk.Release()

        return arrow.NewColumn(schema.Field(0), chunk)
    }()
    defer col1.Release()

    col2 := func() *arrow.Column {
        chunk := func() *arrow.Chunked {
            fb := array.NewFloat64Builder(memory.DefaultAllocator)
            defer fb.Release()

            fb.AppendValues([]float64{1.1, 2.2, 3.3, 4.4, 5.5}, nil)
            f1 := fb.NewFloat64Array()
            defer f1.Release()

            fb.AppendValues([]float64{6.6, 7.7}, nil)
            f2 := fb.NewFloat64Array()
            defer f2.Release()

            fb.AppendValues([]float64{8.8, 9.9, 10.0}, nil)
            f3 := fb.NewFloat64Array()
            defer f3.Release()

            c := arrow.NewChunked(
                arrow.PrimitiveTypes.Float64,
                []arrow.Array{f1, f2, f3},
            )
            return c
        }()
        defer chunk.Release()

        return arrow.NewColumn(schema.Field(1), chunk)
    }()
    defer col2.Release()

    col3 := func() *arrow.Column {
        chunk := func() *arrow.Chunked {
            sb := array.NewStringBuilder(memory.DefaultAllocator)
            defer sb.Release()

            sb.AppendValues([]string{"s1", "s2"}, nil)
            s1 := sb.NewStringArray()
            defer s1.Release()

            sb.AppendValues([]string{"s3", "s4"}, nil)
            s2 := sb.NewStringArray()
            defer s2.Release()

            sb.AppendValues([]string{"s5", "s6", "s7", "s8", "s9", "s10"}, nil)
            s3 := sb.NewStringArray()
            defer s3.Release()

            c := arrow.NewChunked(
                arrow.BinaryTypes.String,
                []arrow.Array{s1, s2, s3},
            )
            return c
        }()
        defer chunk.Release()

        return arrow.NewColumn(schema.Field(2), chunk)
    }()
    defer col3.Release()

    var tbl arrow.Table
    tbl = array.NewTable(schema, []arrow.Column{*col1, *col2, *col3}, -1)
    defer tbl.Release()

    f, err := os.Create("flat_table.parquet")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    err = pqarrow.WriteTable(tbl, f, 1024, nil, pqarrow.DefaultWriterProps())
    if err != nil {
        panic(err)
    }
}

我们基于arrow的Builder模式以及NewTable创建了一个拥有三个列的Table(该table的创建例子来自于《高级数据结构》一文)。有了table后,我们直接调用pqarrow的WriteTable函数即可将table写成parquet格式的文件。

我们来运行一下上述代码:

$go run flat_table_to_parquet.go

执行完上面命令后,当前目录下会出现一个flat_table.parquet的文件!

我们如何查看该文件内容来验证写入的数据是否与table一致呢?arrow go的parquet实现提供了一个parquet_reader的工具可以帮助我们做到这点,你可以执行如下命令安装这个工具:

$go install github.com/apache/arrow/go/v13/parquet/cmd/parquet_reader@latest

之后我们就可以执行下面命令查看我们刚刚生成的flat_table.parquet文件的内容了:

$parquet_reader flat_table.parquet
File name: flat_table.parquet
Version: v2.6
Created By: parquet-go version 13.0.0-SNAPSHOT
Num Rows: 10
Number of RowGroups: 1
Number of Real Columns: 3
Number of Columns: 3
Number of Selected Columns: 3
Column 0: col1 (INT32/INT_32)
Column 1: col2 (DOUBLE)
Column 2: col3 (BYTE_ARRAY/UTF8)
--- Row Group: 0  ---
--- Total Bytes: 396  ---
--- Rows: 10  ---
Column 0
 Values: 10, Min: 1, Max: 10, Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 111, Compressed Size: 111
Column 1
 Values: 10, Min: 1.1, Max: 10, Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 169, Compressed Size: 169
Column 2
 Values: 10, Min: [115 49], Max: [115 57], Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 116, Compressed Size: 116
--- Values ---
col1              |col2              |col3              |
1                 |1.100000          |s1                |
2                 |2.200000          |s2                |
3                 |3.300000          |s3                |
4                 |4.400000          |s4                |
5                 |5.500000          |s5                |
6                 |6.600000          |s6                |
7                 |7.700000          |s7                |
8                 |8.800000          |s8                |
9                 |9.900000          |s9                |
10                |10.000000         |s10               |

parquet_reader列出了parquet文件的meta数据和每个row group中的column列的值,从输出来看,与我们arrow table的数据是一致的。

我们再回头看一下WriteTable函数,它的原型如下:

func WriteTable(tbl arrow.Table, w io.Writer, chunkSize int64,
                props *parquet.WriterProperties, arrprops ArrowWriterProperties) error

这里说一下WriteTable的前三个参数,第一个是通过NewTable得到的arrow table结构,第二个参数也容易理解,就是一个可写的文件描述符,我们通过os.Create可以轻松拿到,第三个参数为chunkSize,这个chunkSize是什么呢?会对parquet文件的写入结果有影响么?其实这个chunkSize就是每个row group中的行数。同时parquet通过该chunkSize也可以计算出arrow table转parquet文件后有几个row group。

我们示例中的chunkSize值为1024,因此整个parquet文件只有一个row group。下面我们将其值改为5,再来看看输出的parquet文件内容:

$parquet_reader flat_table.parquet
File name: flat_table.parquet
Version: v2.6
Created By: parquet-go version 13.0.0-SNAPSHOT
Num Rows: 10
Number of RowGroups: 2
Number of Real Columns: 3
Number of Columns: 3
Number of Selected Columns: 3
Column 0: col1 (INT32/INT_32)
Column 1: col2 (DOUBLE)
Column 2: col3 (BYTE_ARRAY/UTF8)
--- Row Group: 0  ---
--- Total Bytes: 288  ---
--- Rows: 5  ---
Column 0
 Values: 5, Min: 1, Max: 5, Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 86, Compressed Size: 86
Column 1
 Values: 5, Min: 1.1, Max: 5.5, Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 122, Compressed Size: 122
Column 2
 Values: 5, Min: [115 49], Max: [115 53], Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 80, Compressed Size: 80
--- Values ---
col1              |col2              |col3              |
1                 |1.100000          |s1                |
2                 |2.200000          |s2                |
3                 |3.300000          |s3                |
4                 |4.400000          |s4                |
5                 |5.500000          |s5                |

--- Row Group: 1  ---
--- Total Bytes: 290  ---
--- Rows: 5  ---
Column 0
 Values: 5, Min: 6, Max: 10, Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 86, Compressed Size: 86
Column 1
 Values: 5, Min: 6.6, Max: 10, Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 122, Compressed Size: 122
Column 2
 Values: 5, Min: [115 49 48], Max: [115 57], Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 82, Compressed Size: 82
--- Values ---
col1              |col2              |col3              |
6                 |6.600000          |s6                |
7                 |7.700000          |s7                |
8                 |8.800000          |s8                |
9                 |9.900000          |s9                |
10                |10.000000         |s10               |

当chunkSize值为5后,parquet文件的row group变成了2,然后parquet_reader工具会按照两个row group的格式分别输出它们的meta信息和列值信息。

接下来,我们再来看一下如何从生成的parquet文件中读取数据并转换为arrow table。

2.2 Table <- Parquet

和WriteTable函数对应,arrow提供了ReadTable函数读取parquet文件并转换为内存中的arrow table,下面是代码示例:

// flat_table_from_parquet.go
func main() {
    f, err := os.Open("flat_table.parquet")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    tbl, err := pqarrow.ReadTable(context.Background(), f, parquet.NewReaderProperties(memory.DefaultAllocator),
        pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
    if err != nil {
        panic(err)
    }

    dumpTable(tbl)
}

func dumpTable(tbl arrow.Table) {
    s := tbl.Schema()
    fmt.Println(s)
    fmt.Println("------")

    fmt.Println("the count of table columns=", tbl.NumCols())
    fmt.Println("the count of table rows=", tbl.NumRows())
    fmt.Println("------")

    for i := 0; i < int(tbl.NumCols()); i++ {
        col := tbl.Column(i)
        fmt.Printf("arrays in column(%s):\n", col.Name())
        chunk := col.Data()
        for _, arr := range chunk.Chunks() {
            fmt.Println(arr)
        }
        fmt.Println("------")
    }
}

我们看到ReadTable使用起来非常简单,由于parquet文件中包含meta信息,我们调用ReadTable时,一些参数使用默认值或零值即可。

我们运行一下上述代码:

$go run flat_table_from_parquet.go
schema:
  fields: 3
    - col1: type=int32
      metadata: ["PARQUET:field_id": "-1"]
    - col2: type=float64
      metadata: ["PARQUET:field_id": "-1"]
    - col3: type=utf8
      metadata: ["PARQUET:field_id": "-1"]
------
the count of table columns= 3
the count of table rows= 10
------
arrays in column(col1):
[1 2 3 4 5 6 7 8 9 10]
------
arrays in column(col2):
[1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8 9.9 10]
------
arrays in column(col3):
["s1" "s2" "s3" "s4" "s5" "s6" "s7" "s8" "s9" "s10"]
------

2.3 Table -> Parquet(压缩)

前面提到,Parquet文件格式的设计充分考虑了空间利用效率,再加上其是面向列存储的格式,Parquet支持列数据的压缩存储,并支持为不同列选择不同的压缩算法。

前面示例中调用的WriteTable在默认情况下是不对列进行压缩的,这从parquet_reader读取到的列的元信息中也可以看到(比如下面的Compression: UNCOMPRESSED):

Column 0
 Values: 10, Min: 1, Max: 10, Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 111, Compressed Size: 111

我们在WriteTable时也可以通过parquet.WriterProperties参数来为每个列指定压缩算法,比如下面示例:

// flat_table_to_parquet_compressed.go

var tbl arrow.Table
tbl = array.NewTable(schema, []arrow.Column{*col1, *col2, *col3}, -1)
defer tbl.Release()

f, err := os.Create("flat_table_compressed.parquet")
if err != nil {
    panic(err)
}
defer f.Close()

wp := parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Snappy),
    parquet.WithCompressionFor("col1", compress.Codecs.Brotli))
err = pqarrow.WriteTable(tbl, f, 1024, wp, pqarrow.DefaultWriterProps())
if err != nil {
    panic(err)
}

在这段代码中,我们通过parquet.NewWriterProperties构建了新的WriterProperties,这个新的Properties默认所有列使用Snappy压缩,针对col1列使用Brotli算法压缩。我们将压缩后的数据写入flat_table_compressed.parquet文件。使用go run运行flat_table_to_parquet_compressed.go,然后使用parquet_reader查看文件flat_table_compressed.parquet得到如下结果:

$go run flat_table_to_parquet_compressed.go
$parquet_reader flat_table_compressed.parquet
File name: flat_table_compressed.parquet
Version: v2.6
Created By: parquet-go version 13.0.0-SNAPSHOT
Num Rows: 10
Number of RowGroups: 1
Number of Real Columns: 3
Number of Columns: 3
Number of Selected Columns: 3
Column 0: col1 (INT32/INT_32)
Column 1: col2 (DOUBLE)
Column 2: col3 (BYTE_ARRAY/UTF8)
--- Row Group: 0  ---
--- Total Bytes: 352  ---
--- Rows: 10  ---
Column 0
 Values: 10, Min: 1, Max: 10, Null Values: 0, Distinct Values: 0
 Compression: BROTLI, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 111, Compressed Size: 98
Column 1
 Values: 10, Min: 1.1, Max: 10, Null Values: 0, Distinct Values: 0
 Compression: SNAPPY, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 168, Compressed Size: 148
Column 2
 Values: 10, Min: [115 49], Max: [115 57], Null Values: 0, Distinct Values: 0
 Compression: SNAPPY, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 116, Compressed Size: 106
--- Values ---
col1              |col2              |col3              |
1                 |1.100000          |s1                |
2                 |2.200000          |s2                |
3                 |3.300000          |s3                |
4                 |4.400000          |s4                |
5                 |5.500000          |s5                |
6                 |6.600000          |s6                |
7                 |7.700000          |s7                |
8                 |8.800000          |s8                |
9                 |9.900000          |s9                |
10                |10.000000         |s10               |

从parquet_reader的输出,我们可以看到:各个Column的Compression信息不再是UNCOMPRESSED了,并且三个列在经过压缩后的Size与未压缩对比都有一定的减小:

Column 0:
    Compression: BROTLI, Uncompressed Size: 111, Compressed Size: 98
Column 1:
    Compression: SNAPPY, Uncompressed Size: 168, Compressed Size: 148
Column 2:
    Compression: SNAPPY, Uncompressed Size: 116, Compressed Size: 106

从文件大小对比也能体现出压缩算法的作用:

-rw-r--r--   1 tonybai  staff   786  7 22 08:06 flat_table.parquet
-rw-r--r--   1 tonybai  staff   742  7 20 13:19 flat_table_compressed.parquet

Go的parquet实现支持多种压缩算法:

// github.com/apache/arrow/go/parquet/compress/compress.go

var Codecs = struct {
    Uncompressed Compression
    Snappy       Compression
    Gzip         Compression
    // LZO is unsupported in this library since LZO license is incompatible with Apache License
    Lzo    Compression
    Brotli Compression
    // LZ4 unsupported in this library due to problematic issues between the Hadoop LZ4 spec vs regular lz4
    // see: http://mail-archives.apache.org/mod_mbox/arrow-dev/202007.mbox/%3CCAAri41v24xuA8MGHLDvgSnE+7AAgOhiEukemW_oPNHMvfMmrWw@mail.gmail.com%3E
    Lz4  Compression
    Zstd Compression
}{
    Uncompressed: Compression(parquet.CompressionCodec_UNCOMPRESSED),
    Snappy:       Compression(parquet.CompressionCodec_SNAPPY),
    Gzip:         Compression(parquet.CompressionCodec_GZIP),
    Lzo:          Compression(parquet.CompressionCodec_LZO),
    Brotli:       Compression(parquet.CompressionCodec_BROTLI),
    Lz4:          Compression(parquet.CompressionCodec_LZ4),
    Zstd:         Compression(parquet.CompressionCodec_ZSTD),
}

你只需要根据你的列的类型选择最适合的压缩算法即可。

2.4 Table <- Parquet(压缩)

接下来,我们来读取这个数据经过压缩的Parquet。读取压缩的Parquet是否需要在ReadTable时传入特殊的Properties呢?答案是不需要!因为Parquet文件中存储了元信息(metadata),可以帮助ReadTable使用对应的算法解压缩并提取信息:

// flat_table_from_parquet_compressed.go

func main() {
    f, err := os.Open("flat_table_compressed.parquet")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    tbl, err := pqarrow.ReadTable(context.Background(), f, parquet.NewReaderProperties(memory.DefaultAllocator),
        pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
    if err != nil {
        panic(err)
    }

    dumpTable(tbl)
}

运行这段程序,我们就可以读取压缩后的parquet文件了:

$go run flat_table_from_parquet_compressed.go
schema:
  fields: 3
    - col1: type=int32
      metadata: ["PARQUET:field_id": "-1"]
    - col2: type=float64
      metadata: ["PARQUET:field_id": "-1"]
    - col3: type=utf8
      metadata: ["PARQUET:field_id": "-1"]
------
the count of table columns= 3
the count of table rows= 10
------
arrays in column(col1):
[1 2 3 4 5 6 7 8 9 10]
------
arrays in column(col2):
[1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8 9.9 10]
------
arrays in column(col3):
["s1" "s2" "s3" "s4" "s5" "s6" "s7" "s8" "s9" "s10"]
------

接下来,我们来看看Arrow中的另外一种高级数据结构Record Batch如何实现与Parquet文件格式的转换。

3. Arrow Record Batch <-> Parquet

注:大家可以先阅读/温习一下《高级数据结构》一文来了解一下Record Batch的概念。

3.1 Record Batch -> Parquet

Arrow Go实现将一个Record Batch作为一个Row group来对应。下面的程序向Parquet文件中写入了三个record,我们来看一下:

// flat_record_to_parquet.go

func main() {
    var records []arrow.Record
    schema := arrow.NewSchema(
        []arrow.Field{
            {Name: "archer", Type: arrow.BinaryTypes.String},
            {Name: "location", Type: arrow.BinaryTypes.String},
            {Name: "year", Type: arrow.PrimitiveTypes.Int16},
        },
        nil,
    )

    rb := array.NewRecordBuilder(memory.DefaultAllocator, schema)
    defer rb.Release()

    for i := 0; i < 3; i++ {
        postfix := strconv.Itoa(i)
        rb.Field(0).(*array.StringBuilder).AppendValues([]string{"tony" + postfix, "amy" + postfix, "jim" + postfix}, nil)
        rb.Field(1).(*array.StringBuilder).AppendValues([]string{"beijing" + postfix, "shanghai" + postfix, "chengdu" + postfix}, nil)
        rb.Field(2).(*array.Int16Builder).AppendValues([]int16{1992 + int16(i), 1993 + int16(i), 1994 + int16(i)}, nil)
        rec := rb.NewRecord()
        records = append(records, rec)
    }

    // write to parquet
    f, err := os.Create("flat_record.parquet")
    if err != nil {
        panic(err)
    }

    props := parquet.NewWriterProperties()
    writer, err := pqarrow.NewFileWriter(schema, f, props,
        pqarrow.DefaultWriterProps())
    if err != nil {
        panic(err)
    }
    defer writer.Close()

    for _, rec := range records {
        if err := writer.Write(rec); err != nil {
            panic(err)
        }
        rec.Release()
    }
}

和调用WriteTable完成table到parquet文件的写入不同,这里我们创建了一个FileWriter,通过FileWriter将构建出的Record Batch逐个写入。运行上述代码生成flat_record.parquet文件并使用parquet_reader展示该文件的内容:

$go run flat_record_to_parquet.go
$parquet_reader flat_record.parquet
File name: flat_record.parquet
Version: v2.6
Created By: parquet-go version 13.0.0-SNAPSHOT
Num Rows: 9
Number of RowGroups: 3
Number of Real Columns: 3
Number of Columns: 3
Number of Selected Columns: 3
Column 0: archer (BYTE_ARRAY/UTF8)
Column 1: location (BYTE_ARRAY/UTF8)
Column 2: year (INT32/INT_16)
--- Row Group: 0  ---
--- Total Bytes: 255  ---
--- Rows: 3  ---
Column 0
 Values: 3, Min: [97 109 121 48], Max: [116 111 110 121 48], Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 79, Compressed Size: 79
Column 1
 Values: 3, Min: [98 101 105 106 105 110 103 48], Max: [115 104 97 110 103 104 97 105 48], Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 99, Compressed Size: 99
Column 2
 Values: 3, Min: 1992, Max: 1994, Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 77, Compressed Size: 77
--- Values ---
archer            |location          |year              |
tony0             |beijing0          |1992              |
amy0              |shanghai0         |1993              |
jim0              |chengdu0          |1994              |

--- Row Group: 1  ---
--- Total Bytes: 255  ---
--- Rows: 3  ---
Column 0
 Values: 3, Min: [97 109 121 49], Max: [116 111 110 121 49], Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 79, Compressed Size: 79
Column 1
 Values: 3, Min: [98 101 105 106 105 110 103 49], Max: [115 104 97 110 103 104 97 105 49], Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 99, Compressed Size: 99
Column 2
 Values: 3, Min: 1993, Max: 1995, Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 77, Compressed Size: 77
--- Values ---
archer            |location          |year              |
tony1             |beijing1          |1993              |
amy1              |shanghai1         |1994              |
jim1              |chengdu1          |1995              |

--- Row Group: 2  ---
--- Total Bytes: 255  ---
--- Rows: 3  ---
Column 0
 Values: 3, Min: [97 109 121 50], Max: [116 111 110 121 50], Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 79, Compressed Size: 79
Column 1
 Values: 3, Min: [98 101 105 106 105 110 103 50], Max: [115 104 97 110 103 104 97 105 50], Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 99, Compressed Size: 99
Column 2
 Values: 3, Min: 1994, Max: 1996, Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 77, Compressed Size: 77
--- Values ---
archer            |location          |year              |
tony2             |beijing2          |1994              |
amy2              |shanghai2         |1995              |
jim2              |chengdu2          |1996              |

我们看到parquet_reader分别输出了三个row group的元数据和列值,每个row group与我们写入的一个record对应。

那读取这样的parquet文件与ReadTable有何不同呢?我们继续往下看。

3.2 Record Batch <- Parquet

下面是用于读取

// flat_record_from_parquet.go
func main() {
    f, err := os.Open("flat_record.parquet")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    rdr, err := file.NewParquetReader(f)
    if err != nil {
        panic(err)
    }
    defer rdr.Close()

    arrRdr, err := pqarrow.NewFileReader(rdr,
        pqarrow.ArrowReadProperties{
            BatchSize: 3,
        }, memory.DefaultAllocator)
    if err != nil {
        panic(err)
    }

    s, _ := arrRdr.Schema()
    fmt.Println(*s)

    rr, err := arrRdr.GetRecordReader(context.Background(), nil, nil)
    if err != nil {
        panic(err)
    }

    for {
        rec, err := rr.Read()
        if err != nil && err != io.EOF {
            panic(err)
        }
        if err == io.EOF {
            break
        }
        fmt.Println(rec)
    }
}

我们看到相对于将parquet转换为table,将parquet转换为record略为复杂一些,这里的一个关键是在调用NewFileReader时传入的ArrowReadProperties中的BatchSize字段,要想正确读取出record,这个BatchSize需适当填写。这个BatchSize会告诉Reader 每个读取的Record Batch的长度,也就是row数量。这里传入的是3,即3个row为一个Recordd batch。

下面是运行上述程序的结果:

$go run flat_record_from_parquet.go
{[{archer 0x26ccc00 false {[PARQUET:field_id] [-1]}} {location 0x26ccc00 false {[PARQUET:field_id] [-1]}} {year 0x26ccc00 false {[PARQUET:field_id] [-1]}}] map[archer:[0] location:[1] year:[2]] {[] []} 0}
record:
  schema:
  fields: 3
    - archer: type=utf8
        metadata: ["PARQUET:field_id": "-1"]
    - location: type=utf8
          metadata: ["PARQUET:field_id": "-1"]
    - year: type=int16
      metadata: ["PARQUET:field_id": "-1"]
  rows: 3
  col[0][archer]: ["tony0" "amy0" "jim0"]
  col[1][location]: ["beijing0" "shanghai0" "chengdu0"]
  col[2][year]: [1992 1993 1994]

record:
  schema:
  fields: 3
    - archer: type=utf8
        metadata: ["PARQUET:field_id": "-1"]
    - location: type=utf8
          metadata: ["PARQUET:field_id": "-1"]
    - year: type=int16
      metadata: ["PARQUET:field_id": "-1"]
  rows: 3
  col[0][archer]: ["tony1" "amy1" "jim1"]
  col[1][location]: ["beijing1" "shanghai1" "chengdu1"]
  col[2][year]: [1993 1994 1995]

record:
  schema:
  fields: 3
    - archer: type=utf8
        metadata: ["PARQUET:field_id": "-1"]
    - location: type=utf8
          metadata: ["PARQUET:field_id": "-1"]
    - year: type=int16
      metadata: ["PARQUET:field_id": "-1"]
  rows: 3
  col[0][archer]: ["tony2" "amy2" "jim2"]
  col[1][location]: ["beijing2" "shanghai2" "chengdu2"]
  col[2][year]: [1994 1995 1996]

我们看到:每3行被作为一个record读取出来了。如果将BatchSize改为5,则输出如下:

$go run flat_record_from_parquet.go
{[{archer 0x26ccc00 false {[PARQUET:field_id] [-1]}} {location 0x26ccc00 false {[PARQUET:field_id] [-1]}} {year 0x26ccc00 false {[PARQUET:field_id] [-1]}}] map[archer:[0] location:[1] year:[2]] {[] []} 0}
record:
  schema:
  fields: 3
    - archer: type=utf8
        metadata: ["PARQUET:field_id": "-1"]
    - location: type=utf8
          metadata: ["PARQUET:field_id": "-1"]
    - year: type=int16
      metadata: ["PARQUET:field_id": "-1"]
  rows: 5
  col[0][archer]: ["tony0" "amy0" "jim0" "tony1" "amy1"]
  col[1][location]: ["beijing0" "shanghai0" "chengdu0" "beijing1" "shanghai1"]
  col[2][year]: [1992 1993 1994 1993 1994]

record:
  schema:
  fields: 3
    - archer: type=utf8
        metadata: ["PARQUET:field_id": "-1"]
    - location: type=utf8
          metadata: ["PARQUET:field_id": "-1"]
    - year: type=int16
      metadata: ["PARQUET:field_id": "-1"]
  rows: 4
  col[0][archer]: ["jim1" "tony2" "amy2" "jim2"]
  col[1][location]: ["chengdu1" "beijing2" "shanghai2" "chengdu2"]
  col[2][year]: [1995 1994 1995 1996]

这次:前5行作为一个record,后4行作为另外一个record。

当然,我们也可以使用flat_table_from_parquet.go中的代码来读取flat_record.parquet(将读取文件名改为flat_record.parquet),只不过由于将parquet数据转换为了table,其输出内容将变为:

$go run flat_table_from_parquet.go
schema:
  fields: 3
    - archer: type=utf8
        metadata: ["PARQUET:field_id": "-1"]
    - location: type=utf8
          metadata: ["PARQUET:field_id": "-1"]
    - year: type=int16
      metadata: ["PARQUET:field_id": "-1"]
------
the count of table columns= 3
the count of table rows= 9
------
arrays in column(archer):
["tony0" "amy0" "jim0" "tony1" "amy1" "jim1" "tony2" "amy2" "jim2"]
------
arrays in column(location):
["beijing0" "shanghai0" "chengdu0" "beijing1" "shanghai1" "chengdu1" "beijing2" "shanghai2" "chengdu2"]
------
arrays in column(year):
[1992 1993 1994 1993 1994 1995 1994 1995 1996]
------

3.3 Record Batch -> Parquet(压缩)

Recod同样支持压缩写入Parquet,其原理与前面table压缩存储是一致的,都是通过设置WriterProperties来实现的:

// flat_record_to_parquet_compressed.go

func main() {
    ... ...
    f, err := os.Create("flat_record_compressed.parquet")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    props := parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Zstd),
        parquet.WithCompressionFor("year", compress.Codecs.Brotli))
    writer, err := pqarrow.NewFileWriter(schema, f, props,
        pqarrow.DefaultWriterProps())
    if err != nil {
        panic(err)
    }
    defer writer.Close()

    for _, rec := range records {
        if err := writer.Write(rec); err != nil {
            panic(err)
        }
        rec.Release()
    }
}

不过这次针对arrow.string类型和arrow.int16类型的压缩效果非常“差”:

$parquet_reader flat_record_compressed.parquet
File name: flat_record_compressed.parquet
Version: v2.6
Created By: parquet-go version 13.0.0-SNAPSHOT
Num Rows: 9
Number of RowGroups: 3
Number of Real Columns: 3
Number of Columns: 3
Number of Selected Columns: 3
Column 0: archer (BYTE_ARRAY/UTF8)
Column 1: location (BYTE_ARRAY/UTF8)
Column 2: year (INT32/INT_16)
--- Row Group: 0  ---
--- Total Bytes: 315  ---
--- Rows: 3  ---
Column 0
 Values: 3, Min: [97 109 121 48], Max: [116 111 110 121 48], Null Values: 0, Distinct Values: 0
 Compression: ZSTD, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 79, Compressed Size: 105
Column 1
 Values: 3, Min: [98 101 105 106 105 110 103 48], Max: [115 104 97 110 103 104 97 105 48], Null Values: 0, Distinct Values: 0
 Compression: ZSTD, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 99, Compressed Size: 125
Column 2
 Values: 3, Min: 1992, Max: 1994, Null Values: 0, Distinct Values: 0
 Compression: BROTLI, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 77, Compressed Size: 85
--- Values ---
archer            |location          |year              |
tony0             |beijing0          |1992              |
amy0              |shanghai0         |1993              |
jim0              |chengdu0          |1994              |

--- Row Group: 1  ---
--- Total Bytes: 315  ---
--- Rows: 3  ---
Column 0
 Values: 3, Min: [97 109 121 49], Max: [116 111 110 121 49], Null Values: 0, Distinct Values: 0
 Compression: ZSTD, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 79, Compressed Size: 105
Column 1
 Values: 3, Min: [98 101 105 106 105 110 103 49], Max: [115 104 97 110 103 104 97 105 49], Null Values: 0, Distinct Values: 0
 Compression: ZSTD, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 99, Compressed Size: 125
Column 2
 Values: 3, Min: 1993, Max: 1995, Null Values: 0, Distinct Values: 0
 Compression: BROTLI, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 77, Compressed Size: 85
--- Values ---
archer            |location          |year              |
tony1             |beijing1          |1993              |
amy1              |shanghai1         |1994              |
jim1              |chengdu1          |1995              |

--- Row Group: 2  ---
--- Total Bytes: 315  ---
--- Rows: 3  ---
Column 0
 Values: 3, Min: [97 109 121 50], Max: [116 111 110 121 50], Null Values: 0, Distinct Values: 0
 Compression: ZSTD, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 79, Compressed Size: 105
Column 1
 Values: 3, Min: [98 101 105 106 105 110 103 50], Max: [115 104 97 110 103 104 97 105 50], Null Values: 0, Distinct Values: 0
 Compression: ZSTD, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 99, Compressed Size: 125
Column 2
 Values: 3, Min: 1994, Max: 1996, Null Values: 0, Distinct Values: 0
 Compression: BROTLI, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 77, Compressed Size: 85
--- Values ---
archer            |location          |year              |
tony2             |beijing2          |1994              |
amy2              |shanghai2         |1995              |
jim2              |chengdu2          |1996              |

越压缩,parquet文件的size越大。当然这个问题不是我们这篇文章的重点,只是提醒大家选择适当的压缩算法十分重要

3.4 Record Batch <- Parquet(压缩)

和读取table转换后的压缩parquet文件一样,读取record转换后的压缩parquet一样无需特殊设置,使用flat_record_from_parquet.go即可(需要改一下读取的文件名),这里就不赘述了。

4. 小结

本文旨在介绍使用Go进行Arrow和Parquet文件相互转换的基本方法,我们以table和record两种高级数据结构为例,分别介绍了读写parquet文件以及压缩parquet文件的方法。

当然本文中的例子都是“平坦(flat)”的简单例子,parquet文件还支持更复杂的嵌套数据,我们会在后续的深入讲解parquet格式的文章中提及。

本文示例代码可以在这里下载。

5. 参考资料

  • Parquet File Format – https://parquet.apache.org/docs/file-format/
  • 《Dremel: Interactive Analysis of Web-Scale Datasets》 – https://storage.googleapis.com/pub-tools-public-publication-data/pdf/36632.pdf
  • Announcing Parquet 1.0: Columnar Storage for Hadoop – https://blog.twitter.com/engineering/en_us/a/2013/announcing-parquet-10-columnar-storage-for-hadoop
  • Dremel made simple with Parquet – https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet
  • parquet项目首页 – http://parquet.apache.org/
  • Apache Parquet介绍 by influxdata – https://www.influxdata.com/glossary/apache-parquet/
  • Intro to InfluxDB IOx – https://www.influxdata.com/blog/intro-influxdb-iox/
  • Apache Arrow介绍 by influxdb – https://www.influxdata.com/glossary/apache-arrow/
  • 开源时序数据库解析 – InfluxDB IOx – https://zhuanlan.zhihu.com/p/534035337
  • Arrow and Parquet Part 1: Primitive Types and Nullability – https://arrow.apache.org/blog/2022/10/05/arrow-parquet-encoding-part-1/
  • Arrow and Parquet Part 2: Nested and Hierarchical Data using Structs and Lists – https://arrow.apache.org/blog/2022/10/08/arrow-parquet-encoding-part-2/
  • Arrow and Parquet Part 3: Arbitrary Nesting with Lists of Structs and Structs of Lists – https://arrow.apache.org/blog/2022/10/17/arrow-parquet-encoding-part-3/
  • Cost Efficiency @ Scale in Big Data File Format – https://www.uber.com/blog/cost-efficiency-big-data/

“Gopher部落”知识星球旨在打造一个精品Go学习和进阶社群!高品质首发Go技术文章,“三天”首发阅读权,每年两期Go语言发展现状分析,每天提前1小时阅读到新鲜的Gopher日报,网课、技术专栏、图书内容前瞻,六小时内必答保证等满足你关于Go语言生态的所有需求!2023年,Gopher部落将进一步聚焦于如何编写雅、地道、可读、可测试的Go代码,关注代码质量并深入理解Go核心技术,并继续加强与星友的互动。欢迎大家加入!

img{512x368}
img{512x368}

img{512x368}
img{512x368}

著名云主机服务厂商DigitalOcean发布最新的主机计划,入门级Droplet配置升级为:1 core CPU、1G内存、25G高速SSD,价格5$/月。有使用DigitalOcean需求的朋友,可以打开这个链接地址:https://m.do.co/c/bff6eed92687 开启你的DO主机之路。

Gopher Daily(Gopher每日新闻)归档仓库 – https://github.com/bigwhite/gopherdaily

我的联系方式:

  • 微博(暂不可用):https://weibo.com/bigwhite20xx
  • 微博2:https://weibo.com/u/6484441286
  • 博客:tonybai.com
  • github: https://github.com/bigwhite

商务合作方式:撰稿、出书、培训、在线课程、合伙创业、咨询、广告合作。

如发现本站页面被黑,比如:挂载广告、挖矿等恶意代码,请朋友们及时联系我。十分感谢! Go语言第一课 Go语言精进之路1 Go语言精进之路2 Go语言编程指南
商务合作请联系bigwhite.cn AT aliyun.com

欢迎使用邮件订阅我的博客

输入邮箱订阅本站,只要有新文章发布,就会第一时间发送邮件通知你哦!

这里是 Tony Bai的个人Blog,欢迎访问、订阅和留言! 订阅Feed请点击上面图片

如果您觉得这里的文章对您有帮助,请扫描上方二维码进行捐赠 ,加油后的Tony Bai将会为您呈现更多精彩的文章,谢谢!

如果您希望通过微信捐赠,请用微信客户端扫描下方赞赏码:

如果您希望通过比特币或以太币捐赠,可以扫描下方二维码:

比特币:

以太币:

如果您喜欢通过微信浏览本站内容,可以扫描下方二维码,订阅本站官方微信订阅号“iamtonybai”;点击二维码,可直达本人官方微博主页^_^:
本站Powered by Digital Ocean VPS。
选择Digital Ocean VPS主机,即可获得10美元现金充值,可 免费使用两个月哟! 著名主机提供商Linode 10$优惠码:linode10,在 这里注册即可免费获 得。阿里云推荐码: 1WFZ0V立享9折!


View Tony Bai's profile on LinkedIn
DigitalOcean Referral Badge

文章

评论

  • 正在加载...

分类

标签

归档



View My Stats