// blob: 3b750a0a147eb790e3a32fefbdb8cf4b9fe79b75
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.column.mem;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnReader;
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ColumnWriter;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.impl.ColumnReadStoreImpl;
import org.apache.parquet.column.impl.ColumnWriteStoreV1;
import org.apache.parquet.column.impl.ColumnWriteStoreV2;
import org.apache.parquet.column.page.DataPage;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.column.page.mem.MemPageStore;
import org.apache.parquet.example.DummyRecordConverter;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.Types;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Round-trip tests that write values through a {@link ColumnWriter}, store the resulting pages in
 * an in-memory {@link MemPageStore}, and read them back through a {@link ColumnReader}.
 */
public class TestMemColumn {
  private static final Logger LOG = LoggerFactory.getLogger(TestMemColumn.class);

  /**
   * Writes a single required int64 value and checks that exactly that value is read back with
   * repetition and definition levels of 0.
   */
  @Test
  public void testMemColumn() throws Exception {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message msg { required group foo { required int64 bar; } }");
    ColumnDescriptor path = schema.getColumnDescription(new String[] {"foo", "bar"});
    MemPageStore memPageStore = new MemPageStore(10);
    ColumnWriteStoreV1 memColumnsStore = newColumnWriteStoreImpl(memPageStore);
    ColumnWriter columnWriter = memColumnsStore.getColumnWriter(path);

    columnWriter.write(42L, 0, 0);
    memColumnsStore.endRecord();
    memColumnsStore.flush();

    ColumnReader columnReader = getColumnReader(memPageStore, path, schema);
    assertAllLongsEqual(columnReader, 1, 42L);
  }

  /**
   * Creates a reader for {@code path} over the pages held by {@code memPageStore}.
   *
   * @param memPageStore the in-memory store the pages were flushed to
   * @param path the column to read
   * @param schema the message schema that {@code path} belongs to
   * @return a reader positioned on the column's values
   */
  private ColumnReader getColumnReader(MemPageStore memPageStore, ColumnDescriptor path, MessageType schema) {
    return new ColumnReadStoreImpl(
        memPageStore,
        new DummyRecordConverter(schema).getRootConverter(),
        schema,
        null // NOTE(review): presumably the optional "created by" string - confirm against ColumnReadStoreImpl
    ).getColumnReader(path);
  }

  /**
   * Verifies that the reader exposes exactly {@code expectedCount} values, each equal to
   * {@code expectedValue} with repetition and definition levels of 0, consuming them all.
   *
   * <p>The count is asserted up front so that the check cannot pass vacuously when nothing was
   * written or read back.
   *
   * @param columnReader the reader to drain
   * @param expectedCount the number of values that must be present
   * @param expectedValue the value every entry must hold
   */
  private static void assertAllLongsEqual(ColumnReader columnReader, long expectedCount, long expectedValue) {
    long totalValueCount = columnReader.getTotalValueCount();
    assertEquals(expectedCount, totalValueCount);
    for (long i = 0; i < totalValueCount; i++) {
      assertEquals(0, columnReader.getCurrentRepetitionLevel());
      assertEquals(0, columnReader.getCurrentDefinitionLevel());
      assertEquals(expectedValue, columnReader.getLong());
      columnReader.consume();
    }
  }

  /**
   * Writes a single required binary value and checks that exactly that value is read back with
   * repetition and definition levels of 0.
   */
  @Test
  public void testMemColumnBinary() throws Exception {
    MessageType mt = MessageTypeParser.parseMessageType(
        "message msg { required group foo { required binary bar; } }");
    ColumnDescriptor path = mt.getColumnDescription(new String[] {"foo", "bar"});
    MemPageStore memPageStore = new MemPageStore(10);
    ColumnWriteStoreV1 memColumnsStore = newColumnWriteStoreImpl(memPageStore);
    ColumnWriter columnWriter = memColumnsStore.getColumnWriter(path);

    columnWriter.write(Binary.fromString("42"), 0, 0);
    memColumnsStore.endRecord();
    memColumnsStore.flush();

    ColumnReader columnReader = getColumnReader(memPageStore, path, mt);
    long totalValueCount = columnReader.getTotalValueCount();
    // Guard against a vacuous pass: exactly one value was written above.
    assertEquals(1, totalValueCount);
    for (long i = 0; i < totalValueCount; i++) {
      assertEquals(0, columnReader.getCurrentRepetitionLevel());
      assertEquals(0, columnReader.getCurrentDefinitionLevel());
      assertEquals("42", columnReader.getBinary().toStringUsingUTF8());
      columnReader.consume();
    }
  }

  /**
   * Writes 2000 required int64 values, one per record, and checks that all of them are read back.
   *
   * <p>NOTE(review): 2000 longs against the 2048-byte page size configured in
   * {@link #newColumnWriteStoreImpl(MemPageStore)} is presumably what makes the data span several
   * pages, as the test name suggests; the page count itself is not asserted here.
   */
  @Test
  public void testMemColumnSeveralPages() throws Exception {
    MessageType mt = MessageTypeParser.parseMessageType(
        "message msg { required group foo { required int64 bar; } }");
    ColumnDescriptor path = mt.getColumnDescription(new String[] {"foo", "bar"});
    MemPageStore memPageStore = new MemPageStore(10);
    ColumnWriteStoreV1 memColumnsStore = newColumnWriteStoreImpl(memPageStore);
    ColumnWriter columnWriter = memColumnsStore.getColumnWriter(path);

    int recordCount = 2000;
    for (int i = 0; i < recordCount; i++) {
      columnWriter.write(42L, 0, 0);
      memColumnsStore.endRecord();
    }
    memColumnsStore.flush();

    ColumnReader columnReader = getColumnReader(memPageStore, path, mt);
    assertAllLongsEqual(columnReader, recordCount, 42L);
  }

  /**
   * Writes 837 entries to a doubly repeated int64 column, cycling through every combination of
   * repetition level 0..2 and definition level 0..2, and checks that the levels and the values
   * are read back in the same order.
   *
   * <p>Only entries with definition level 2 carry a value (the loop index); all others are
   * written as nulls.
   */
  @Test
  public void testMemColumnSeveralPagesRepeated() throws Exception {
    MessageType mt = MessageTypeParser.parseMessageType(
        "message msg { repeated group foo { repeated int64 bar; } }");
    ColumnDescriptor path = mt.getColumnDescription(new String[] {"foo", "bar"});
    MemPageStore memPageStore = new MemPageStore(10);
    ColumnWriteStoreV1 memColumnsStore = newColumnWriteStoreImpl(memPageStore);
    ColumnWriter columnWriter = memColumnsStore.getColumnWriter(path);

    int[] rs = {0, 0, 0, 1, 1, 1, 2, 2, 2};
    int[] ds = {0, 1, 2, 0, 1, 2, 0, 1, 2};
    int valueCount = 837;
    for (int i = 0; i < valueCount; i++) {
      int r = rs[i % rs.length];
      int d = ds[i % ds.length];
      LOG.debug("write i: {}", i);
      // A repetition level of 0 starts a new record, so close the previous one first.
      if (i != 0 && r == 0) {
        memColumnsStore.endRecord();
      }
      if (d == 2) {
        columnWriter.write((long) i, r, d);
      } else {
        columnWriter.writeNull(r, d);
      }
    }
    memColumnsStore.endRecord();
    memColumnsStore.flush();

    ColumnReader columnReader = getColumnReader(memPageStore, path, mt);
    // Every write above, null or not, is expected to come back as one entry.
    assertEquals(valueCount, columnReader.getTotalValueCount());
    for (int i = 0; i < valueCount; i++) {
      int r = rs[i % rs.length];
      int d = ds[i % ds.length];
      LOG.debug("read i: {}", i);
      assertEquals("r row " + i, r, columnReader.getCurrentRepetitionLevel());
      assertEquals("d row " + i, d, columnReader.getCurrentDefinitionLevel());
      if (d == 2) {
        assertEquals("data row " + i, i, columnReader.getLong());
      }
      columnReader.consume();
    }
  }

  /**
   * Writes 123 records of 10 values each to a binary list column and an int32 list column using
   * a V2 write store, then checks that every binary page is at most 1024 bytes and every int32
   * page holds at most 10 rows.
   */
  @Test
  public void testPageSize() {
    MessageType schema = Types.buildMessage()
        .requiredList().requiredElement(BINARY).named("binary_col")
        .requiredList().requiredElement(INT32).named("int32_col")
        .named("msg");
    LOG.debug("schema: {}", schema);
    MemPageStore memPageStore = new MemPageStore(123);

    // Using V2 pages so we have rowCount info
    ColumnWriteStore writeStore = new ColumnWriteStoreV2(schema, memPageStore, ParquetProperties.builder()
        .withPageSize(1024) // Less than 10 records for binary_col
        .withMinRowCountForPageSizeCheck(1) // Enforce having precise page sizing
        .withPageRowCountLimit(10)
        .withDictionaryEncoding(false) // Enforce having large binary_col pages
        .build());

    ColumnDescriptor binaryCol = schema.getColumnDescription(new String[] {"binary_col", "list", "element"});
    ColumnWriter binaryColWriter = writeStore.getColumnWriter(binaryCol);
    ColumnDescriptor int32Col = schema.getColumnDescription(new String[] {"int32_col", "list", "element"});
    ColumnWriter int32ColWriter = writeStore.getColumnWriter(int32Col);

    int recordCount = 123;
    int valuesPerRecord = 10;
    for (int i = 0; i < recordCount; ++i) {
      for (int j = 0; j < valuesPerRecord; ++j) {
        // The first value of each record uses repetition level 0; the rest use 2.
        int repetitionLevel = j == 0 ? 0 : 2;
        binaryColWriter.write(Binary.fromString("aaaaaaaaaaaa"), repetitionLevel, 2);
        int32ColWriter.write(42, repetitionLevel, 2);
      }
      writeStore.endRecord();
    }
    writeStore.flush();

    long expectedValueCount = (long) recordCount * valuesPerRecord;

    // Check that all the binary_col pages are <= 1024 bytes
    {
      PageReader binaryColPageReader = memPageStore.getPageReader(binaryCol);
      assertEquals(expectedValueCount, binaryColPageReader.getTotalValueCount());
      int pageCnt = 0;
      int valueCnt = 0;
      while (valueCnt < binaryColPageReader.getTotalValueCount()) {
        DataPage page = binaryColPageReader.readPage();
        ++pageCnt;
        valueCnt += page.getValueCount();
        LOG.info("binary_col page-{}: {} bytes, {} rows", pageCnt, page.getCompressedSize(), page.getIndexRowCount().get());
        assertTrue("Compressed size should be at most 1024 bytes", page.getCompressedSize() <= 1024);
      }
    }

    // Check that all the int32_col pages contain <= 10 rows
    {
      PageReader int32ColPageReader = memPageStore.getPageReader(int32Col);
      assertEquals(expectedValueCount, int32ColPageReader.getTotalValueCount());
      int pageCnt = 0;
      int valueCnt = 0;
      while (valueCnt < int32ColPageReader.getTotalValueCount()) {
        DataPage page = int32ColPageReader.readPage();
        ++pageCnt;
        valueCnt += page.getValueCount();
        LOG.info("int32_col page-{}: {} bytes, {} rows", pageCnt, page.getCompressedSize(), page.getIndexRowCount().get());
        assertTrue("Row count should be at most 10", page.getIndexRowCount().get() <= 10);
      }
    }
  }

  /**
   * Builds a V1 column write store over {@code memPageStore} with a page size of 2048 and
   * dictionary encoding disabled.
   *
   * @param memPageStore the in-memory store that receives the written pages
   * @return a new write store bound to {@code memPageStore}
   */
  private ColumnWriteStoreV1 newColumnWriteStoreImpl(MemPageStore memPageStore) {
    return new ColumnWriteStoreV1(memPageStore,
        ParquetProperties.builder()
            .withPageSize(2048)
            .withDictionaryEncoding(false)
            .build());
  }
}