blob: 92e091323029eb87bdb1a42dd3a7278dc55d15c2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utils
import (
"github.com/apache/arrow/go/v6/parquet"
)
{{range .In}}
{{if ne .Name "Boolean"}}
func (r *RleDecoder) GetBatchWithDictSpaced{{.Name}}(dc DictionaryConverter, vals []{{.name}}, nullCount int, validBits []byte, validBitsOffset int64) (totalProcessed int, err error) {
if nullCount == 0 {
return r.GetBatchWithDict{{.Name}}(dc, vals)
}
var (
blockCounter = NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
processed = 0
block BitBlockCount
)
for {
block = blockCounter.NextFourWords()
if block.Len == 0 {
break
}
switch {
case block.AllSet():
processed, err = r.GetBatchWithDict{{.Name}}(dc, vals[:block.Len])
case block.NoneSet():
dc.FillZero(vals[:block.Len])
processed = int(block.Len)
default:
processed, err = r.getspaced{{.Name}}(dc, vals, int(block.Len), int(block.Len)-int(block.Popcnt), validBits, validBitsOffset)
}
if err != nil {
break
}
totalProcessed += processed
vals = vals[int(block.Len):]
validBitsOffset += int64(block.Len)
if processed != int(block.Len) {
break
}
}
return
}
func (r *RleDecoder) getspaced{{.Name}}(dc DictionaryConverter, vals []{{.name}}, batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
if nullCount == batchSize {
dc.FillZero(vals[:batchSize])
return batchSize, nil
}
read := 0
remain := batchSize - nullCount
const bufferSize = 1024
var indexbuffer [bufferSize]IndexType
// assume no bits to start
bitReader := NewBitRunReader(validBits, validBitsOffset, int64(batchSize))
validRun := bitReader.NextRun()
for read < batchSize {
if validRun.Len == 0 {
validRun = bitReader.NextRun()
}
if !validRun.Set {
dc.FillZero(vals[:int(validRun.Len)])
vals = vals[int(validRun.Len):]
read += int(validRun.Len)
validRun.Len = 0
continue
}
if r.repCount == 0 && r.litCount == 0 {
if !r.Next() {
return read, nil
}
}
var batch int
switch {
case r.repCount > 0:
batch, remain, validRun = r.consumeRepeatCounts(read, batchSize, remain, validRun, bitReader)
current := IndexType(r.curVal)
if !dc.IsValid(current) {
return read, nil
}
dc.Fill(vals[:batch], current)
case r.litCount > 0:
var (
litread int
skipped int
err error
)
litread, skipped, validRun, err = r.consumeLiterals{{.Name}}(dc, vals, remain, indexbuffer[:], validRun, bitReader)
if err != nil {
return read, err
}
batch = litread + skipped
remain -= litread
}
vals = vals[batch:]
read += batch
}
return read, nil
}
func (r *RleDecoder) consumeLiterals{{.Name}}(dc DictionaryConverter, vals []{{.name}}, remain int, buf []IndexType, run BitRun, bitRdr BitRunReader) (int, int, BitRun, error) {
batch := MinInt(MinInt(remain, int(r.litCount)), len(buf))
buf = buf[:batch]
n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
if n != batch {
return 0, 0, run, xerrors.New("was not able to retrieve correct number of indexes")
}
if !dc.IsValid(buf...) {
return 0, 0, run, xerrors.New("invalid index values found for dictionary converter")
}
var (
read int
skipped int
)
for read < batch {
if run.Set {
updateSize := MinInt(batch-read, int(run.Len))
if err := dc.Copy(vals, buf[read:read+updateSize]); err != nil {
return 0, 0, run, err
}
read += updateSize
vals = vals[updateSize:]
run.Len -= int64(updateSize)
} else {
dc.FillZero(vals[:int(run.Len)])
vals = vals[int(run.Len):]
skipped += int(run.Len)
run.Len = 0
}
if run.Len == 0 {
run = bitRdr.NextRun()
}
}
r.litCount -= int32(batch)
return read, skipped, run, nil
}
func (r *RleDecoder) GetBatchWithDict{{.Name}}(dc DictionaryConverter, vals []{{.name}}) (int, error) {
var (
read = 0
size = len(vals)
indexbuffer [1024]IndexType
)
for read < size {
remain := size - read
switch {
case r.repCount > 0:
idx := IndexType(r.curVal)
if !dc.IsValid(idx) {
return read, nil
}
batch := MinInt(remain, int(r.repCount))
if err := dc.Fill(vals[:batch], idx); err != nil {
return read, err
}
r.repCount -= int32(batch)
read += batch
vals = vals[batch:]
case r.litCount > 0:
litbatch := MinInt(MinInt(remain, int(r.litCount)), 1024)
buf := indexbuffer[:litbatch]
n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
if n != litbatch {
return read, nil
}
if !dc.IsValid(buf...) {
return read, nil
}
if err := dc.Copy(vals, buf); err != nil {
return read, nil
}
r.litCount -= int32(litbatch)
read += litbatch
vals = vals[litbatch:]
default:
if !r.Next() {
return read, nil
}
}
}
return read, nil
}
{{end}}
{{end}}