| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package file_test |
| |
| import ( |
| "bytes" |
| "fmt" |
| "reflect" |
| "testing" |
| |
| "github.com/apache/arrow/go/v14/arrow/memory" |
| "github.com/apache/arrow/go/v14/parquet" |
| "github.com/apache/arrow/go/v14/parquet/compress" |
| "github.com/apache/arrow/go/v14/parquet/file" |
| "github.com/apache/arrow/go/v14/parquet/internal/encoding" |
| "github.com/apache/arrow/go/v14/parquet/internal/testutils" |
| "github.com/apache/arrow/go/v14/parquet/schema" |
| "github.com/stretchr/testify/assert" |
| "github.com/stretchr/testify/require" |
| "github.com/stretchr/testify/suite" |
| ) |
| |
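// SerializeTestSuite round-trips each primitive physical type through the
// file writer and reader, covering both serial and buffered row group
// writers.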
| type SerializeTestSuite struct { |
| testutils.PrimitiveTypedTest |
| suite.Suite |
| |
| numCols int |
| numRowGroups int |
| rowsPerRG int |
| rowsPerBatch int |
| } |
| |
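// SetupTest configures a schema of four optional columns, written as four
// row groups of 50 rows each in batches of 10 rows.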
| func (t *SerializeTestSuite) SetupTest() { |
| t.numCols = 4 |
| t.numRowGroups = 4 |
| t.rowsPerRG = 50 |
| t.rowsPerBatch = 10 |
| t.SetupSchema(parquet.Repetitions.Optional, t.numCols) |
| } |
| |
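// fileSerializeTest writes half of the row groups serially and half buffered
// with the given codec, then reads the file back and verifies row counts,
// compression, values, and definition levels.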
| func (t *SerializeTestSuite) fileSerializeTest(codec compress.Compression, expected compress.Compression) { |
| sink := encoding.NewBufferWriter(0, memory.DefaultAllocator) |
| |
| opts := make([]parquet.WriterProperty, 0) |
| for i := 0; i < t.numCols; i++ { |
| opts = append(opts, parquet.WithCompressionFor(t.Schema.Column(i).Name(), codec)) |
| } |
| |
| props := parquet.NewWriterProperties(opts...) |
| |
| writer := file.NewParquetWriter(sink, t.Schema.Root(), file.WithWriterProps(props)) |
| t.GenerateData(int64(t.rowsPerRG)) |
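	// Write the first half of the row groups with serial (unbuffered) writers.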
| for rg := 0; rg < t.numRowGroups/2; rg++ { |
| rgw := writer.AppendRowGroup() |
| for col := 0; col < t.numCols; col++ { |
| cw, _ := rgw.NextColumn() |
| t.WriteBatchValues(cw, t.DefLevels, nil) |
| cw.Close() |
			// Ensure the Column(i) API, which is specific to buffered row
			// groups, cannot be called on a serial row group writer.
| t.Panics(func() { rgw.(file.BufferedRowGroupWriter).Column(col) }) |
| } |
| rgw.Close() |
| } |
| |
	// Write the remaining half of the row groups with buffered writers.
| for rg := 0; rg < t.numRowGroups/2; rg++ { |
| rgw := writer.AppendBufferedRowGroup() |
| for batch := 0; batch < (t.rowsPerRG / t.rowsPerBatch); batch++ { |
| for col := 0; col < t.numCols; col++ { |
| cw, _ := rgw.Column(col) |
| offset := batch * t.rowsPerBatch |
| t.WriteBatchSubset(t.rowsPerBatch, offset, cw, t.DefLevels[offset:t.rowsPerBatch+offset], nil) |
				// Ensure the NextColumn API, which is specific to serial row
				// groups, cannot be called on a buffered row group writer.
| t.Panics(func() { rgw.(file.SerialRowGroupWriter).NextColumn() }) |
| } |
| } |
| for col := 0; col < t.numCols; col++ { |
| cw, _ := rgw.Column(col) |
| cw.Close() |
| } |
| rgw.Close() |
| } |
| writer.Close() |
| |
| nrows := t.numRowGroups * t.rowsPerRG |
| reader, err := file.NewParquetReader(bytes.NewReader(sink.Bytes())) |
| t.NoError(err) |
| t.Equal(t.numCols, reader.MetaData().Schema.NumColumns()) |
| t.Equal(t.numRowGroups, reader.NumRowGroups()) |
| t.EqualValues(nrows, reader.NumRows()) |
| |
| for rg := 0; rg < t.numRowGroups; rg++ { |
| rgr := reader.RowGroup(rg) |
| t.Equal(t.numCols, rgr.NumColumns()) |
| t.EqualValues(t.rowsPerRG, rgr.NumRows()) |
| chunk, _ := rgr.MetaData().ColumnChunk(0) |
| t.Equal(expected, chunk.Compression()) |
| |
| valuesRead := int64(0) |
| |
| for i := 0; i < t.numCols; i++ { |
| chunk, _ := rgr.MetaData().ColumnChunk(i) |
| t.False(chunk.HasIndexPage()) |
| t.DefLevelsOut = make([]int16, t.rowsPerRG) |
| t.RepLevelsOut = make([]int16, t.rowsPerRG) |
| colReader, err := rgr.Column(i) |
| t.NoError(err) |
| t.SetupValuesOut(int64(t.rowsPerRG)) |
| valuesRead = t.ReadBatch(colReader, int64(t.rowsPerRG), 0, t.DefLevelsOut, t.RepLevelsOut) |
| t.EqualValues(t.rowsPerRG, valuesRead) |
| t.Equal(t.Values, t.ValuesOut) |
| t.Equal(t.DefLevels, t.DefLevelsOut) |
| } |
| } |
| } |
| |
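// unequalNumRows writes a different number of rows to each column of a
// serial row group and expects Close to report the mismatch.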
| func (t *SerializeTestSuite) unequalNumRows(maxRows int64, rowsPerCol []int64) { |
| sink := encoding.NewBufferWriter(0, memory.DefaultAllocator) |
| props := parquet.NewWriterProperties() |
| writer := file.NewParquetWriter(sink, t.Schema.Root(), file.WithWriterProps(props)) |
| defer writer.Close() |
| |
| rgw := writer.AppendRowGroup() |
| t.GenerateData(maxRows) |
| for col := 0; col < t.numCols; col++ { |
| cw, _ := rgw.NextColumn() |
| t.WriteBatchSubset(int(rowsPerCol[col]), 0, cw, t.DefLevels[:rowsPerCol[col]], nil) |
| cw.Close() |
| } |
| err := rgw.Close() |
| t.Error(err) |
| t.ErrorContains(err, "row mismatch for unbuffered row group") |
| } |
| |
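// unequalNumRowsBuffered is the buffered row group variant of unequalNumRows.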
| func (t *SerializeTestSuite) unequalNumRowsBuffered(maxRows int64, rowsPerCol []int64) { |
| sink := encoding.NewBufferWriter(0, memory.DefaultAllocator) |
| writer := file.NewParquetWriter(sink, t.Schema.Root()) |
| defer writer.Close() |
| |
| rgw := writer.AppendBufferedRowGroup() |
| t.GenerateData(maxRows) |
| for col := 0; col < t.numCols; col++ { |
| cw, _ := rgw.Column(col) |
| t.WriteBatchSubset(int(rowsPerCol[col]), 0, cw, t.DefLevels[:rowsPerCol[col]], nil) |
| cw.Close() |
| } |
| err := rgw.Close() |
| t.Error(err) |
| t.ErrorContains(err, "row mismatch for buffered row group") |
| } |
| |
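// TestZeroRows verifies that closing serial and buffered row groups with no
// rows written does not panic.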
| func (t *SerializeTestSuite) TestZeroRows() { |
| t.NotPanics(func() { |
| sink := encoding.NewBufferWriter(0, memory.DefaultAllocator) |
| writer := file.NewParquetWriter(sink, t.Schema.Root()) |
| defer writer.Close() |
| |
| srgw := writer.AppendRowGroup() |
| for col := 0; col < t.numCols; col++ { |
| cw, _ := srgw.NextColumn() |
| cw.Close() |
| } |
| srgw.Close() |
| |
| brgw := writer.AppendBufferedRowGroup() |
| for col := 0; col < t.numCols; col++ { |
| cw, _ := brgw.Column(col) |
| cw.Close() |
| } |
| brgw.Close() |
| }) |
| } |
| |
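// TestTooManyColumns verifies that requesting more columns than the schema
// defines panics.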
| func (t *SerializeTestSuite) TestTooManyColumns() { |
| t.SetupSchema(parquet.Repetitions.Optional, 1) |
| sink := encoding.NewBufferWriter(0, memory.DefaultAllocator) |
| writer := file.NewParquetWriter(sink, t.Schema.Root()) |
| rgw := writer.AppendRowGroup() |
| |
| rgw.NextColumn() // first column |
| t.Panics(func() { rgw.NextColumn() }) // only one column! |
| } |
| |
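// TestRepeatedTooFewRows verifies that writing a repeated column that ends
// up one row short of the previous column panics.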
| func (t *SerializeTestSuite) TestRepeatedTooFewRows() { |
	// The column is repeated, so both definition and repetition levels are required.
| t.SetupSchema(parquet.Repetitions.Repeated, 1) |
| const nrows = 100 |
| t.GenerateData(nrows) |
| |
| sink := encoding.NewBufferWriter(0, memory.DefaultAllocator) |
| writer := file.NewParquetWriter(sink, t.Schema.Root()) |
| |
| rgw := writer.AppendRowGroup() |
	t.RepLevels = make([]int16, nrows) // zero-valued: every value starts a new row
| |
| cw, _ := rgw.NextColumn() |
| t.WriteBatchValues(cw, t.DefLevels, t.RepLevels) |
| cw.Close() |
| |
	// A repetition level of 1 at index 3 merges values 2 and 3 into a single
	// row, leaving this column one row short of the previous one.
	t.RepLevels[3] = 1
| |
| t.Panics(func() { |
| cw, _ = rgw.NextColumn() |
| t.WriteBatchValues(cw, t.DefLevels, t.RepLevels) |
| cw.Close() |
| }) |
| } |
| |
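// TestTooFewRows expects a row-count mismatch error when one column is a row
// short of the others.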
| func (t *SerializeTestSuite) TestTooFewRows() { |
| rowsPerCol := []int64{100, 100, 100, 99} |
| t.NotPanics(func() { t.unequalNumRows(100, rowsPerCol) }) |
| t.NotPanics(func() { t.unequalNumRowsBuffered(100, rowsPerCol) }) |
| } |
| |
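// TestTooManyRows expects a row-count mismatch error when one column has one
// row more than the others.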
| func (t *SerializeTestSuite) TestTooManyRows() { |
| rowsPerCol := []int64{100, 100, 100, 101} |
| t.NotPanics(func() { t.unequalNumRows(101, rowsPerCol) }) |
| t.NotPanics(func() { t.unequalNumRowsBuffered(101, rowsPerCol) }) |
| } |
| |
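// TestSmallFile runs the round-trip serialization test against each
// supported codec (LZ4 and LZO are currently skipped).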
| func (t *SerializeTestSuite) TestSmallFile() { |
| codecs := []compress.Compression{ |
| compress.Codecs.Uncompressed, |
| compress.Codecs.Snappy, |
| compress.Codecs.Brotli, |
| compress.Codecs.Gzip, |
| compress.Codecs.Zstd, |
| // compress.Codecs.Lz4, |
| // compress.Codecs.Lzo, |
| } |
| for _, c := range codecs { |
| t.Run(c.String(), func() { |
| t.NotPanics(func() { t.fileSerializeTest(c, c) }) |
| }) |
| } |
| } |
| |
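// TestBufferedDisabledDictionary writes a single value through a buffered
// row group with dictionary encoding disabled and verifies the resulting
// column chunk has no dictionary page.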
| func TestBufferedDisabledDictionary(t *testing.T) { |
| sink := encoding.NewBufferWriter(0, memory.DefaultAllocator) |
| fields := schema.FieldList{schema.NewInt32Node("col", parquet.Repetitions.Required, 1)} |
| sc, _ := schema.NewGroupNode("schema", parquet.Repetitions.Required, fields, 0) |
| props := parquet.NewWriterProperties(parquet.WithDictionaryDefault(false)) |
| |
| writer := file.NewParquetWriter(sink, sc, file.WithWriterProps(props)) |
| rgw := writer.AppendBufferedRowGroup() |
| cwr, _ := rgw.Column(0) |
| cw := cwr.(*file.Int32ColumnChunkWriter) |
| cw.WriteBatch([]int32{1}, nil, nil) |
| rgw.Close() |
| writer.Close() |
| |
| buffer := sink.Finish() |
| defer buffer.Release() |
| reader, err := file.NewParquetReader(bytes.NewReader(buffer.Bytes())) |
| assert.NoError(t, err) |
| assert.EqualValues(t, 1, reader.NumRowGroups()) |
| rgReader := reader.RowGroup(0) |
| assert.EqualValues(t, 1, rgReader.NumRows()) |
| chunk, _ := rgReader.MetaData().ColumnChunk(0) |
| assert.False(t, chunk.HasDictionaryPage()) |
| } |
| |
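// TestBufferedMultiPageDisabledDictionary writes enough values to span
// multiple data pages with dictionary encoding disabled and verifies they
// round-trip intact.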
| func TestBufferedMultiPageDisabledDictionary(t *testing.T) { |
| const ( |
| valueCount = 10000 |
| pageSize = 16384 |
| ) |
| var ( |
| sink = encoding.NewBufferWriter(0, memory.DefaultAllocator) |
| props = parquet.NewWriterProperties(parquet.WithDictionaryDefault(false), parquet.WithDataPageSize(pageSize)) |
| sc, _ = schema.NewGroupNode("schema", parquet.Repetitions.Required, schema.FieldList{ |
| schema.NewInt32Node("col", parquet.Repetitions.Required, -1), |
| }, -1) |
| ) |
| |
| writer := file.NewParquetWriter(sink, sc, file.WithWriterProps(props)) |
| rgWriter := writer.AppendBufferedRowGroup() |
| cwr, _ := rgWriter.Column(0) |
| cw := cwr.(*file.Int32ColumnChunkWriter) |
| valuesIn := make([]int32, 0, valueCount) |
| for i := int32(0); i < valueCount; i++ { |
| valuesIn = append(valuesIn, (i%100)+1) |
| } |
| cw.WriteBatch(valuesIn, nil, nil) |
| rgWriter.Close() |
| writer.Close() |
| buffer := sink.Finish() |
| defer buffer.Release() |
| |
| reader, err := file.NewParquetReader(bytes.NewReader(buffer.Bytes())) |
| assert.NoError(t, err) |
| |
| assert.EqualValues(t, 1, reader.NumRowGroups()) |
| valuesOut := make([]int32, valueCount) |
| |
| for r := 0; r < reader.NumRowGroups(); r++ { |
| rgr := reader.RowGroup(r) |
| assert.EqualValues(t, 1, rgr.NumColumns()) |
| assert.EqualValues(t, valueCount, rgr.NumRows()) |
| |
| var totalRead int64 |
| col, err := rgr.Column(0) |
| assert.NoError(t, err) |
| colReader := col.(*file.Int32ColumnChunkReader) |
| for colReader.HasNext() { |
| total, _, _ := colReader.ReadBatch(valueCount-totalRead, valuesOut[totalRead:], nil, nil) |
| totalRead += total |
| } |
| assert.EqualValues(t, valueCount, totalRead) |
| assert.Equal(t, valuesIn, valuesOut) |
| } |
| } |
| |
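// TestAllNulls writes a column of three nulls and verifies that reading back
// yields three definition levels and zero values.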
| func TestAllNulls(t *testing.T) { |
| sc, _ := schema.NewGroupNode("root", parquet.Repetitions.Required, schema.FieldList{ |
| schema.NewInt32Node("nulls", parquet.Repetitions.Optional, -1), |
| }, -1) |
| sink := encoding.NewBufferWriter(0, memory.DefaultAllocator) |
| |
| writer := file.NewParquetWriter(sink, sc) |
| rgw := writer.AppendRowGroup() |
| cwr, _ := rgw.NextColumn() |
| cw := cwr.(*file.Int32ColumnChunkWriter) |
| |
| var ( |
| values [3]int32 |
| defLevels = [...]int16{0, 0, 0} |
| ) |
| |
| cw.WriteBatch(values[:], defLevels[:], nil) |
| cw.Close() |
| rgw.Close() |
| writer.Close() |
| |
| buffer := sink.Finish() |
| defer buffer.Release() |
| props := parquet.NewReaderProperties(memory.DefaultAllocator) |
| props.BufferedStreamEnabled = true |
| |
| reader, err := file.NewParquetReader(bytes.NewReader(buffer.Bytes()), file.WithReadProps(props)) |
| assert.NoError(t, err) |
| |
| rgr := reader.RowGroup(0) |
| col, err := rgr.Column(0) |
| assert.NoError(t, err) |
| cr := col.(*file.Int32ColumnChunkReader) |
| |
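	// Poison the def levels to confirm ReadBatch overwrites them.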
| defLevels[0] = -1 |
| defLevels[1] = -1 |
| defLevels[2] = -1 |
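	// Every value is null: expect three levels read but zero values read.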
	levelsRead, valuesRead, _ := cr.ReadBatch(3, values[:], defLevels[:], nil)
	assert.EqualValues(t, 3, levelsRead)
	assert.EqualValues(t, 0, valuesRead)
| assert.Equal(t, []int16{0, 0, 0}, defLevels[:]) |
| } |
| |
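// TestKeyValueMetadata verifies that key/value metadata appended to the
// writer is readable from the file footer.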
| func TestKeyValueMetadata(t *testing.T) { |
| fields := schema.FieldList{ |
| schema.NewInt32Node("unused", parquet.Repetitions.Optional, -1), |
| } |
| sc, _ := schema.NewGroupNode("root", parquet.Repetitions.Required, fields, -1) |
| sink := encoding.NewBufferWriter(0, memory.DefaultAllocator) |
| |
| writer := file.NewParquetWriter(sink, sc) |
| |
| testKey := "testKey" |
| testValue := "testValue" |
| writer.AppendKeyValueMetadata(testKey, testValue) |
| writer.Close() |
| |
| buffer := sink.Finish() |
| defer buffer.Release() |
| props := parquet.NewReaderProperties(memory.DefaultAllocator) |
| props.BufferedStreamEnabled = true |
| |
| reader, err := file.NewParquetReader(bytes.NewReader(buffer.Bytes()), file.WithReadProps(props)) |
| assert.NoError(t, err) |
| |
| metadata := reader.MetaData() |
| got := metadata.KeyValueMetadata().FindValue(testKey) |
| require.NotNil(t, got) |
| assert.Equal(t, testValue, *got) |
| } |
| |
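// createSerializeTestSuite builds a SerializeTestSuite for the given
// primitive Go type.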
| func createSerializeTestSuite(typ reflect.Type) suite.TestingSuite { |
| return &SerializeTestSuite{PrimitiveTypedTest: testutils.NewPrimitiveTypedTest(typ)} |
| } |
| |
| func TestSerialize(t *testing.T) { |
| t.Parallel() |
| types := []struct { |
| typ reflect.Type |
| }{ |
| {reflect.TypeOf(true)}, |
| {reflect.TypeOf(int32(0))}, |
| {reflect.TypeOf(int64(0))}, |
| {reflect.TypeOf(float32(0))}, |
| {reflect.TypeOf(float64(0))}, |
| {reflect.TypeOf(parquet.Int96{})}, |
| {reflect.TypeOf(parquet.ByteArray{})}, |
| } |
| for _, tt := range types { |
| tt := tt |
| t.Run(tt.typ.String(), func(t *testing.T) { |
| t.Parallel() |
| suite.Run(t, createSerializeTestSuite(tt.typ)) |
| }) |
| } |
| } |
| |
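// errCloseWriter wraps a BufferWriter and always fails on Close, allowing
// tests to exercise error propagation from the underlying sink.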
| type errCloseWriter struct { |
| sink *encoding.BufferWriter |
| } |
| |
| func (c *errCloseWriter) Write(p []byte) (n int, err error) { |
| return c.sink.Write(p) |
| } |
| func (c *errCloseWriter) Close() error { |
| return fmt.Errorf("error during close") |
| } |
| func (c *errCloseWriter) Bytes() []byte { |
| return c.sink.Bytes() |
| } |
| |
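// TestCloseError verifies that an error from the sink's Close surfaces from
// the ParquetWriter's Close.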
| func TestCloseError(t *testing.T) { |
| fields := schema.FieldList{schema.NewInt32Node("col", parquet.Repetitions.Required, 1)} |
| sc, _ := schema.NewGroupNode("schema", parquet.Repetitions.Required, fields, 0) |
| sink := &errCloseWriter{sink: encoding.NewBufferWriter(0, memory.DefaultAllocator)} |
| writer := file.NewParquetWriter(sink, sc) |
| assert.Error(t, writer.Close()) |
| } |