标签 Go 下的文章

Go语言开发者的Apache Arrow使用指南:读写Parquet文件

本文永久链接 – https://tonybai.com/2023/07/31/a-guide-of-using-apache-arrow-for-gopher-part6

Apache Arrow是一种开放的、与语言无关的列式内存格式,在本系列文章的前几篇中,我们都聚焦于内存表示内存操作

但对于一个数据库系统或大数据分析平台来说,数据不能也无法一直放在内存中,虽说目前内存很大也足够便宜了,但其易失性也决定了我们在特定时刻还是要将数据序列化后存储到磁盘或一些低成本的存储服务上(比如AWS的S3等)。

那么将Arrow序列化成什么存储格式呢?CSV、JSON?显然这些格式都不是为最大限度提高空间效率以及数据检索能力而设计的。在数据分析领域,Apache Parquet是与Arrow相似的一种开放的、面向列的数据存储格式,它被设计用于高效的数据编码和检索并最大限度提高空间效率。

和Arrow是一种内存格式不同,Parquet是一种数据文件格式。此外,Arrow和Parquet在设计上也做出了各自的一些取舍。Arrow旨在由矢量化计算内核对数据进行操作,提供对任何数组索引的 O(1) 随机访问查找能力;而Parquet为了最大限度提高空间效率,采用了可变长度编码方案和块压缩来大幅减小数据大小,这些技术都是以丧失高性能随机存取查找为代价的。

Parquet也是Apache的顶级项目,大多数实现了Arrow的编程语言也都提供了支持Arrow格式与Parquet文件相互转换的库实现,Go也不例外。在本文中,我们就来粗浅看一下如何使用Go实现Parquet文件的读写,即Arrow和Parquet的相互转换。

注:关于Parquet文件的详细格式(也蛮复杂),我可能会在后续文章中说明。

1. Parquet简介

如果不先说一说Parquet文件格式,后面的内容理解起来会略有困难的。下面是一个Parquet文件的结构示意图:


图来自https://www.uber.com/blog/cost-efficiency-big-data

我们看到Parquet格式的文件被分为多个row group,每个row group由每一列的列块(column chunk)组成。考虑到磁盘存储的特点,每个列块又分为若干个页。这个列块中的诸多同构类型的列值可以在编码和压缩后存储在各个页中。下面是Parquet官方文档中Parquet文件中数据存储的具体示意图:

我们看到Parquet按row group顺序向后排列,每个row group中column chunk也是依column次序向后排列的。

注:关于上图中repetion level和definition level这样的高级概念,不会成为理解本文内容的障碍,我们将留到后续文章中系统说明。

2. Arrow Table <-> Parquet

有了上面Parquet文件格式的初步知识后,接下来我们就来看看如何使用Go在Arrow和Parquet之间进行转换。

《高级数据结构》一文中,我们学习了Arrow Table和Record Batch两种高级结构。接下来我们就来看看如何将Table或Record与Parquet进行转换。一旦像Table、Record Batch这样的高级结构的转换搞定了,那Arrow中的那些简单数据类型)也就不在话下了。况且在实际项目中,我们面对更多的也是Arrow的高级数据结构(Table或Record)与Parquet的转换。

我们先来看看Table。

2.1 Table -> Parquet

通过在《高级数据结构》一文,我们知道了Arrow Table的每一列本质上就是Schema+Chunked Array,这和Parquet的文件格式具有较高的适配度。

Arrow Go的parquet实现提供对了Table的良好支持,我们通过一个WriteTable函数就可以将内存中的Arrow Table持久化为Parquet格式的文件,我们来看看下面这个示例:

// flat_table_to_parquet.go

package main

import (
    "os"

    "github.com/apache/arrow/go/v13/arrow"
    "github.com/apache/arrow/go/v13/arrow/array"
    "github.com/apache/arrow/go/v13/arrow/memory"
    "github.com/apache/arrow/go/v13/parquet/pqarrow"
)

func main() {
    schema := arrow.NewSchema(
        []arrow.Field{
            {Name: "col1", Type: arrow.PrimitiveTypes.Int32},
            {Name: "col2", Type: arrow.PrimitiveTypes.Float64},
            {Name: "col3", Type: arrow.BinaryTypes.String},
        },
        nil,
    )

    col1 := func() *arrow.Column {
        chunk := func() *arrow.Chunked {
            ib := array.NewInt32Builder(memory.DefaultAllocator)
            defer ib.Release()

            ib.AppendValues([]int32{1, 2, 3}, nil)
            i1 := ib.NewInt32Array()
            defer i1.Release()

            ib.AppendValues([]int32{4, 5, 6, 7, 8, 9, 10}, nil)
            i2 := ib.NewInt32Array()
            defer i2.Release()

            c := arrow.NewChunked(
                arrow.PrimitiveTypes.Int32,
                []arrow.Array{i1, i2},
            )
            return c
        }()
        defer chunk.Release()

        return arrow.NewColumn(schema.Field(0), chunk)
    }()
    defer col1.Release()

    col2 := func() *arrow.Column {
        chunk := func() *arrow.Chunked {
            fb := array.NewFloat64Builder(memory.DefaultAllocator)
            defer fb.Release()

            fb.AppendValues([]float64{1.1, 2.2, 3.3, 4.4, 5.5}, nil)
            f1 := fb.NewFloat64Array()
            defer f1.Release()

            fb.AppendValues([]float64{6.6, 7.7}, nil)
            f2 := fb.NewFloat64Array()
            defer f2.Release()

            fb.AppendValues([]float64{8.8, 9.9, 10.0}, nil)
            f3 := fb.NewFloat64Array()
            defer f3.Release()

            c := arrow.NewChunked(
                arrow.PrimitiveTypes.Float64,
                []arrow.Array{f1, f2, f3},
            )
            return c
        }()
        defer chunk.Release()

        return arrow.NewColumn(schema.Field(1), chunk)
    }()
    defer col2.Release()

    col3 := func() *arrow.Column {
        chunk := func() *arrow.Chunked {
            sb := array.NewStringBuilder(memory.DefaultAllocator)
            defer sb.Release()

            sb.AppendValues([]string{"s1", "s2"}, nil)
            s1 := sb.NewStringArray()
            defer s1.Release()

            sb.AppendValues([]string{"s3", "s4"}, nil)
            s2 := sb.NewStringArray()
            defer s2.Release()

            sb.AppendValues([]string{"s5", "s6", "s7", "s8", "s9", "s10"}, nil)
            s3 := sb.NewStringArray()
            defer s3.Release()

            c := arrow.NewChunked(
                arrow.BinaryTypes.String,
                []arrow.Array{s1, s2, s3},
            )
            return c
        }()
        defer chunk.Release()

        return arrow.NewColumn(schema.Field(2), chunk)
    }()
    defer col3.Release()

    var tbl arrow.Table
    tbl = array.NewTable(schema, []arrow.Column{*col1, *col2, *col3}, -1)
    defer tbl.Release()

    f, err := os.Create("flat_table.parquet")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    err = pqarrow.WriteTable(tbl, f, 1024, nil, pqarrow.DefaultWriterProps())
    if err != nil {
        panic(err)
    }
}

我们基于arrow的Builder模式以及NewTable创建了一个拥有三个列的Table(该table的创建例子来自于《高级数据结构》一文)。有了table后,我们直接调用pqarrow的WriteTable函数即可将table写成parquet格式的文件。

我们来运行一下上述代码:

$go run flat_table_to_parquet.go

执行完上面命令后,当前目录下会出现一个flat_table.parquet的文件!

我们如何查看该文件内容来验证写入的数据是否与table一致呢?arrow go的parquet实现提供了一个parquet_reader的工具可以帮助我们做到这点,你可以执行如下命令安装这个工具:

$go install github.com/apache/arrow/go/v13/parquet/cmd/parquet_reader@latest

之后我们就可以执行下面命令查看我们刚刚生成的flat_table.parquet文件的内容了:

$parquet_reader flat_table.parquet
File name: flat_table.parquet
Version: v2.6
Created By: parquet-go version 13.0.0-SNAPSHOT
Num Rows: 10
Number of RowGroups: 1
Number of Real Columns: 3
Number of Columns: 3
Number of Selected Columns: 3
Column 0: col1 (INT32/INT_32)
Column 1: col2 (DOUBLE)
Column 2: col3 (BYTE_ARRAY/UTF8)
--- Row Group: 0  ---
--- Total Bytes: 396  ---
--- Rows: 10  ---
Column 0
 Values: 10, Min: 1, Max: 10, Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 111, Compressed Size: 111
Column 1
 Values: 10, Min: 1.1, Max: 10, Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 169, Compressed Size: 169
Column 2
 Values: 10, Min: [115 49], Max: [115 57], Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 116, Compressed Size: 116
--- Values ---
col1              |col2              |col3              |
1                 |1.100000          |s1                |
2                 |2.200000          |s2                |
3                 |3.300000          |s3                |
4                 |4.400000          |s4                |
5                 |5.500000          |s5                |
6                 |6.600000          |s6                |
7                 |7.700000          |s7                |
8                 |8.800000          |s8                |
9                 |9.900000          |s9                |
10                |10.000000         |s10               |

parquet_reader列出了parquet文件的meta数据和每个row group中的column列的值,从输出来看,与我们arrow table的数据是一致的。

我们再回头看一下WriteTable函数,它的原型如下:

func WriteTable(tbl arrow.Table, w io.Writer, chunkSize int64,
                props *parquet.WriterProperties, arrprops ArrowWriterProperties) error

这里说一下WriteTable的前三个参数,第一个是通过NewTable得到的arrow table结构,第二个参数也容易理解,就是一个可写的文件描述符,我们通过os.Create可以轻松拿到,第三个参数为chunkSize,这个chunkSize是什么呢?会对parquet文件的写入结果有影响么?其实这个chunkSize就是每个row group中的行数。同时parquet通过该chunkSize也可以计算出arrow table转parquet文件后有几个row group。

我们示例中的chunkSize值为1024,因此整个parquet文件只有一个row group。下面我们将其值改为5,再来看看输出的parquet文件内容:

$parquet_reader flat_table.parquet
File name: flat_table.parquet
Version: v2.6
Created By: parquet-go version 13.0.0-SNAPSHOT
Num Rows: 10
Number of RowGroups: 2
Number of Real Columns: 3
Number of Columns: 3
Number of Selected Columns: 3
Column 0: col1 (INT32/INT_32)
Column 1: col2 (DOUBLE)
Column 2: col3 (BYTE_ARRAY/UTF8)
--- Row Group: 0  ---
--- Total Bytes: 288  ---
--- Rows: 5  ---
Column 0
 Values: 5, Min: 1, Max: 5, Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 86, Compressed Size: 86
Column 1
 Values: 5, Min: 1.1, Max: 5.5, Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 122, Compressed Size: 122
Column 2
 Values: 5, Min: [115 49], Max: [115 53], Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 80, Compressed Size: 80
--- Values ---
col1              |col2              |col3              |
1                 |1.100000          |s1                |
2                 |2.200000          |s2                |
3                 |3.300000          |s3                |
4                 |4.400000          |s4                |
5                 |5.500000          |s5                |

--- Row Group: 1  ---
--- Total Bytes: 290  ---
--- Rows: 5  ---
Column 0
 Values: 5, Min: 6, Max: 10, Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 86, Compressed Size: 86
Column 1
 Values: 5, Min: 6.6, Max: 10, Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 122, Compressed Size: 122
Column 2
 Values: 5, Min: [115 49 48], Max: [115 57], Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 82, Compressed Size: 82
--- Values ---
col1              |col2              |col3              |
6                 |6.600000          |s6                |
7                 |7.700000          |s7                |
8                 |8.800000          |s8                |
9                 |9.900000          |s9                |
10                |10.000000         |s10               |

当chunkSize值为5后,parquet文件的row group变成了2,然后parquet_reader工具会按照两个row group的格式分别输出它们的meta信息和列值信息。

接下来,我们再来看一下如何从生成的parquet文件中读取数据并转换为arrow table。

2.2 Table <- Parquet

和WriteTable函数对应,arrow提供了ReadTable函数读取parquet文件并转换为内存中的arrow table,下面是代码示例:

// flat_table_from_parquet.go
func main() {
    f, err := os.Open("flat_table.parquet")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    tbl, err := pqarrow.ReadTable(context.Background(), f, parquet.NewReaderProperties(memory.DefaultAllocator),
        pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
    if err != nil {
        panic(err)
    }

    dumpTable(tbl)
}

func dumpTable(tbl arrow.Table) {
    s := tbl.Schema()
    fmt.Println(s)
    fmt.Println("------")

    fmt.Println("the count of table columns=", tbl.NumCols())
    fmt.Println("the count of table rows=", tbl.NumRows())
    fmt.Println("------")

    for i := 0; i < int(tbl.NumCols()); i++ {
        col := tbl.Column(i)
        fmt.Printf("arrays in column(%s):\n", col.Name())
        chunk := col.Data()
        for _, arr := range chunk.Chunks() {
            fmt.Println(arr)
        }
        fmt.Println("------")
    }
}

我们看到ReadTable使用起来非常简单,由于parquet文件中包含meta信息,我们调用ReadTable时,一些参数使用默认值或零值即可。

我们运行一下上述代码:

$go run flat_table_from_parquet.go
schema:
  fields: 3
    - col1: type=int32
      metadata: ["PARQUET:field_id": "-1"]
    - col2: type=float64
      metadata: ["PARQUET:field_id": "-1"]
    - col3: type=utf8
      metadata: ["PARQUET:field_id": "-1"]
------
the count of table columns= 3
the count of table rows= 10
------
arrays in column(col1):
[1 2 3 4 5 6 7 8 9 10]
------
arrays in column(col2):
[1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8 9.9 10]
------
arrays in column(col3):
["s1" "s2" "s3" "s4" "s5" "s6" "s7" "s8" "s9" "s10"]
------

2.3 Table -> Parquet(压缩)

前面提到,Parquet文件格式的设计充分考虑了空间利用效率,再加上其是面向列存储的格式,Parquet支持列数据的压缩存储,并支持为不同列选择不同的压缩算法。

前面示例中调用的WriteTable在默认情况下是不对列进行压缩的,这从parquet_reader读取到的列的元信息中也可以看到(比如下面的Compression: UNCOMPRESSED):

Column 0
 Values: 10, Min: 1, Max: 10, Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 111, Compressed Size: 111

我们在WriteTable时也可以通过parquet.WriterProperties参数来为每个列指定压缩算法,比如下面示例:

// flat_table_to_parquet_compressed.go

var tbl arrow.Table
tbl = array.NewTable(schema, []arrow.Column{*col1, *col2, *col3}, -1)
defer tbl.Release()

f, err := os.Create("flat_table_compressed.parquet")
if err != nil {
    panic(err)
}
defer f.Close()

wp := parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Snappy),
    parquet.WithCompressionFor("col1", compress.Codecs.Brotli))
err = pqarrow.WriteTable(tbl, f, 1024, wp, pqarrow.DefaultWriterProps())
if err != nil {
    panic(err)
}

在这段代码中,我们通过parquet.NewWriterProperties构建了新的WriterProperties,这个新的Properties默认所有列使用Snappy压缩,针对col1列使用Brotli算法压缩。我们将压缩后的数据写入flat_table_compressed.parquet文件。使用go run运行flat_table_to_parquet_compressed.go,然后使用parquet_reader查看文件flat_table_compressed.parquet得到如下结果:

$go run flat_table_to_parquet_compressed.go
$parquet_reader flat_table_compressed.parquet
File name: flat_table_compressed.parquet
Version: v2.6
Created By: parquet-go version 13.0.0-SNAPSHOT
Num Rows: 10
Number of RowGroups: 1
Number of Real Columns: 3
Number of Columns: 3
Number of Selected Columns: 3
Column 0: col1 (INT32/INT_32)
Column 1: col2 (DOUBLE)
Column 2: col3 (BYTE_ARRAY/UTF8)
--- Row Group: 0  ---
--- Total Bytes: 352  ---
--- Rows: 10  ---
Column 0
 Values: 10, Min: 1, Max: 10, Null Values: 0, Distinct Values: 0
 Compression: BROTLI, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 111, Compressed Size: 98
Column 1
 Values: 10, Min: 1.1, Max: 10, Null Values: 0, Distinct Values: 0
 Compression: SNAPPY, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 168, Compressed Size: 148
Column 2
 Values: 10, Min: [115 49], Max: [115 57], Null Values: 0, Distinct Values: 0
 Compression: SNAPPY, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 116, Compressed Size: 106
--- Values ---
col1              |col2              |col3              |
1                 |1.100000          |s1                |
2                 |2.200000          |s2                |
3                 |3.300000          |s3                |
4                 |4.400000          |s4                |
5                 |5.500000          |s5                |
6                 |6.600000          |s6                |
7                 |7.700000          |s7                |
8                 |8.800000          |s8                |
9                 |9.900000          |s9                |
10                |10.000000         |s10               |

从parquet_reader的输出,我们可以看到:各个Column的Compression信息不再是UNCOMPRESSED了,并且三个列在经过压缩后的Size与未压缩对比都有一定的减小:

Column 0:
    Compression: BROTLI, Uncompressed Size: 111, Compressed Size: 98
Column 1:
    Compression: SNAPPY, Uncompressed Size: 168, Compressed Size: 148
Column 2:
    Compression: SNAPPY, Uncompressed Size: 116, Compressed Size: 106

从文件大小对比也能体现出压缩算法的作用:

-rw-r--r--   1 tonybai  staff   786  7 22 08:06 flat_table.parquet
-rw-r--r--   1 tonybai  staff   742  7 20 13:19 flat_table_compressed.parquet

Go的parquet实现支持多种压缩算法:

// github.com/apache/arrow/go/parquet/compress/compress.go

var Codecs = struct {
    Uncompressed Compression
    Snappy       Compression
    Gzip         Compression
    // LZO is unsupported in this library since LZO license is incompatible with Apache License
    Lzo    Compression
    Brotli Compression
    // LZ4 unsupported in this library due to problematic issues between the Hadoop LZ4 spec vs regular lz4
    // see: http://mail-archives.apache.org/mod_mbox/arrow-dev/202007.mbox/%3CCAAri41v24xuA8MGHLDvgSnE+7AAgOhiEukemW_oPNHMvfMmrWw@mail.gmail.com%3E
    Lz4  Compression
    Zstd Compression
}{
    Uncompressed: Compression(parquet.CompressionCodec_UNCOMPRESSED),
    Snappy:       Compression(parquet.CompressionCodec_SNAPPY),
    Gzip:         Compression(parquet.CompressionCodec_GZIP),
    Lzo:          Compression(parquet.CompressionCodec_LZO),
    Brotli:       Compression(parquet.CompressionCodec_BROTLI),
    Lz4:          Compression(parquet.CompressionCodec_LZ4),
    Zstd:         Compression(parquet.CompressionCodec_ZSTD),
}

你只需要根据你的列的类型选择最适合的压缩算法即可。

2.4 Table <- Parquet(压缩)

接下来,我们来读取这个数据经过压缩的Parquet。读取压缩的Parquet是否需要在ReadTable时传入特殊的Properties呢?答案是不需要!因为Parquet文件中存储了元信息(metadata),可以帮助ReadTable使用对应的算法解压缩并提取信息:

// flat_table_from_parquet_compressed.go

func main() {
    f, err := os.Open("flat_table_compressed.parquet")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    tbl, err := pqarrow.ReadTable(context.Background(), f, parquet.NewReaderProperties(memory.DefaultAllocator),
        pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
    if err != nil {
        panic(err)
    }

    dumpTable(tbl)
}

运行这段程序,我们就可以读取压缩后的parquet文件了:

$go run flat_table_from_parquet_compressed.go
schema:
  fields: 3
    - col1: type=int32
      metadata: ["PARQUET:field_id": "-1"]
    - col2: type=float64
      metadata: ["PARQUET:field_id": "-1"]
    - col3: type=utf8
      metadata: ["PARQUET:field_id": "-1"]
------
the count of table columns= 3
the count of table rows= 10
------
arrays in column(col1):
[1 2 3 4 5 6 7 8 9 10]
------
arrays in column(col2):
[1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8 9.9 10]
------
arrays in column(col3):
["s1" "s2" "s3" "s4" "s5" "s6" "s7" "s8" "s9" "s10"]
------

接下来,我们来看看Arrow中的另外一种高级数据结构Record Batch如何实现与Parquet文件格式的转换。

3. Arrow Record Batch <-> Parquet

注:大家可以先阅读/温习一下《高级数据结构》一文来了解一下Record Batch的概念。

3.1 Record Batch -> Parquet

Arrow Go实现将一个Record Batch作为一个Row group来对应。下面的程序向Parquet文件中写入了三个record,我们来看一下:

// flat_record_to_parquet.go

func main() {
    var records []arrow.Record
    schema := arrow.NewSchema(
        []arrow.Field{
            {Name: "archer", Type: arrow.BinaryTypes.String},
            {Name: "location", Type: arrow.BinaryTypes.String},
            {Name: "year", Type: arrow.PrimitiveTypes.Int16},
        },
        nil,
    )

    rb := array.NewRecordBuilder(memory.DefaultAllocator, schema)
    defer rb.Release()

    for i := 0; i < 3; i++ {
        postfix := strconv.Itoa(i)
        rb.Field(0).(*array.StringBuilder).AppendValues([]string{"tony" + postfix, "amy" + postfix, "jim" + postfix}, nil)
        rb.Field(1).(*array.StringBuilder).AppendValues([]string{"beijing" + postfix, "shanghai" + postfix, "chengdu" + postfix}, nil)
        rb.Field(2).(*array.Int16Builder).AppendValues([]int16{1992 + int16(i), 1993 + int16(i), 1994 + int16(i)}, nil)
        rec := rb.NewRecord()
        records = append(records, rec)
    }

    // write to parquet
    f, err := os.Create("flat_record.parquet")
    if err != nil {
        panic(err)
    }

    props := parquet.NewWriterProperties()
    writer, err := pqarrow.NewFileWriter(schema, f, props,
        pqarrow.DefaultWriterProps())
    if err != nil {
        panic(err)
    }
    defer writer.Close()

    for _, rec := range records {
        if err := writer.Write(rec); err != nil {
            panic(err)
        }
        rec.Release()
    }
}

和调用WriteTable完成table到parquet文件的写入不同,这里我们创建了一个FileWriter,通过FileWriter将构建出的Record Batch逐个写入。运行上述代码生成flat_record.parquet文件并使用parquet_reader展示该文件的内容:

$go run flat_record_to_parquet.go
$parquet_reader flat_record.parquet
File name: flat_record.parquet
Version: v2.6
Created By: parquet-go version 13.0.0-SNAPSHOT
Num Rows: 9
Number of RowGroups: 3
Number of Real Columns: 3
Number of Columns: 3
Number of Selected Columns: 3
Column 0: archer (BYTE_ARRAY/UTF8)
Column 1: location (BYTE_ARRAY/UTF8)
Column 2: year (INT32/INT_16)
--- Row Group: 0  ---
--- Total Bytes: 255  ---
--- Rows: 3  ---
Column 0
 Values: 3, Min: [97 109 121 48], Max: [116 111 110 121 48], Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 79, Compressed Size: 79
Column 1
 Values: 3, Min: [98 101 105 106 105 110 103 48], Max: [115 104 97 110 103 104 97 105 48], Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 99, Compressed Size: 99
Column 2
 Values: 3, Min: 1992, Max: 1994, Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 77, Compressed Size: 77
--- Values ---
archer            |location          |year              |
tony0             |beijing0          |1992              |
amy0              |shanghai0         |1993              |
jim0              |chengdu0          |1994              |

--- Row Group: 1  ---
--- Total Bytes: 255  ---
--- Rows: 3  ---
Column 0
 Values: 3, Min: [97 109 121 49], Max: [116 111 110 121 49], Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 79, Compressed Size: 79
Column 1
 Values: 3, Min: [98 101 105 106 105 110 103 49], Max: [115 104 97 110 103 104 97 105 49], Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 99, Compressed Size: 99
Column 2
 Values: 3, Min: 1993, Max: 1995, Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 77, Compressed Size: 77
--- Values ---
archer            |location          |year              |
tony1             |beijing1          |1993              |
amy1              |shanghai1         |1994              |
jim1              |chengdu1          |1995              |

--- Row Group: 2  ---
--- Total Bytes: 255  ---
--- Rows: 3  ---
Column 0
 Values: 3, Min: [97 109 121 50], Max: [116 111 110 121 50], Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 79, Compressed Size: 79
Column 1
 Values: 3, Min: [98 101 105 106 105 110 103 50], Max: [115 104 97 110 103 104 97 105 50], Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 99, Compressed Size: 99
Column 2
 Values: 3, Min: 1994, Max: 1996, Null Values: 0, Distinct Values: 0
 Compression: UNCOMPRESSED, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 77, Compressed Size: 77
--- Values ---
archer            |location          |year              |
tony2             |beijing2          |1994              |
amy2              |shanghai2         |1995              |
jim2              |chengdu2          |1996              |

我们看到parquet_reader分别输出了三个row group的元数据和列值,每个row group与我们写入的一个record对应。

那读取这样的parquet文件与ReadTable有何不同呢?我们继续往下看。

3.2 Record Batch <- Parquet

下面是用于读取

// flat_record_from_parquet.go
func main() {
    f, err := os.Open("flat_record.parquet")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    rdr, err := file.NewParquetReader(f)
    if err != nil {
        panic(err)
    }
    defer rdr.Close()

    arrRdr, err := pqarrow.NewFileReader(rdr,
        pqarrow.ArrowReadProperties{
            BatchSize: 3,
        }, memory.DefaultAllocator)
    if err != nil {
        panic(err)
    }

    s, _ := arrRdr.Schema()
    fmt.Println(*s)

    rr, err := arrRdr.GetRecordReader(context.Background(), nil, nil)
    if err != nil {
        panic(err)
    }

    for {
        rec, err := rr.Read()
        if err != nil && err != io.EOF {
            panic(err)
        }
        if err == io.EOF {
            break
        }
        fmt.Println(rec)
    }
}

我们看到相对于将parquet转换为table,将parquet转换为record略为复杂一些,这里的一个关键是在调用NewFileReader时传入的ArrowReadProperties中的BatchSize字段,要想正确读取出record,这个BatchSize需适当填写。这个BatchSize会告诉Reader 每个读取的Record Batch的长度,也就是row数量。这里传入的是3,即3个row为一个Recordd batch。

下面是运行上述程序的结果:

$go run flat_record_from_parquet.go
{[{archer 0x26ccc00 false {[PARQUET:field_id] [-1]}} {location 0x26ccc00 false {[PARQUET:field_id] [-1]}} {year 0x26ccc00 false {[PARQUET:field_id] [-1]}}] map[archer:[0] location:[1] year:[2]] {[] []} 0}
record:
  schema:
  fields: 3
    - archer: type=utf8
        metadata: ["PARQUET:field_id": "-1"]
    - location: type=utf8
          metadata: ["PARQUET:field_id": "-1"]
    - year: type=int16
      metadata: ["PARQUET:field_id": "-1"]
  rows: 3
  col[0][archer]: ["tony0" "amy0" "jim0"]
  col[1][location]: ["beijing0" "shanghai0" "chengdu0"]
  col[2][year]: [1992 1993 1994]

record:
  schema:
  fields: 3
    - archer: type=utf8
        metadata: ["PARQUET:field_id": "-1"]
    - location: type=utf8
          metadata: ["PARQUET:field_id": "-1"]
    - year: type=int16
      metadata: ["PARQUET:field_id": "-1"]
  rows: 3
  col[0][archer]: ["tony1" "amy1" "jim1"]
  col[1][location]: ["beijing1" "shanghai1" "chengdu1"]
  col[2][year]: [1993 1994 1995]

record:
  schema:
  fields: 3
    - archer: type=utf8
        metadata: ["PARQUET:field_id": "-1"]
    - location: type=utf8
          metadata: ["PARQUET:field_id": "-1"]
    - year: type=int16
      metadata: ["PARQUET:field_id": "-1"]
  rows: 3
  col[0][archer]: ["tony2" "amy2" "jim2"]
  col[1][location]: ["beijing2" "shanghai2" "chengdu2"]
  col[2][year]: [1994 1995 1996]

我们看到:每3行被作为一个record读取出来了。如果将BatchSize改为5,则输出如下:

$go run flat_record_from_parquet.go
{[{archer 0x26ccc00 false {[PARQUET:field_id] [-1]}} {location 0x26ccc00 false {[PARQUET:field_id] [-1]}} {year 0x26ccc00 false {[PARQUET:field_id] [-1]}}] map[archer:[0] location:[1] year:[2]] {[] []} 0}
record:
  schema:
  fields: 3
    - archer: type=utf8
        metadata: ["PARQUET:field_id": "-1"]
    - location: type=utf8
          metadata: ["PARQUET:field_id": "-1"]
    - year: type=int16
      metadata: ["PARQUET:field_id": "-1"]
  rows: 5
  col[0][archer]: ["tony0" "amy0" "jim0" "tony1" "amy1"]
  col[1][location]: ["beijing0" "shanghai0" "chengdu0" "beijing1" "shanghai1"]
  col[2][year]: [1992 1993 1994 1993 1994]

record:
  schema:
  fields: 3
    - archer: type=utf8
        metadata: ["PARQUET:field_id": "-1"]
    - location: type=utf8
          metadata: ["PARQUET:field_id": "-1"]
    - year: type=int16
      metadata: ["PARQUET:field_id": "-1"]
  rows: 4
  col[0][archer]: ["jim1" "tony2" "amy2" "jim2"]
  col[1][location]: ["chengdu1" "beijing2" "shanghai2" "chengdu2"]
  col[2][year]: [1995 1994 1995 1996]

这次:前5行作为一个record,后4行作为另外一个record。

当然,我们也可以使用flat_table_from_parquet.go中的代码来读取flat_record.parquet(将读取文件名改为flat_record.parquet),只不过由于将parquet数据转换为了table,其输出内容将变为:

$go run flat_table_from_parquet.go
schema:
  fields: 3
    - archer: type=utf8
        metadata: ["PARQUET:field_id": "-1"]
    - location: type=utf8
          metadata: ["PARQUET:field_id": "-1"]
    - year: type=int16
      metadata: ["PARQUET:field_id": "-1"]
------
the count of table columns= 3
the count of table rows= 9
------
arrays in column(archer):
["tony0" "amy0" "jim0" "tony1" "amy1" "jim1" "tony2" "amy2" "jim2"]
------
arrays in column(location):
["beijing0" "shanghai0" "chengdu0" "beijing1" "shanghai1" "chengdu1" "beijing2" "shanghai2" "chengdu2"]
------
arrays in column(year):
[1992 1993 1994 1993 1994 1995 1994 1995 1996]
------

3.3 Record Batch -> Parquet(压缩)

Recod同样支持压缩写入Parquet,其原理与前面table压缩存储是一致的,都是通过设置WriterProperties来实现的:

// flat_record_to_parquet_compressed.go

func main() {
    ... ...
    f, err := os.Create("flat_record_compressed.parquet")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    props := parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Zstd),
        parquet.WithCompressionFor("year", compress.Codecs.Brotli))
    writer, err := pqarrow.NewFileWriter(schema, f, props,
        pqarrow.DefaultWriterProps())
    if err != nil {
        panic(err)
    }
    defer writer.Close()

    for _, rec := range records {
        if err := writer.Write(rec); err != nil {
            panic(err)
        }
        rec.Release()
    }
}

不过这次针对arrow.string类型和arrow.int16类型的压缩效果非常“差”:

$parquet_reader flat_record_compressed.parquet
File name: flat_record_compressed.parquet
Version: v2.6
Created By: parquet-go version 13.0.0-SNAPSHOT
Num Rows: 9
Number of RowGroups: 3
Number of Real Columns: 3
Number of Columns: 3
Number of Selected Columns: 3
Column 0: archer (BYTE_ARRAY/UTF8)
Column 1: location (BYTE_ARRAY/UTF8)
Column 2: year (INT32/INT_16)
--- Row Group: 0  ---
--- Total Bytes: 315  ---
--- Rows: 3  ---
Column 0
 Values: 3, Min: [97 109 121 48], Max: [116 111 110 121 48], Null Values: 0, Distinct Values: 0
 Compression: ZSTD, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 79, Compressed Size: 105
Column 1
 Values: 3, Min: [98 101 105 106 105 110 103 48], Max: [115 104 97 110 103 104 97 105 48], Null Values: 0, Distinct Values: 0
 Compression: ZSTD, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 99, Compressed Size: 125
Column 2
 Values: 3, Min: 1992, Max: 1994, Null Values: 0, Distinct Values: 0
 Compression: BROTLI, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 77, Compressed Size: 85
--- Values ---
archer            |location          |year              |
tony0             |beijing0          |1992              |
amy0              |shanghai0         |1993              |
jim0              |chengdu0          |1994              |

--- Row Group: 1  ---
--- Total Bytes: 315  ---
--- Rows: 3  ---
Column 0
 Values: 3, Min: [97 109 121 49], Max: [116 111 110 121 49], Null Values: 0, Distinct Values: 0
 Compression: ZSTD, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 79, Compressed Size: 105
Column 1
 Values: 3, Min: [98 101 105 106 105 110 103 49], Max: [115 104 97 110 103 104 97 105 49], Null Values: 0, Distinct Values: 0
 Compression: ZSTD, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 99, Compressed Size: 125
Column 2
 Values: 3, Min: 1993, Max: 1995, Null Values: 0, Distinct Values: 0
 Compression: BROTLI, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 77, Compressed Size: 85
--- Values ---
archer            |location          |year              |
tony1             |beijing1          |1993              |
amy1              |shanghai1         |1994              |
jim1              |chengdu1          |1995              |

--- Row Group: 2  ---
--- Total Bytes: 315  ---
--- Rows: 3  ---
Column 0
 Values: 3, Min: [97 109 121 50], Max: [116 111 110 121 50], Null Values: 0, Distinct Values: 0
 Compression: ZSTD, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 79, Compressed Size: 105
Column 1
 Values: 3, Min: [98 101 105 106 105 110 103 50], Max: [115 104 97 110 103 104 97 105 50], Null Values: 0, Distinct Values: 0
 Compression: ZSTD, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 99, Compressed Size: 125
Column 2
 Values: 3, Min: 1994, Max: 1996, Null Values: 0, Distinct Values: 0
 Compression: BROTLI, Encodings: RLE_DICTIONARY PLAIN RLE
 Uncompressed Size: 77, Compressed Size: 85
--- Values ---
archer            |location          |year              |
tony2             |beijing2          |1994              |
amy2              |shanghai2         |1995              |
jim2              |chengdu2          |1996              |

越压缩,parquet文件的size越大。当然这个问题不是我们这篇文章的重点,只是提醒大家选择适当的压缩算法十分重要

3.4 Record Batch <- Parquet(压缩)

和读取table转换后的压缩parquet文件一样,读取record转换后的压缩parquet一样无需特殊设置,使用flat_record_from_parquet.go即可(需要改一下读取的文件名),这里就不赘述了。

4. 小结

本文旨在介绍使用Go进行Arrow和Parquet文件相互转换的基本方法,我们以table和record两种高级数据结构为例,分别介绍了读写parquet文件以及压缩parquet文件的方法。

当然本文中的例子都是“平坦(flat)”的简单例子,parquet文件还支持更复杂的嵌套数据,我们会在后续的深入讲解parquet格式的文章中提及。

本文示例代码可以在这里下载。

5. 参考资料

  • Parquet File Format – https://parquet.apache.org/docs/file-format/
  • 《Dremel: Interactive Analysis of Web-Scale Datasets》 – https://storage.googleapis.com/pub-tools-public-publication-data/pdf/36632.pdf
  • Announcing Parquet 1.0: Columnar Storage for Hadoop – https://blog.twitter.com/engineering/en_us/a/2013/announcing-parquet-10-columnar-storage-for-hadoop
  • Dremel made simple with Parquet – https://blog.twitter.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet
  • parquet项目首页 – http://parquet.apache.org/
  • Apache Parquet介绍 by influxdata – https://www.influxdata.com/glossary/apache-parquet/
  • Intro to InfluxDB IOx – https://www.influxdata.com/blog/intro-influxdb-iox/
  • Apache Arrow介绍 by influxdb – https://www.influxdata.com/glossary/apache-arrow/
  • 开源时序数据库解析 – InfluxDB IOx – https://zhuanlan.zhihu.com/p/534035337
  • Arrow and Parquet Part 1: Primitive Types and Nullability – https://arrow.apache.org/blog/2022/10/05/arrow-parquet-encoding-part-1/
  • Arrow and Parquet Part 2: Nested and Hierarchical Data using Structs and Lists – https://arrow.apache.org/blog/2022/10/08/arrow-parquet-encoding-part-2/
  • Arrow and Parquet Part 3: Arbitrary Nesting with Lists of Structs and Structs of Lists – https://arrow.apache.org/blog/2022/10/17/arrow-parquet-encoding-part-3/
  • Cost Efficiency @ Scale in Big Data File Format – https://www.uber.com/blog/cost-efficiency-big-data/

“Gopher部落”知识星球旨在打造一个精品Go学习和进阶社群!高品质首发Go技术文章,“三天”首发阅读权,每年两期Go语言发展现状分析,每天提前1小时阅读到新鲜的Gopher日报,网课、技术专栏、图书内容前瞻,六小时内必答保证等满足你关于Go语言生态的所有需求!2023年,Gopher部落将进一步聚焦于如何编写雅、地道、可读、可测试的Go代码,关注代码质量并深入理解Go核心技术,并继续加强与星友的互动。欢迎大家加入!

img{512x368}
img{512x368}

img{512x368}
img{512x368}

著名云主机服务厂商DigitalOcean发布最新的主机计划,入门级Droplet配置升级为:1 core CPU、1G内存、25G高速SSD,价格5$/月。有使用DigitalOcean需求的朋友,可以打开这个链接地址:https://m.do.co/c/bff6eed92687 开启你的DO主机之路。

Gopher Daily(Gopher每日新闻)归档仓库 – https://github.com/bigwhite/gopherdaily

我的联系方式:

  • 微博(暂不可用):https://weibo.com/bigwhite20xx
  • 微博2:https://weibo.com/u/6484441286
  • 博客:tonybai.com
  • github: https://github.com/bigwhite

商务合作方式:撰稿、出书、培训、在线课程、合伙创业、咨询、广告合作。

Go语言开发者的Apache Arrow使用指南:扩展compute包

本文永久链接 – https://tonybai.com/2023/07/22/a-guide-of-using-apache-arrow-for-gopher-part5

在本系列文章的第4篇《Go语言开发者的Apache Arrow使用指南:数据操作》中我们遇到了大麻烦:Go的Arrow实现居然不支持像max、min、sum这样的简单聚合计算函数:(,分组聚合(grouped aggregation)就更是“遥不可期”。要想对从CSV读取的数据进行聚合操作和分析,我们只能“自己动手,丰衣足食” – 扩展Arrow Go实现中的compute包了

不过,Arrow的Go实现还是蛮复杂的,如果对其结构没有一个初步的认知,很难实现这类扩展。在这篇文章中,我们就来了解一下compute包的结构,并尝试为compute包添加几个简单的、仅能处理单一类型的聚合函数,先来完成一些从0到1的工作。

为了深入了解Go Arrow实现,我又翻阅了一下Arrow官方的文档,显然Arrow C++的文档是最丰富的。我快读了一下C++的Arrow文档,对Arrow的结构有了更深刻的认知,基于这些资料,我们先来做一下Arrow结构的回顾。

0. 回顾Arrow的各个layer

Arrow的C++文档使用layer来介绍各种Arrow的概念,我们挑几个重要的看一下:

  • 物理层(The physical layer)

物理层针对的是内存的分配管理,包括内存分配的方法(堆分配器、内存文件映射、静态内存区)等。这一层的一个最重要的概念就是我们之前在数据类型一文中提到的Buffer抽象,它代表了内存中的一块连续的数据存储区域

  • 一维表示层(The one-dimensional layer)

除了物理层,后续的层都是逻辑层。一维表示层是一个逻辑表示层,它定义了Arrow的最基本数据类型:array数据类型决定了物理层内存数据的解释方法,逻辑数据类型array在物理层投影为一个和多个内存buffer

我们在“高级数据结构”提到的chunked array也在这一层,chunked array由多个同构类型的array组成,Arrow将其理解为一个同构的(相同类型的)、逻辑上值连续的、更大的array,是array基础类型的一个更泛化的表示。

  • 二维表示层(The two-dimensional layer)

“高级数据结构”一文中除chunked array之外的概念,都在这一层,包括schema、table、record batch。

schema是用于描述一维数据(一列数据,即一个逻辑array)的元数据,包括列名、类型与其他元信息。

Table是schema+与schema元信息对应的多个chunked array,它是Arrow中数据集抽象能力最强的逻辑结构。

Record Batch则是schema+与schema元信息对应的多个array。还记得“高级数据结构”一文中的那副直观给出table与record batch差异的图么:

  • 计算层(The compute layer)

计算层一个重要的抽象是Datum,这是一个灵活的抽象,用于统一表示参与计算的各类输入参数和返回值。

计算层真正执行计算的函数被统一放在kernel这个“层次”中,这个层次的函数对Datum类型的输入参数进行计算并返回Datam类型的结果或以Datum类型的输出参数承载计算结果。

  • IPC层(The Inter-Process Communication (IPC) layer)

这是我们尚未接触过的一层,通过这一层,复合Arrow columnar format的数据可以在进程间(同一主机或不同主机)交互,并且这种交换可以保证尽可能少的内存copy。

  • 文件格式层(The file formats layer)

这一层负责读写文件,在之前的“数据操作”一篇中,我们接触过将CSV文件中的数据读到内存中并组织为Arrow列式存储格式,在后续篇章中,我们还将陆续介绍Arrow与CSV(写入)、Parquet文件的数据交互。

C++有关Arrow的介绍中还有设备层(the devices layer)、文件系统层(the file system layer)等,后续可能不会涉及,这里就不说了。

通过上述回顾,再对照本系列第一篇文章“数据类型”的内容,你对Arrow的理解是不是更深刻一点点了呢:)。

接下来,我们重点看看计算层(the compute layer)。

1. 计算层(the compute layer)的结构

Go语言的计算层在compute目录下。Go语言借鉴了C++计算层的设计,将计算层分为compute和kernel,这个从代码布局上也可以明显看出来:

$tree -F -L 2 compute|grep -v go
compute           --- compute层
├── exprs/
├── internal/
│   ├── exec/
│   └── kernels/  --- compute的kernel层

compute包采用了registry模式,初始化时将底层的kernel function包装成上层的Function并注册到registry中。用户调用某个function时,该function会在registry中查找对应的注册函数并调用。

下面我们通过Uniq这个array-wise函数作为例子来探索一下kernel function的注册与调用过程。下面是“数据操作”一文中的示例,这里再次借用一下:

// arrow/manipulation/unary_arraywise_function.go

func main() {
    data := []int32{5, 10, 0, 25, 2, 10, 2, 25}
    bldr := array.NewInt32Builder(memory.DefaultAllocator)
    defer bldr.Release()
    bldr.AppendValues(data, nil)
    arr := bldr.NewArray()
    defer arr.Release()

    dat, err := compute.Unique(context.Background(), compute.NewDatum(arr))
    if err != nil {
        fmt.Println(err)
        return
    }

    arr1, ok := dat.(*compute.ArrayDatum)
    if !ok {
        fmt.Println("type assert fail")
        return
    }
    fmt.Println(arr1.MakeArray()) // [5 10 0 25 2]
}

下面是Unique函数的注册和调用过程示意图:

很显然,整个过程包括两个明显的阶段:

  • 包装并向Registry注册kernel函数(AddFunction)
  • 在Registry中查找函数并调用(GetFunction)

当我们在用户层调用compute.Unique函数时,一个统一的CallFunction会被调用,其第二个参数”uniq”表明我们要调用registry中的名为”uniq”的包装函数。在这个过程中GetFunctionRegistry被调用以获取registry实例,在这个过程中,如果registry实例尚没有创建,GetFunctionRegistry会在sync.Once的保护下创建registry并进行初始注册工作(RegisterXXX)。”uniq”对应的包装函数是在RegisterVectorHash中被注册到registry中的。

RegisterVectorHash会通过kernel层提供的GetVectorHashKernels获取kernel层的”uniq”实现,并将其通过NewVectorFunction和AddKernel包装为uniqFn这一用户层的Function,该uniqFn Function最终会被AddFunction加入到registry中。

而CallFunction(ctx, “uniq”)也会从registry中将uniqFn查找出来并执行其Execute方法,该Execute方法实际上执行的是kernel层的”uniq”实现。

我们看到:通过示意图展示的Unique函数的注册与调用过程还是相对清晰的(但如果要阅读对应的代码,还是比较繁琐的)。

到这里我们也大致了解了compute包的结构以及与kernel层的关系,接下来我们就来尝试给compute包添加一些scalar aggregate函数,所谓scalar aggregate函数就是输入是array,输出是一个scalar值的函数,比如:max、min、sum等。

3. 添加Max、Min、Sum、Avg等Scalar Aggregate函数

在上一篇“数据操作”时提过,聚合函数分为Scalar聚合和grouped聚合,显然Scalar聚合函数要简单一些,这里我们就来向compute层添加scalar aggregate函数,以Max为例,我们希望用户层这样使用Max聚合函数:

// max_aggregate_function.go
func main() {
    data := []int64{5, 10, 0, 25, 2, 35, 7, 15}
    bldr := array.NewInt64Builder(memory.DefaultAllocator)
    defer bldr.Release()
    bldr.AppendValues(data, nil)
    arr := bldr.NewArray()
    defer arr.Release()

    dat, err := compute.Max(context.Background(), compute.NewDatum(arr))
    if err != nil {
        fmt.Println(err)
        return
    }

    ad, ok := dat.(*compute.ArrayDatum)
    if !ok {
        fmt.Println("type assert fail")
        return
    }
    arr1 := ad.MakeArray()
    fmt.Println(arr1) // [35]
}

注:这里有一个问题,那就是Max返回的Datum是一个ArrayDatum,而不是期望的ScalarDatum。

通过上面的compute layer的结构,我们知道,如果要添加Max、Min、Sum、Avg等Scalar Aggregate函数,我们需要在kernel层和compute层协作实现。下面是实现的具体步骤。

3.1 向kernel层添加scalar聚合函数实现

compute层要支持scalar聚合,需要kernel层线支持scalar聚合,这里我们先向compute/internal/kernels目录添加一个scalar_agg.go,用于在kernel层实现scalar聚合,以Max为例:

// compute/internal/kernels/scalar_agg.go

package kernels

import (
    "fmt"

    "github.com/apache/arrow/go/v13/arrow"
    "github.com/apache/arrow/go/v13/arrow/compute/internal/exec"
    "github.com/apache/arrow/go/v13/arrow/scalar"
)

func ScalarAggKernels(op ScalarAggOperator) (aggs []exec.ScalarKernel) {
    switch op {
    case AggMax:
        maxAggs := maxAggKernels()
        aggs = append(aggs, maxAggs...)
    case AggMin:
        minAggs := minAggKernels()
        aggs = append(aggs, minAggs...)
    case AggAvg:
        avgAggs := avgAggKernels()
        aggs = append(aggs, avgAggs...)
    case AggSum:
        sumAggs := sumAggKernels()
        aggs = append(aggs, sumAggs...)
    }

    return
}

func aggMax(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
    var max int64

    for _, v := range batch.Values {
        if !v.IsArray() {
            return fmt.Errorf("%w: input datum is not array", arrow.ErrInvalid)
        }

        if v.Array.Type != arrow.PrimitiveTypes.Int64 {
            return fmt.Errorf("%w: array type is not int64", arrow.ErrInvalid)
        }

        // for int64 array:
        //   first buffer is meta buffer
        //   second buffer is what we want
        int64s := exec.GetSpanValues[int64](&v.Array, 1)
        for _, v64 := range int64s {
            if v64 > max {
                max = v64
            }
        }
    }

    out.FillFromScalar(scalar.NewInt64Scalar(max))
    return nil
}

func maxAggKernels() (aggs []exec.ScalarKernel) {
    outType := exec.NewOutputType(arrow.PrimitiveTypes.Int64)
    in := exec.NewExactInput(arrow.PrimitiveTypes.Int64)
    aggs = append(aggs, exec.NewScalarKernel([]exec.InputType{in}, outType,
        aggMax, nil))

    return
}
... ...

上面的ScalarAggKernels函数就像上图中的GetVectorHashKernels一样,为compute层提供kernel层scalar agg函数的获取“渠道”。aggMax函数是实现聚合逻辑的那个函数,它针对输入的array进行操作,计算array中所有元素中的最大值,并将这个值包装成Datum作为out参数输出。

在compute/internal/kernels/types.go中,我们定义了如下枚举常量,用于compute层传入要选择的scalar聚合函数。

// compute/internal/kernels/types.go

//go:generate stringer -type=ScalarAggOperator -linecomment

type ScalarAggOperator int8

const (
    AggMax ScalarAggOperator = iota // max
    AggMin                          // min
    AggAvg                          // avg
    AggSum                          // sum
)

3.2 在compute层提供对kernel层聚合函数的包装

在compute层,我们也提供一个scalar_agg.go文件,用于对kernel层的聚合函数进行包装:

// compute/scalar_agg.go

package compute

import (
    "context"

    "github.com/apache/arrow/go/v13/arrow/compute/internal/kernels"
)

type aggFunction struct {
    ScalarFunction
}

func Max(ctx context.Context, values Datum) (Datum, error) {
    return CallFunction(ctx, "max", nil, values)
}
func Min(ctx context.Context, values Datum) (Datum, error) {
    return CallFunction(ctx, "min", nil, values)
}
func Avg(ctx context.Context, values Datum) (Datum, error) {
    return CallFunction(ctx, "avg", nil, values)
}
func Sum(ctx context.Context, values Datum) (Datum, error) {
    return CallFunction(ctx, "sum", nil, values)
}

func RegisterScalarAggs(reg FunctionRegistry) {
    maxFn := &aggFunction{*NewScalarFunction("max", Unary(), EmptyFuncDoc)}
    for _, k := range kernels.ScalarAggKernels(kernels.AggMax) {
        if err := maxFn.AddKernel(k); err != nil {
            panic(err)
        }
    }
    reg.AddFunction(maxFn, false)

    minFn := &aggFunction{*NewScalarFunction("min", Unary(), EmptyFuncDoc)}
    for _, k := range kernels.ScalarAggKernels(kernels.AggMin) {
        if err := minFn.AddKernel(k); err != nil {
            panic(err)
        }
    }
    reg.AddFunction(minFn, false)

    avgFn := &aggFunction{*NewScalarFunction("avg", Unary(), EmptyFuncDoc)}
    for _, k := range kernels.ScalarAggKernels(kernels.AggAvg) {
        if err := avgFn.AddKernel(k); err != nil {
            panic(err)
        }
    }
    reg.AddFunction(avgFn, false)

    sumFn := &aggFunction{*NewScalarFunction("sum", Unary(), EmptyFuncDoc)}
    for _, k := range kernels.ScalarAggKernels(kernels.AggSum) {
        if err := sumFn.AddKernel(k); err != nil {
            panic(err)
        }
    }
    reg.AddFunction(sumFn, false)
}

我们看到在这个源文件中,我们提供了供最终用户调用的Max等函数,这些函数是对kernel层scalar聚合函数的包装,通过CallFunction在registry中找到注册的kernel函数并执行它。

RegisterScalarAggs是用于向registry注册scalar聚合函数的函数。

3.3 在compute层将包装后的聚合函数注册到Registry中

我们修改一下compute/registry.go,在GetFunctionRegistry函数中增加对RegisterScalarAggs的调用,以实现对scalar聚合函数的注册:

// compute/registry.go

func GetFunctionRegistry() FunctionRegistry {
    once.Do(func() {
        registry = NewRegistry()
        RegisterScalarCast(registry)
        RegisterVectorSelection(registry)
        RegisterScalarBoolean(registry)
        RegisterScalarArithmetic(registry)
        RegisterScalarComparisons(registry)
        RegisterVectorHash(registry)
        RegisterVectorRunEndFuncs(registry)
        RegisterScalarAggs(registry)
    })
    return registry
}

3.4 运行示例

最初运行arrow/compute-extension/max_aggregate_function.go示例的结果并非我们预期,而是一个全0的数组:

$go run max_aggregate_function.go
[0 0 0 0 0 0 0 0]

经过print调试大法后,我发现compute/executor.go中的executeSpans的实现似乎有一个问题,我在arrow项目提了一个issue,并对executor.go做了如下修改:

diff --git a/go/arrow/compute/executor.go b/go/arrow/compute/executor.go
index d3f1a1fd4..e9bda7137 100644
--- a/go/arrow/compute/executor.go
+++ b/go/arrow/compute/executor.go
@@ -604,7 +604,7 @@ func (s *scalarExecutor) executeSpans(data chan<- Datum) (err error) {
                        return
                }

-               return s.emitResult(prealloc, data)
+               return s.emitResult(&output, data)
        }

        // fully preallocating, but not contiguously
(END)

修改后,再运行arrow/compute-extension/max_aggregate_function.go示例就得到了正确的结果:

$go run max_aggregate_function.go
[35]

3.5 To Be Done

到这里,我们从0到1的为arrow go实现的compute层添加了int64类型的scalar聚合函数的支持(以max为例),但这仅仅是验证了思路的可行性,上述对compute的修改可能是不合理的。此外,上述的改动不是production ready的,存在一些问题,比如:

  • Max返回的是array datam,而不是我们想要的scalar Datam;
  • 仅支持int64,不支持其他类型的max聚合,比如float64、string等;
  • 性能没有优化;
  • 对chunked array类型的scalar datam尚未给出验证示例。
  • … …

4. 小结

在本文中我们基于C++的资料,回顾了Arrow的一些基础抽象概念,从而对Arrow有了更为深刻的认知。之后,也是我们的重点,就是给出了compute层的结构以及基于该结构为compute层增加scalar聚合函数的一种思路和示例代码。

不过这种思路只是为了理解arrow的一种试验性方法,存在其不合理的地方,随着arrow演进,这种方法也许将不适用。同时,后续arrow官方可能会为go增加aggregate function的支持,那时请大家以官方实现为准。

C++版本Arrow实现完全支持各种聚合函数,考虑到Go arrow的实现参考了C++版本的思路,如果要为go arrow正式增加聚合函数支持,阅读c++源码并考虑迁移到Go才是正道。

本文示例代码可以在这里下载,同时增加了scalar function的arrow的fork版本可以在我的github项目arrow-extend-compute1下找到。

5. 参考资料

  • 计算层 – https://arrow.apache.org/docs/cpp/compute.html
  • 计算层教程 – https://arrow.apache.org/docs/cpp/tutorials/compute_tutorial.html
  • Arrow C++参考 – https://arrow.apache.org/docs/cpp/overview.html
  • Go unique kernel函数PR – https://github.com/apache/arrow/pull/34172

“Gopher部落”知识星球旨在打造一个精品Go学习和进阶社群!高品质首发Go技术文章,“三天”首发阅读权,每年两期Go语言发展现状分析,每天提前1小时阅读到新鲜的Gopher日报,网课、技术专栏、图书内容前瞻,六小时内必答保证等满足你关于Go语言生态的所有需求!2023年,Gopher部落将进一步聚焦于如何编写雅、地道、可读、可测试的Go代码,关注代码质量并深入理解Go核心技术,并继续加强与星友的互动。欢迎大家加入!

img{512x368}
img{512x368}

img{512x368}
img{512x368}

著名云主机服务厂商DigitalOcean发布最新的主机计划,入门级Droplet配置升级为:1 core CPU、1G内存、25G高速SSD,价格5$/月。有使用DigitalOcean需求的朋友,可以打开这个链接地址:https://m.do.co/c/bff6eed92687 开启你的DO主机之路。

Gopher Daily(Gopher每日新闻)归档仓库 – https://github.com/bigwhite/gopherdaily

我的联系方式:

  • 微博(暂不可用):https://weibo.com/bigwhite20xx
  • 微博2:https://weibo.com/u/6484441286
  • 博客:tonybai.com
  • github: https://github.com/bigwhite

商务合作方式:撰稿、出书、培训、在线课程、合伙创业、咨询、广告合作。

如发现本站页面被黑,比如:挂载广告、挖矿等恶意代码,请朋友们及时联系我。十分感谢! Go语言第一课 Go语言精进之路1 Go语言精进之路2 Go语言编程指南
商务合作请联系bigwhite.cn AT aliyun.com

欢迎使用邮件订阅我的博客

输入邮箱订阅本站,只要有新文章发布,就会第一时间发送邮件通知你哦!

这里是 Tony Bai的个人Blog,欢迎访问、订阅和留言! 订阅Feed请点击上面图片

如果您觉得这里的文章对您有帮助,请扫描上方二维码进行捐赠 ,加油后的Tony Bai将会为您呈现更多精彩的文章,谢谢!

如果您希望通过微信捐赠,请用微信客户端扫描下方赞赏码:

如果您希望通过比特币或以太币捐赠,可以扫描下方二维码:

比特币:

以太币:

如果您喜欢通过微信浏览本站内容,可以扫描下方二维码,订阅本站官方微信订阅号“iamtonybai”;点击二维码,可直达本人官方微博主页^_^:
本站Powered by Digital Ocean VPS。
选择Digital Ocean VPS主机,即可获得10美元现金充值,可 免费使用两个月哟! 著名主机提供商Linode 10$优惠码:linode10,在 这里注册即可免费获 得。阿里云推荐码: 1WFZ0V立享9折!


View Tony Bai's profile on LinkedIn
DigitalOcean Referral Badge

文章

评论

  • 正在加载...

分类

标签

归档



View My Stats