| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package testutils |
| |
| import ( |
| "encoding/binary" |
| "io" |
| "reflect" |
| |
| "github.com/apache/arrow/go/v9/arrow/memory" |
| "github.com/apache/arrow/go/v9/internal/utils" |
| "github.com/apache/arrow/go/v9/parquet" |
| "github.com/apache/arrow/go/v9/parquet/compress" |
| "github.com/apache/arrow/go/v9/parquet/file" |
| "github.com/apache/arrow/go/v9/parquet/internal/encoding" |
| "github.com/apache/arrow/go/v9/parquet/schema" |
| "github.com/stretchr/testify/mock" |
| ) |
| |
| type DataPageBuilder struct { |
| sink io.Writer |
| version parquet.DataPageVersion |
| |
| nvals int |
| encoding parquet.Encoding |
| defLvlEncoding parquet.Encoding |
| repLvlEncoding parquet.Encoding |
| defLvlBytesLen int |
| repLvlBytesLen int |
| hasDefLvls bool |
| hasRepLvls bool |
| hasValues bool |
| } |
| |
// mem is the Go allocator handed to the value/dictionary encoders and to the
// page stream buffer created in this file.
var mem = memory.NewGoAllocator()
| |
| func (d *DataPageBuilder) appendLevels(lvls []int16, maxLvl int16, e parquet.Encoding) int { |
| if e != parquet.Encodings.RLE { |
| panic("parquet: only rle encoding currently implemented") |
| } |
| |
| buf := encoding.NewBufferWriter(encoding.LevelEncodingMaxBufferSize(e, maxLvl, len(lvls)), memory.DefaultAllocator) |
| var enc encoding.LevelEncoder |
| enc.Init(e, maxLvl, buf) |
| enc.Encode(lvls) |
| |
| rleBytes := enc.Len() |
| if d.version == parquet.DataPageV1 { |
| if err := binary.Write(d.sink, binary.LittleEndian, int32(rleBytes)); err != nil { |
| panic(err) |
| } |
| } |
| |
| if _, err := d.sink.Write(buf.Bytes()[:rleBytes]); err != nil { |
| panic(err) |
| } |
| return rleBytes |
| } |
| |
| func (d *DataPageBuilder) AppendDefLevels(lvls []int16, maxLvl int16) { |
| d.defLvlBytesLen = d.appendLevels(lvls, maxLvl, parquet.Encodings.RLE) |
| |
| d.nvals = utils.MaxInt(len(lvls), d.nvals) |
| d.defLvlEncoding = parquet.Encodings.RLE |
| d.hasDefLvls = true |
| } |
| |
| func (d *DataPageBuilder) AppendRepLevels(lvls []int16, maxLvl int16) { |
| d.repLvlBytesLen = d.appendLevels(lvls, maxLvl, parquet.Encodings.RLE) |
| |
| d.nvals = utils.MaxInt(len(lvls), d.nvals) |
| d.repLvlEncoding = parquet.Encodings.RLE |
| d.hasRepLvls = true |
| } |
| |
| func (d *DataPageBuilder) AppendValues(desc *schema.Column, values interface{}, e parquet.Encoding) { |
| enc := encoding.NewEncoder(desc.PhysicalType(), e, false, desc, mem) |
| var sz int |
| switch v := values.(type) { |
| case []int32: |
| enc.(encoding.Int32Encoder).Put(v) |
| sz = len(v) |
| case []int64: |
| enc.(encoding.Int64Encoder).Put(v) |
| sz = len(v) |
| case []parquet.Int96: |
| enc.(encoding.Int96Encoder).Put(v) |
| sz = len(v) |
| case []float32: |
| enc.(encoding.Float32Encoder).Put(v) |
| sz = len(v) |
| case []float64: |
| enc.(encoding.Float64Encoder).Put(v) |
| sz = len(v) |
| case []parquet.ByteArray: |
| enc.(encoding.ByteArrayEncoder).Put(v) |
| sz = len(v) |
| } |
| buf, _ := enc.FlushValues() |
| _, err := d.sink.Write(buf.Bytes()) |
| if err != nil { |
| panic(err) |
| } |
| |
| d.nvals = utils.MaxInt(sz, d.nvals) |
| d.encoding = e |
| d.hasValues = true |
| } |
| |
// DictionaryPageBuilder feeds values through a dictionary encoder so that a
// dictionary page and the matching index buffers can be produced.
type DictionaryPageBuilder struct {
	// traits is the dictionary encoder that receives all appended values.
	traits encoding.DictEncoder
	// numDictValues is the encoder's entry count as of the last AppendValues call.
	numDictValues int32
	// hasValues is set to true by AppendValues.
	hasValues bool
}
| |
| func NewDictionaryPageBuilder(d *schema.Column) *DictionaryPageBuilder { |
| return &DictionaryPageBuilder{ |
| encoding.NewEncoder(d.PhysicalType(), parquet.Encodings.Plain, true, d, mem).(encoding.DictEncoder), |
| 0, false} |
| } |
| |
| func (d *DictionaryPageBuilder) AppendValues(values interface{}) encoding.Buffer { |
| switch v := values.(type) { |
| case []int32: |
| d.traits.(encoding.Int32Encoder).Put(v) |
| case []int64: |
| d.traits.(encoding.Int64Encoder).Put(v) |
| case []parquet.Int96: |
| d.traits.(encoding.Int96Encoder).Put(v) |
| case []float32: |
| d.traits.(encoding.Float32Encoder).Put(v) |
| case []float64: |
| d.traits.(encoding.Float64Encoder).Put(v) |
| case []parquet.ByteArray: |
| d.traits.(encoding.ByteArrayEncoder).Put(v) |
| } |
| |
| d.numDictValues = int32(d.traits.NumEntries()) |
| d.hasValues = true |
| buf, _ := d.traits.FlushValues() |
| return buf |
| } |
| |
| func (d *DictionaryPageBuilder) WriteDict() *memory.Buffer { |
| buf := memory.NewBufferBytes(make([]byte, d.traits.DictEncodedSize())) |
| d.traits.WriteDict(buf.Bytes()) |
| return buf |
| } |
| |
| func (d *DictionaryPageBuilder) NumValues() int32 { |
| return d.numDictValues |
| } |
| |
| func MakeDataPage(dataPageVersion parquet.DataPageVersion, d *schema.Column, values interface{}, nvals int, e parquet.Encoding, indexBuffer encoding.Buffer, defLvls, repLvls []int16, maxDef, maxRep int16) file.Page { |
| num := 0 |
| |
| stream := encoding.NewBufferWriter(1024, mem) |
| builder := DataPageBuilder{sink: stream, version: dataPageVersion} |
| |
| if len(repLvls) > 0 { |
| builder.AppendRepLevels(repLvls, maxRep) |
| } |
| if len(defLvls) > 0 { |
| builder.AppendDefLevels(defLvls, maxDef) |
| } |
| |
| if e == parquet.Encodings.Plain { |
| builder.AppendValues(d, values, e) |
| num = builder.nvals |
| } else { |
| stream.Write(indexBuffer.Bytes()) |
| num = utils.MaxInt(builder.nvals, nvals) |
| } |
| |
| buf := stream.Finish() |
| if dataPageVersion == parquet.DataPageV1 { |
| return file.NewDataPageV1(buf, int32(num), e, builder.defLvlEncoding, builder.repLvlEncoding, int32(buf.Len())) |
| } |
| return file.NewDataPageV2(buf, int32(num), 0, int32(num), e, int32(builder.defLvlBytesLen), int32(builder.repLvlBytesLen), int32(buf.Len()), false) |
| } |
| |
| func MakeDictPage(d *schema.Column, values interface{}, valuesPerPage []int, e parquet.Encoding) (*file.DictionaryPage, []encoding.Buffer) { |
| bldr := NewDictionaryPageBuilder(d) |
| npages := len(valuesPerPage) |
| |
| ref := reflect.ValueOf(values) |
| valStart := 0 |
| |
| rleIndices := make([]encoding.Buffer, 0, npages) |
| for _, nvals := range valuesPerPage { |
| rleIndices = append(rleIndices, bldr.AppendValues(ref.Slice(valStart, valStart+nvals).Interface())) |
| valStart += nvals |
| } |
| |
| buffer := bldr.WriteDict() |
| return file.NewDictionaryPage(buffer, bldr.NumValues(), parquet.Encodings.Plain), rleIndices |
| } |
| |
// MockPageReader is a testify-based mock page reader that serves pages from
// the []file.Page stored under the "pages" key of the mock's test data.
type MockPageReader struct {
	mock.Mock

	// curpage is the 1-based position of the current page; it is advanced by Next.
	curpage int
}
| |
| func (m *MockPageReader) Err() error { |
| return m.Called().Error(0) |
| } |
| |
| func (m *MockPageReader) Reset(parquet.BufferedReader, int64, compress.Compression, *file.CryptoContext) { |
| } |
| |
| func (m *MockPageReader) SetMaxPageHeaderSize(int) {} |
| |
| func (m *MockPageReader) Page() file.Page { |
| return m.TestData().Get("pages").Data().([]file.Page)[m.curpage-1] |
| } |
| |
| func (m *MockPageReader) Next() bool { |
| pageList := m.TestData().Get("pages").Data().([]file.Page) |
| m.curpage++ |
| return len(pageList) >= m.curpage |
| } |
| |
| func PaginatePlain(version parquet.DataPageVersion, d *schema.Column, values reflect.Value, defLevels, repLevels []int16, |
| maxDef, maxRep int16, lvlsPerPage int, valuesPerPage []int, enc parquet.Encoding) []file.Page { |
| |
| var ( |
| npages = len(valuesPerPage) |
| defLvlStart = 0 |
| defLvlEnd = 0 |
| repLvlStart = 0 |
| repLvlEnd = 0 |
| valueStart = 0 |
| ) |
| |
| pageList := make([]file.Page, 0, npages) |
| for i := 0; i < npages; i++ { |
| if maxDef > 0 { |
| defLvlStart = i * lvlsPerPage |
| defLvlEnd = (i + 1) * lvlsPerPage |
| } |
| if maxRep > 0 { |
| repLvlStart = i * lvlsPerPage |
| repLvlEnd = (i + 1) * lvlsPerPage |
| } |
| |
| page := MakeDataPage(version, d, |
| values.Slice(valueStart, valueStart+valuesPerPage[i]).Interface(), |
| valuesPerPage[i], enc, nil, defLevels[defLvlStart:defLvlEnd], |
| repLevels[repLvlStart:repLvlEnd], maxDef, maxRep) |
| valueStart += valuesPerPage[i] |
| pageList = append(pageList, page) |
| } |
| return pageList |
| } |
| |
| func PaginateDict(version parquet.DataPageVersion, d *schema.Column, values reflect.Value, defLevels, repLevels []int16, maxDef, maxRep int16, lvlsPerPage int, valuesPerPage []int, enc parquet.Encoding) []file.Page { |
| var ( |
| npages = len(valuesPerPage) |
| pages = make([]file.Page, 0, npages) |
| defStart = 0 |
| defEnd = 0 |
| repStart = 0 |
| repEnd = 0 |
| ) |
| |
| dictPage, rleIndices := MakeDictPage(d, values.Interface(), valuesPerPage, enc) |
| pages = append(pages, dictPage) |
| for i := 0; i < npages; i++ { |
| if maxDef > 0 { |
| defStart = i * lvlsPerPage |
| defEnd = (i + 1) * lvlsPerPage |
| } |
| if maxRep > 0 { |
| repStart = i * lvlsPerPage |
| repEnd = (i + 1) * lvlsPerPage |
| } |
| page := MakeDataPage(version, d, nil, valuesPerPage[i], enc, rleIndices[i], |
| defLevels[defStart:defEnd], repLevels[repStart:repEnd], maxDef, maxRep) |
| pages = append(pages, page) |
| } |
| return pages |
| } |