// blob: 93052f2b65869599c4fa63377743f6e30b437d86 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.storage.index;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.*;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.index.bst.BSTIndex;
import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter;
import org.apache.tajo.util.CommonTestingUtil;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import static org.junit.Assert.*;
@RunWith(Parameterized.class)
public class TestBSTIndex {
private TajoConf conf;
private Schema schema;
private TableMeta meta;
private static final int TUPLE_NUM = 10000;
private static final int LOAD_NUM = 100;
private static final String TEST_PATH = "target/test-data/TestIndex";
private Path testDir;
private FileSystem fs;
private String dataFormat;
public TestBSTIndex(String type) {
this.dataFormat = type;
conf = new TajoConf();
conf.setVar(TajoConf.ConfVars.ROOT_DIR, TEST_PATH);
schema = SchemaBuilder.builder()
.add(new Column("int", Type.INT4))
.add(new Column("long", Type.INT8))
.add(new Column("double", Type.FLOAT8))
.add(new Column("float", Type.FLOAT4))
.add(new Column("string", Type.TEXT))
.build();
}
/**
 * Supplies the constructor arguments for the parameterized runner.
 *
 * @return one single-element argument array per data format name ("RAW" and "TEXT")
 */
@Parameters(name = "{index}: {0}")
public static Collection<Object[]> generateParameters() {
  Object[][] formats = {
      {"RAW"},
      {"TEXT"}
  };
  return Arrays.asList(formats);
}
/**
 * Resolves the working directory and its file system before each test.
 *
 * <p>NOTE(review): {@code CommonTestingUtil.getTestDir} presumably (re)creates the directory;
 * confirm against its implementation.
 *
 * @throws Exception if the test directory or its file system cannot be obtained
 */
@Before
public void setUp() throws Exception {
  Path dir = CommonTestingUtil.getTestDir(TEST_PATH);
  this.testDir = dir;
  this.fs = dir.getFileSystem(conf);
}
/**
 * Indexes the ("long", "double") columns of a table written with keys 0..TUPLE_NUM-1 in
 * increasing order and verifies exact-match lookups.
 *
 * <p>For every key {@code i} the scanner is positioned at the offset returned by
 * {@code reader.find} and the row is checked; when {@code reader.next()} returns an offset
 * other than -1, the row at that offset must carry {@code i + 1}.
 *
 * @throws IOException if the table or the index cannot be written or read
 */
@Test
public void testFindValue() throws IOException {
  meta = CatalogUtil.newTableMeta(dataFormat);

  // Write the data file; every numeric column of row i holds the value i.
  Path tablePath = new Path(testDir, "testFindValue_" + dataFormat);
  Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
      .getAppender(meta, schema, tablePath);
  appender.init();
  Tuple tuple;
  for (int i = 0; i < TUPLE_NUM; i++) {
    tuple = new VTuple(5);
    tuple.put(0, DatumFactory.createInt4(i));
    tuple.put(1, DatumFactory.createInt8(i));
    tuple.put(2, DatumFactory.createFloat8(i));
    tuple.put(3, DatumFactory.createFloat4(i));
    tuple.put(4, DatumFactory.createText("field_" + i));
    appender.addTuple(tuple);
  }
  appender.close();

  FileStatus status = fs.getFileStatus(tablePath);
  long fileLen = status.getLen();
  FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);

  SortSpec[] sortKeys = new SortSpec[2];
  sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
  sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);

  Schema keySchema = SchemaBuilder.builder()
      .add(new Column("long", Type.INT8))
      .add(new Column("double", Type.FLOAT8))
      .build();

  BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);

  // Build the index by scanning the table and recording each row's start offset.
  Path indexPath = new Path(testDir, "testFindValue_" + dataFormat + ".idx");
  BSTIndex bst = new BSTIndex(conf);
  BSTIndexWriter creater = bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
  creater.setLoadNum(LOAD_NUM);
  creater.init();

  SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
      getSeekableScanner(meta, schema, tablet.getProto(), schema);
  scanner.init();

  Tuple keyTuple;
  long offset;
  while (true) {
    keyTuple = new VTuple(2);
    // The offset must be read before next() so that it points at the row about to be read.
    offset = scanner.getNextOffset();
    tuple = scanner.next();
    if (tuple == null) break;

    keyTuple.put(0, tuple.asDatum(1));
    keyTuple.put(1, tuple.asDatum(2));
    creater.write(keyTuple, offset);
  }

  creater.flush();
  creater.close();
  scanner.close();

  BSTIndexReader reader = bst.getIndexReader(indexPath, keySchema, comp);
  reader.init();
  scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
      getSeekableScanner(meta, schema, tablet.getProto(), schema);
  scanner.init();

  try {
    for (int i = 0; i < TUPLE_NUM - 1; i++) {
      // Use a dedicated key tuple: the scanned row must never double as the search key.
      Tuple searchKey = new VTuple(keySchema.size());
      searchKey.put(0, DatumFactory.createInt8(i));
      searchKey.put(1, DatumFactory.createFloat8(i));
      long offsets = reader.find(searchKey);
      scanner.seek(offsets);
      tuple = scanner.next();
      assertEquals("seek check [" + (i) + " ," + (tuple.getInt8(1)) + "]", i, tuple.getInt8(1));
      assertEquals("seek check [" + (i) + " ," + (tuple.getFloat8(2)) + "]", i, tuple.getFloat8(2), 0.0);

      offsets = reader.next();
      if (offsets == -1) {
        continue;
      }
      scanner.seek(offsets);
      tuple = scanner.next();
      assertEquals("[seek check " + (i + 1) + " ]", i + 1, tuple.getInt4(0));
      assertEquals("[seek check " + (i + 1) + " ]", i + 1, tuple.getInt8(1));
    }
  } finally {
    // Release both handles even when an assertion fails.
    try {
      reader.close();
    } finally {
      scanner.close();
    }
  }
}
/**
 * Builds the index while the table is being written, using the appender's current offset
 * as the row position, and then verifies exact-match lookups on ("long", "double").
 *
 * @throws IOException if the table or the index cannot be written or read
 */
@Test
public void testBuildIndexWithAppender() throws IOException {
  meta = CatalogUtil.newTableMeta(dataFormat);

  Path tablePath = new Path(testDir, "testBuildIndexWithAppender_" + dataFormat);
  FileAppender appender = (FileAppender) ((FileTablespace) TablespaceManager.getLocalFs())
      .getAppender(meta, schema, tablePath);
  appender.init();

  SortSpec[] sortKeys = new SortSpec[2];
  sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
  sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);

  Schema keySchema = SchemaBuilder.builder()
      .add(new Column("long", Type.INT8))
      .add(new Column("double", Type.FLOAT8))
      .build();

  BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);

  Path indexPath = new Path(testDir, "testBuildIndexWithAppender_" + dataFormat + ".idx");
  BSTIndex bst = new BSTIndex(conf);
  BSTIndexWriter creater = bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
  creater.setLoadNum(LOAD_NUM);
  creater.init();

  Tuple tuple;
  Tuple keyTuple;
  long offset;
  for (int i = 0; i < TUPLE_NUM; i++) {
    tuple = new VTuple(5);
    tuple.put(0, DatumFactory.createInt4(i));
    tuple.put(1, DatumFactory.createInt8(i));
    tuple.put(2, DatumFactory.createFloat8(i));
    tuple.put(3, DatumFactory.createFloat4(i));
    tuple.put(4, DatumFactory.createText("field_" + i));

    // Capture the offset before appending so it marks where this row starts.
    offset = appender.getOffset();
    appender.addTuple(tuple);

    // Index only the key columns ("long", "double"), matching keySchema, not the whole row.
    keyTuple = new VTuple(2);
    keyTuple.put(0, tuple.asDatum(1));
    keyTuple.put(1, tuple.asDatum(2));
    creater.write(keyTuple, offset);
  }
  appender.flush();
  appender.close();

  creater.flush();
  creater.close();

  FileStatus status = fs.getFileStatus(tablePath);
  long fileLen = status.getLen();
  FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);

  BSTIndexReader reader = bst.getIndexReader(indexPath, keySchema, comp);
  reader.init();
  SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
      getSeekableScanner(meta, schema, tablet.getProto(), schema);
  scanner.init();

  try {
    for (int i = 0; i < TUPLE_NUM - 1; i++) {
      // Use a dedicated key tuple: the scanned row must never double as the search key.
      Tuple searchKey = new VTuple(keySchema.size());
      searchKey.put(0, DatumFactory.createInt8(i));
      searchKey.put(1, DatumFactory.createFloat8(i));
      long offsets = reader.find(searchKey);
      scanner.seek(offsets);
      tuple = scanner.next();
      assertEquals("[seek check " + (i) + " ]", i, tuple.getInt8(1));
      assertEquals("[seek check " + (i) + " ]", i, tuple.getFloat8(2), 0.0);

      offsets = reader.next();
      if (offsets == -1) {
        continue;
      }
      scanner.seek(offsets);
      tuple = scanner.next();
      assertEquals("[seek check " + (i + 1) + " ]", i + 1, tuple.getInt4(0));
      assertEquals("[seek check " + (i + 1) + " ]", i + 1, tuple.getInt8(1));
    }
  } finally {
    // Release both handles even when an assertion fails.
    try {
      reader.close();
    } finally {
      scanner.close();
    }
  }
}
/**
 * Writes only the even keys below TUPLE_NUM, indexes ("long", "double"), and expects an
 * exact-match lookup of every odd key to return -1.
 *
 * @throws IOException if the table or the index cannot be written or read
 */
@Test
public void testFindOmittedValue() throws IOException {
  meta = CatalogUtil.newTableMeta(dataFormat);

  Path tablePath = StorageUtil.concatPath(testDir, "testFindOmittedValue_" + dataFormat);
  Appender dataWriter =
      ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
  dataWriter.init();

  // Even values only, so that odd keys are guaranteed to be absent from the index.
  int value = 0;
  while (value < TUPLE_NUM) {
    Tuple row = new VTuple(5);
    row.put(0, DatumFactory.createInt4(value));
    row.put(1, DatumFactory.createInt8(value));
    row.put(2, DatumFactory.createFloat8(value));
    row.put(3, DatumFactory.createFloat4(value));
    row.put(4, DatumFactory.createText("field_" + value));
    dataWriter.addTuple(row);
    value += 2;
  }
  dataWriter.close();

  FileStatus tableStatus = fs.getFileStatus(tablePath);
  FileFragment tablet =
      new FileFragment("table1_1", tableStatus.getPath(), 0, tableStatus.getLen());

  SortSpec[] sortKeys = {
      new SortSpec(schema.getColumn("long"), true, false),
      new SortSpec(schema.getColumn("double"), true, false)
  };

  Schema keySchema = SchemaBuilder.builder()
      .add(new Column("long", Type.INT8))
      .add(new Column("double", Type.FLOAT8))
      .build();

  BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);

  BSTIndex bst = new BSTIndex(conf);
  Path indexPath = new Path(testDir, "testFindOmittedValue_" + dataFormat + ".idx");
  BSTIndexWriter indexWriter =
      bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
  indexWriter.setLoadNum(LOAD_NUM);
  indexWriter.init();

  SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat())
      .getSeekableScanner(meta, schema, tablet.getProto(), schema);
  scanner.init();

  // The offset is sampled before each next() so it addresses the row being read.
  long rowOffset = scanner.getNextOffset();
  Tuple scanned;
  while ((scanned = scanner.next()) != null) {
    Tuple indexKey = new VTuple(2);
    indexKey.put(0, scanned.asDatum(1));
    indexKey.put(1, scanned.asDatum(2));
    indexWriter.write(indexKey, rowOffset);
    rowOffset = scanner.getNextOffset();
  }

  indexWriter.flush();
  indexWriter.close();
  scanner.close();

  BSTIndexReader reader = bst.getIndexReader(
      new Path(testDir, "testFindOmittedValue_" + dataFormat + ".idx"), keySchema, comp);
  reader.init();

  Tuple probe = new VTuple(2);
  for (int missing = 1; missing < TUPLE_NUM - 1; missing += 2) {
    probe.put(0, DatumFactory.createInt8(missing));
    probe.put(1, DatumFactory.createFloat8(missing));
    long found = reader.find(probe);
    assertEquals(-1, found);
  }
  reader.close();
}
/**
 * Indexes the ("int", "long") columns of a table written with keys 0..TUPLE_NUM-1 and
 * verifies next-key lookups via {@code reader.find(key, true)}.
 *
 * <p>For every key {@code i} the row at the returned offset must carry {@code i + 1}; when
 * {@code reader.next()} returns an offset other than -1, that row must carry {@code i + 2}.
 *
 * @throws IOException if the table or the index cannot be written or read
 */
@Test
public void testFindNextKeyValue() throws IOException {
  meta = CatalogUtil.newTableMeta(dataFormat);

  Path tablePath = new Path(testDir, "testFindNextKeyValue_" + dataFormat);
  Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
      .getAppender(meta, schema, tablePath);
  appender.init();
  Tuple tuple;
  for (int i = 0; i < TUPLE_NUM; i++) {
    tuple = new VTuple(5);
    tuple.put(0, DatumFactory.createInt4(i));
    tuple.put(1, DatumFactory.createInt8(i));
    tuple.put(2, DatumFactory.createFloat8(i));
    tuple.put(3, DatumFactory.createFloat4(i));
    tuple.put(4, DatumFactory.createText("field_" + i));
    appender.addTuple(tuple);
  }
  appender.close();

  FileStatus status = fs.getFileStatus(tablePath);
  long fileLen = status.getLen();
  FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);

  SortSpec[] sortKeys = new SortSpec[2];
  sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
  sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false);

  Schema keySchema = SchemaBuilder.builder()
      .add(new Column("int", Type.INT4))
      .add(new Column("long", Type.INT8))
      .build();

  BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);

  Path indexPath = new Path(testDir, "testFindNextKeyValue_" + dataFormat + ".idx");
  BSTIndex bst = new BSTIndex(conf);
  BSTIndexWriter creater = bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
  creater.setLoadNum(LOAD_NUM);
  creater.init();

  SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
      getSeekableScanner(meta, schema, tablet.getProto(), schema);
  scanner.init();

  Tuple keyTuple;
  long offset;
  while (true) {
    keyTuple = new VTuple(2);
    // The offset must be read before next() so that it points at the row about to be read.
    offset = scanner.getNextOffset();
    tuple = scanner.next();
    if (tuple == null) break;

    keyTuple.put(0, tuple.asDatum(0));
    keyTuple.put(1, tuple.asDatum(1));
    creater.write(keyTuple, offset);
  }

  creater.flush();
  creater.close();
  scanner.close();

  BSTIndexReader reader = bst.getIndexReader(indexPath, keySchema, comp);
  reader.init();
  scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
      getSeekableScanner(meta, schema, tablet.getProto(), schema);
  scanner.init();

  try {
    Tuple result;
    for (int i = 0; i < TUPLE_NUM - 1; i++) {
      keyTuple = new VTuple(2);
      keyTuple.put(0, DatumFactory.createInt4(i));
      keyTuple.put(1, DatumFactory.createInt8(i));
      long offsets = reader.find(keyTuple, true);
      scanner.seek(offsets);
      result = scanner.next();
      assertEquals("[seek check " + (i + 1) + " ]", i + 1, result.getInt4(0));
      assertEquals("[seek check " + (i + 1) + " ]", i + 1, result.getInt8(1));

      offsets = reader.next();
      if (offsets == -1) {
        continue;
      }
      scanner.seek(offsets);
      result = scanner.next();
      // Column 0 is INT4 and column 1 is INT8; read them with the matching accessors.
      assertEquals("[seek check " + (i + 2) + " ]", i + 2, result.getInt4(0));
      assertEquals("[seek check " + (i + 2) + " ]", i + 2, result.getInt8(1));
    }
  } finally {
    // Release both handles even when an assertion fails.
    try {
      reader.close();
    } finally {
      scanner.close();
    }
  }
}
/**
 * Writes only the even keys below TUPLE_NUM, indexes ("int", "long"), and verifies that a
 * next-key lookup ({@code reader.find(key, true)}) for each odd key {@code i} lands on the
 * row carrying {@code i + 1}.
 *
 * @throws IOException if the table or the index cannot be written or read
 */
@Test
public void testFindNextKeyOmittedValue() throws IOException {
  meta = CatalogUtil.newTableMeta(dataFormat);

  Path tablePath = new Path(testDir, "testFindNextKeyOmittedValue_" + dataFormat);
  Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
      .getAppender(meta, schema, tablePath);
  appender.init();
  Tuple tuple;
  for (int i = 0; i < TUPLE_NUM; i += 2) {
    tuple = new VTuple(5);
    tuple.put(0, DatumFactory.createInt4(i));
    tuple.put(1, DatumFactory.createInt8(i));
    tuple.put(2, DatumFactory.createFloat8(i));
    tuple.put(3, DatumFactory.createFloat4(i));
    tuple.put(4, DatumFactory.createText("field_" + i));
    appender.addTuple(tuple);
  }
  appender.close();

  FileStatus status = fs.getFileStatus(tablePath);
  long fileLen = status.getLen();
  FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);

  SortSpec[] sortKeys = new SortSpec[2];
  sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
  sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false);

  Schema keySchema = SchemaBuilder.builder()
      .add(new Column("int", Type.INT4))
      .add(new Column("long", Type.INT8))
      .build();

  BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);

  Path indexPath = new Path(testDir, "testFindNextKeyOmittedValue_" + dataFormat + ".idx");
  BSTIndex bst = new BSTIndex(conf);
  BSTIndexWriter creater = bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
  creater.setLoadNum(LOAD_NUM);
  creater.init();

  SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
      getSeekableScanner(meta, schema, tablet.getProto(), schema);
  scanner.init();

  Tuple keyTuple;
  long offset;
  while (true) {
    keyTuple = new VTuple(2);
    // The offset must be read before next() so that it points at the row about to be read.
    offset = scanner.getNextOffset();
    tuple = scanner.next();
    if (tuple == null) break;

    keyTuple.put(0, tuple.asDatum(0));
    keyTuple.put(1, tuple.asDatum(1));
    creater.write(keyTuple, offset);
  }

  creater.flush();
  creater.close();
  scanner.close();

  BSTIndexReader reader = bst.getIndexReader(indexPath, keySchema, comp);
  reader.init();
  scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
      getSeekableScanner(meta, schema, tablet.getProto(), schema);
  scanner.init();

  try {
    Tuple result;
    for (int i = 1; i < TUPLE_NUM - 1; i += 2) {
      keyTuple = new VTuple(2);
      keyTuple.put(0, DatumFactory.createInt4(i));
      keyTuple.put(1, DatumFactory.createInt8(i));
      long offsets = reader.find(keyTuple, true);
      scanner.seek(offsets);
      result = scanner.next();
      assertEquals("[seek check " + (i + 1) + " ]", i + 1, result.getInt4(0));
      assertEquals("[seek check " + (i + 1) + " ]", i + 1, result.getInt8(1));
    }
  } finally {
    // The index reader was previously never closed; release it together with the scanner.
    try {
      reader.close();
    } finally {
      scanner.close();
    }
  }
}
/**
 * Writes keys 5..TUPLE_NUM+4, indexes ("long", "double"), and looks up key 0, which is
 * smaller than every stored key.
 *
 * <p>An exact-match lookup must return -1, while a next-key lookup
 * ({@code reader.find(key, true)}) must return the offset of the first row, whose key is 5.
 *
 * @throws IOException if the table or the index cannot be written or read
 */
@Test
public void testFindMinValue() throws IOException {
  meta = CatalogUtil.newTableMeta(dataFormat);

  // Same "<test>_<format>" naming as the index file below and the other tests.
  Path tablePath = new Path(testDir, "testFindMinValue_" + dataFormat);
  Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
      .getAppender(meta, schema, tablePath);
  appender.init();

  Tuple tuple;
  for (int i = 5; i < TUPLE_NUM + 5; i++) {
    tuple = new VTuple(5);
    tuple.put(0, DatumFactory.createInt4(i));
    tuple.put(1, DatumFactory.createInt8(i));
    tuple.put(2, DatumFactory.createFloat8(i));
    tuple.put(3, DatumFactory.createFloat4(i));
    tuple.put(4, DatumFactory.createText("field_" + i));
    appender.addTuple(tuple);
  }
  appender.close();

  FileStatus status = fs.getFileStatus(tablePath);
  long fileLen = status.getLen();
  FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);

  SortSpec[] sortKeys = new SortSpec[2];
  sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
  sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);

  Schema keySchema = SchemaBuilder.builder()
      .add(new Column("long", Type.INT8))
      .add(new Column("double", Type.FLOAT8))
      .build();

  BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);

  Path indexPath = new Path(testDir, "testFindMinValue_" + dataFormat + ".idx");
  BSTIndex bst = new BSTIndex(conf);
  BSTIndexWriter creater = bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
  creater.setLoadNum(LOAD_NUM);
  creater.init();

  SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
      getSeekableScanner(meta, schema, tablet.getProto(), schema);
  scanner.init();

  Tuple keyTuple;
  long offset;
  while (true) {
    keyTuple = new VTuple(2);
    // The offset must be read before next() so that it points at the row about to be read.
    offset = scanner.getNextOffset();
    tuple = scanner.next();
    if (tuple == null) break;

    keyTuple.put(0, tuple.asDatum(1));
    keyTuple.put(1, tuple.asDatum(2));
    creater.write(keyTuple, offset);
  }

  creater.flush();
  creater.close();
  scanner.close();

  BSTIndexReader reader = bst.getIndexReader(indexPath, keySchema, comp);
  reader.init();
  scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
      getSeekableScanner(meta, schema, tablet.getProto(), schema);
  scanner.init();

  try {
    Tuple searchKey = new VTuple(keySchema.size());
    searchKey.put(0, DatumFactory.createInt8(0));
    searchKey.put(1, DatumFactory.createFloat8(0));

    offset = reader.find(searchKey);
    assertEquals(-1, offset);

    offset = reader.find(searchKey, true);
    assertTrue(offset >= 0);
    scanner.seek(offset);
    tuple = scanner.next();
    // Column 1 is INT8 and column 2 is FLOAT8; read them with the matching accessors.
    assertEquals(5L, tuple.getInt8(1));
    assertEquals(5.0, tuple.getFloat8(2), 0.0);
  } finally {
    // Release both handles even when an assertion fails.
    try {
      reader.close();
    } finally {
      scanner.close();
    }
  }
}
/**
 * Writes the odd keys 5, 7, ..., TUPLE_NUM-1, indexes ("int", "long"), and verifies that the
 * reader reports 5 as the first key and TUPLE_NUM-1 as the last key in both key columns.
 *
 * @throws IOException if the table or the index cannot be written or read
 */
@Test
public void testMinMax() throws IOException {
  meta = CatalogUtil.newTableMeta(dataFormat);

  Path tablePath = new Path(testDir, "testMinMax_" + dataFormat);
  Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
      .getAppender(meta, schema, tablePath);
  appender.init();
  Tuple tuple;
  for (int i = 5; i < TUPLE_NUM; i += 2) {
    tuple = new VTuple(5);
    tuple.put(0, DatumFactory.createInt4(i));
    tuple.put(1, DatumFactory.createInt8(i));
    tuple.put(2, DatumFactory.createFloat8(i));
    tuple.put(3, DatumFactory.createFloat4(i));
    tuple.put(4, DatumFactory.createText("field_" + i));
    appender.addTuple(tuple);
  }
  appender.close();

  FileStatus status = fs.getFileStatus(tablePath);
  long fileLen = status.getLen();
  FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);

  SortSpec[] sortKeys = new SortSpec[2];
  sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
  sortKeys[1] = new SortSpec(schema.getColumn("long"), true, false);

  Schema keySchema = SchemaBuilder.builder()
      .add(new Column("int", Type.INT4))
      .add(new Column("long", Type.INT8))
      .build();

  BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);

  Path indexPath = new Path(testDir, "testMinMax_" + dataFormat + ".idx");
  BSTIndex bst = new BSTIndex(conf);
  BSTIndexWriter creater = bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
  creater.setLoadNum(LOAD_NUM);
  creater.init();

  SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
      getSeekableScanner(meta, schema, tablet.getProto(), schema);
  scanner.init();

  Tuple keyTuple;
  long offset;
  while (true) {
    keyTuple = new VTuple(2);
    // The offset must be read before next() so that it points at the row about to be read.
    offset = scanner.getNextOffset();
    tuple = scanner.next();
    if (tuple == null) break;

    keyTuple.put(0, tuple.asDatum(0));
    keyTuple.put(1, tuple.asDatum(1));
    creater.write(keyTuple, offset);
  }

  creater.flush();
  creater.close();
  scanner.close();

  BSTIndexReader reader = bst.getIndexReader(indexPath, keySchema, comp);
  reader.init();

  try {
    // Key column 0 is "int" (INT4) and key column 1 is "long" (INT8); check each one.
    Tuple min = reader.getFirstKey();
    assertEquals(5, min.getInt4(0));
    assertEquals(5L, min.getInt8(1));

    Tuple max = reader.getLastKey();
    assertEquals(TUPLE_NUM - 1, max.getInt4(0));
    assertEquals(TUPLE_NUM - 1, max.getInt8(1));
  } finally {
    reader.close();
  }
}
/**
 * Worker that repeatedly looks up random keys through a shared {@link BSTIndexReader} and
 * records whether any lookup missed or threw.
 *
 * <p>The failure flag is a plain field; read it via {@link #isFailed()} only after the
 * worker thread has been joined.
 */
private class ConcurrentAccessor implements Runnable {
  final BSTIndexReader reader;
  final Random rnd = new Random(System.currentTimeMillis());
  boolean failed = false;

  /**
   * @param reader the index reader shared by all workers
   */
  ConcurrentAccessor(BSTIndexReader reader) {
    this.reader = reader;
  }

  /**
   * @return {@code true} if a lookup returned -1 or threw an exception
   */
  public boolean isFailed() {
    return this.failed;
  }

  /**
   * Performs up to 10000 lookups of keys drawn uniformly from [0, 10000) and stops at the
   * first failure.
   */
  @Override
  public void run() {
    Tuple findKey = new VTuple(2);
    int keyVal;
    for (int i = 0; i < 10000; i++) {
      keyVal = rnd.nextInt(10000);
      findKey.put(0, DatumFactory.createInt4(keyVal));
      findKey.put(1, DatumFactory.createInt8(keyVal));
      try {
        // Record a miss explicitly. An assertion here would throw AssertionError, which is
        // not an Exception, so it would escape the catch below and only kill this thread
        // without ever being reported to the test.
        if (reader.find(findKey) == -1) {
          System.err.println("key not found in index: " + keyVal);
          this.failed = true;
          return;
        }
      } catch (Exception e) {
        e.printStackTrace();
        this.failed = true;
        return;
      }
    }
  }
}
/**
 * Indexes the ("int", "long") columns of a table with keys 0..TUPLE_NUM-1 and then lets five
 * threads query a single shared reader at the same time; no worker may report a failure.
 *
 * @throws IOException if the table or the index cannot be written or read
 * @throws InterruptedException if the test thread is interrupted while joining a worker
 */
@Test
public void testConcurrentAccess() throws IOException, InterruptedException {
  meta = CatalogUtil.newTableMeta(dataFormat);

  Path tablePath = new Path(testDir, "testConcurrentAccess_" + dataFormat);
  Appender dataWriter =
      ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
  dataWriter.init();

  int value = 0;
  while (value < TUPLE_NUM) {
    Tuple row = new VTuple(5);
    row.put(0, DatumFactory.createInt4(value));
    row.put(1, DatumFactory.createInt8(value));
    row.put(2, DatumFactory.createFloat8(value));
    row.put(3, DatumFactory.createFloat4(value));
    row.put(4, DatumFactory.createText("field_" + value));
    dataWriter.addTuple(row);
    value++;
  }
  dataWriter.close();

  FileStatus tableStatus = fs.getFileStatus(tablePath);
  FileFragment tablet =
      new FileFragment("table1_1", tableStatus.getPath(), 0, tableStatus.getLen());

  SortSpec[] sortKeys = {
      new SortSpec(schema.getColumn("int"), true, false),
      new SortSpec(schema.getColumn("long"), true, false)
  };

  Schema keySchema = SchemaBuilder.builder()
      .add(new Column("int", Type.INT4))
      .add(new Column("long", Type.INT8))
      .build();

  BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);

  BSTIndex bst = new BSTIndex(conf);
  Path indexPath = new Path(testDir, "testConcurrentAccess_" + dataFormat + ".idx");
  BSTIndexWriter indexWriter =
      bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
  indexWriter.setLoadNum(LOAD_NUM);
  indexWriter.init();

  SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat())
      .getSeekableScanner(meta, schema, tablet.getProto(), schema);
  scanner.init();

  // The offset is sampled before each next() so it addresses the row being read.
  long rowOffset = scanner.getNextOffset();
  Tuple scanned;
  while ((scanned = scanner.next()) != null) {
    Tuple indexKey = new VTuple(2);
    indexKey.put(0, scanned.asDatum(0));
    indexKey.put(1, scanned.asDatum(1));
    indexWriter.write(indexKey, rowOffset);
    rowOffset = scanner.getNextOffset();
  }

  indexWriter.flush();
  indexWriter.close();
  scanner.close();

  BSTIndexReader reader = bst.getIndexReader(
      new Path(testDir, "testConcurrentAccess_" + dataFormat + ".idx"), keySchema, comp);
  reader.init();

  // Start every worker first so that they all hit the same reader concurrently.
  final int workerCount = 5;
  Thread[] workers = new Thread[workerCount];
  ConcurrentAccessor[] accessors = new ConcurrentAccessor[workerCount];
  for (int w = 0; w < workerCount; w++) {
    ConcurrentAccessor accessor = new ConcurrentAccessor(reader);
    Thread worker = new Thread(accessor);
    accessors[w] = accessor;
    workers[w] = worker;
    worker.start();
  }

  for (int w = 0; w < workerCount; w++) {
    workers[w].join();
    assertFalse(accessors[w].isFailed());
  }

  reader.close();
}
/**
 * Indexes the ("long", "double") columns of a table written with keys TUPLE_NUM-1 down to 0
 * and verifies exact-match lookups.
 *
 * <p>The sort specs are created with {@code false} as their second argument and the index
 * writer with a trailing {@code true}. NOTE(review): presumably "descending" and "input
 * already sorted" respectively; confirm against SortSpec and BSTIndex.
 *
 * <p>For every key {@code i} from TUPLE_NUM-1 down to 1 the found row is checked; when
 * {@code reader.next()} returns an offset other than -1, that row must carry {@code i - 1}.
 *
 * @throws IOException if the table or the index cannot be written or read
 */
@Test
public void testFindValueDescOrder() throws IOException {
  meta = CatalogUtil.newTableMeta(dataFormat);

  Path tablePath = new Path(testDir, "testFindValueDescOrder_" + dataFormat);
  Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
      .getAppender(meta, schema, tablePath);
  appender.init();

  Tuple tuple;
  for (int i = (TUPLE_NUM - 1); i >= 0; i--) {
    tuple = new VTuple(5);
    tuple.put(0, DatumFactory.createInt4(i));
    tuple.put(1, DatumFactory.createInt8(i));
    tuple.put(2, DatumFactory.createFloat8(i));
    tuple.put(3, DatumFactory.createFloat4(i));
    tuple.put(4, DatumFactory.createText("field_" + i));
    appender.addTuple(tuple);
  }
  appender.close();

  FileStatus status = fs.getFileStatus(tablePath);
  long fileLen = status.getLen();
  FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);

  SortSpec[] sortKeys = new SortSpec[2];
  sortKeys[0] = new SortSpec(schema.getColumn("long"), false, false);
  sortKeys[1] = new SortSpec(schema.getColumn("double"), false, false);

  Schema keySchema = SchemaBuilder.builder()
      .add(new Column("long", Type.INT8))
      .add(new Column("double", Type.FLOAT8))
      .build();

  BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);

  Path indexPath = new Path(testDir, "testFindValueDescOrder_" + dataFormat + ".idx");
  BSTIndex bst = new BSTIndex(conf);
  BSTIndexWriter creater = bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comp, true);
  creater.setLoadNum(LOAD_NUM);
  creater.init();

  SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
      getSeekableScanner(meta, schema, tablet.getProto(), schema);
  scanner.init();

  Tuple keyTuple;
  long offset;
  while (true) {
    keyTuple = new VTuple(2);
    // The offset must be read before next() so that it points at the row about to be read.
    offset = scanner.getNextOffset();
    tuple = scanner.next();
    if (tuple == null) break;

    keyTuple.put(0, tuple.asDatum(1));
    keyTuple.put(1, tuple.asDatum(2));
    creater.write(keyTuple, offset);
  }

  creater.flush();
  creater.close();
  scanner.close();

  BSTIndexReader reader = bst.getIndexReader(indexPath, keySchema, comp);
  reader.init();
  scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
      getSeekableScanner(meta, schema, tablet.getProto(), schema);
  scanner.init();

  try {
    for (int i = (TUPLE_NUM - 1); i > 0; i--) {
      // Use a dedicated key tuple: the scanned row must never double as the search key.
      Tuple searchKey = new VTuple(keySchema.size());
      searchKey.put(0, DatumFactory.createInt8(i));
      searchKey.put(1, DatumFactory.createFloat8(i));
      long offsets = reader.find(searchKey);
      scanner.seek(offsets);
      tuple = scanner.next();
      assertEquals("seek check [" + (i) + " ," + (tuple.getInt8(1)) + "]", i, tuple.getInt8(1));
      assertEquals("seek check [" + (i) + " ," + (tuple.getFloat8(2)) + "]", i, tuple.getFloat8(2), 0.0);

      offsets = reader.next();
      if (offsets == -1) {
        continue;
      }
      scanner.seek(offsets);
      tuple = scanner.next();
      assertEquals("[seek check " + (i - 1) + " ]", i - 1, tuple.getInt4(0));
      assertEquals("[seek check " + (i - 1) + " ]", i - 1, tuple.getInt8(1));
    }
  } finally {
    // Release both handles even when an assertion fails.
    try {
      reader.close();
    } finally {
      scanner.close();
    }
  }
}
/**
 * Indexes the ("int", "long") columns of a table written with keys TUPLE_NUM-1 down to 0 and
 * verifies next-key lookups via {@code reader.find(key, true)}.
 *
 * <p>The sort specs are created with {@code false} as their second argument.
 * NOTE(review): presumably "descending"; confirm against SortSpec.
 *
 * <p>The reader must report the key schema and comparator it was opened with. For every key
 * {@code i} from TUPLE_NUM-1 down to 1 the row at the returned offset must carry
 * {@code i - 1}; when {@code reader.next()} returns an offset other than -1, that row must
 * carry {@code i - 2}.
 *
 * @throws IOException if the table or the index cannot be written or read
 */
@Test
public void testFindNextKeyValueDescOrder() throws IOException {
  meta = CatalogUtil.newTableMeta(dataFormat);

  Path tablePath = new Path(testDir, "testFindNextKeyValueDescOrder_" + dataFormat);
  Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
  appender.init();

  Tuple tuple;
  for (int i = (TUPLE_NUM - 1); i >= 0; i--) {
    tuple = new VTuple(5);
    tuple.put(0, DatumFactory.createInt4(i));
    tuple.put(1, DatumFactory.createInt8(i));
    tuple.put(2, DatumFactory.createFloat8(i));
    tuple.put(3, DatumFactory.createFloat4(i));
    tuple.put(4, DatumFactory.createText("field_" + i));
    appender.addTuple(tuple);
  }
  appender.close();

  FileStatus status = fs.getFileStatus(tablePath);
  long fileLen = status.getLen();
  FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);

  SortSpec[] sortKeys = new SortSpec[2];
  sortKeys[0] = new SortSpec(schema.getColumn("int"), false, false);
  sortKeys[1] = new SortSpec(schema.getColumn("long"), false, false);

  Schema keySchema = SchemaBuilder.builder()
      .add(new Column("int", Type.INT4))
      .add(new Column("long", Type.INT8))
      .build();

  BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);

  Path indexPath = new Path(testDir, "testFindNextKeyValueDescOrder_" + dataFormat + ".idx");
  BSTIndex bst = new BSTIndex(conf);
  BSTIndexWriter creater = bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
  creater.setLoadNum(LOAD_NUM);
  creater.init();

  SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
      getSeekableScanner(meta, schema, tablet.getProto(), schema);
  scanner.init();

  Tuple keyTuple;
  long offset;
  while (true) {
    keyTuple = new VTuple(2);
    // The offset must be read before next() so that it points at the row about to be read.
    offset = scanner.getNextOffset();
    tuple = scanner.next();
    if (tuple == null) break;

    keyTuple.put(0, tuple.asDatum(0));
    keyTuple.put(1, tuple.asDatum(1));
    creater.write(keyTuple, offset);
  }

  creater.flush();
  creater.close();
  scanner.close();

  BSTIndexReader reader = bst.getIndexReader(indexPath, keySchema, comp);
  reader.init();
  scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
      getSeekableScanner(meta, schema, tablet.getProto(), schema);
  scanner.init();

  try {
    assertEquals(keySchema, reader.getKeySchema());
    assertEquals(comp, reader.getComparator());

    Tuple result;
    for (int i = (TUPLE_NUM - 1); i > 0; i--) {
      keyTuple = new VTuple(2);
      keyTuple.put(0, DatumFactory.createInt4(i));
      keyTuple.put(1, DatumFactory.createInt8(i));
      long offsets = reader.find(keyTuple, true);
      scanner.seek(offsets);
      result = scanner.next();
      assertEquals("[seek check " + (i - 1) + " ]", i - 1, result.getInt4(0));
      assertEquals("[seek check " + (i - 1) + " ]", i - 1, result.getInt8(1));

      offsets = reader.next();
      if (offsets == -1) {
        continue;
      }
      scanner.seek(offsets);
      result = scanner.next();
      // Column 0 is INT4 and column 1 is INT8; read them with the matching accessors.
      assertEquals("[seek check " + (i - 2) + " ]", i - 2, result.getInt4(0));
      assertEquals("[seek check " + (i - 2) + " ]", i - 2, result.getInt8(1));
    }
  } finally {
    // Release both handles even when an assertion fails.
    try {
      reader.close();
    } finally {
      scanner.close();
    }
  }
}
/**
 * Same scenario as an exact-match lookup over keys 0..TUPLE_NUM-1 on ("long", "double"),
 * but the index writer is created with a trailing {@code true} argument.
 * NOTE(review): presumably "input already sorted"; confirm against BSTIndex.
 *
 * <p>The table and index files carry this test's own name so that they cannot be confused
 * with the files of another test.
 *
 * @throws IOException if the table or the index cannot be written or read
 */
@Test
public void testFindValueASCOrder() throws IOException {
  meta = CatalogUtil.newTableMeta(dataFormat);

  Path tablePath = new Path(testDir, "testFindValueASCOrder_" + dataFormat);
  Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
      .getAppender(meta, schema, tablePath);
  appender.init();

  Tuple tuple;
  // Rows are written in increasing key order.
  for (int i = 0; i < TUPLE_NUM; i++) {
    tuple = new VTuple(5);
    tuple.put(0, DatumFactory.createInt4(i));
    tuple.put(1, DatumFactory.createInt8(i));
    tuple.put(2, DatumFactory.createFloat8(i));
    tuple.put(3, DatumFactory.createFloat4(i));
    tuple.put(4, DatumFactory.createText("field_" + i));
    appender.addTuple(tuple);
  }
  appender.close();

  FileStatus status = fs.getFileStatus(tablePath);
  long fileLen = status.getLen();
  FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen);

  SortSpec[] sortKeys = new SortSpec[2];
  sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
  sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false);

  Schema keySchema = SchemaBuilder.builder()
      .add(new Column("long", Type.INT8))
      .add(new Column("double", Type.FLOAT8))
      .build();

  BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys);

  Path indexPath = new Path(testDir, "testFindValueASCOrder_" + dataFormat + ".idx");
  BSTIndex bst = new BSTIndex(conf);
  BSTIndexWriter creater = bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comp, true);
  creater.setLoadNum(LOAD_NUM);
  creater.init();

  SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
      getSeekableScanner(meta, schema, tablet.getProto(), schema);
  scanner.init();

  Tuple keyTuple;
  long offset;
  while (true) {
    keyTuple = new VTuple(2);
    // The offset must be read before next() so that it points at the row about to be read.
    offset = scanner.getNextOffset();
    tuple = scanner.next();
    if (tuple == null) break;

    keyTuple.put(0, tuple.asDatum(1));
    keyTuple.put(1, tuple.asDatum(2));
    creater.write(keyTuple, offset);
  }

  creater.flush();
  creater.close();
  scanner.close();

  BSTIndexReader reader = bst.getIndexReader(indexPath, keySchema, comp);
  reader.init();
  scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
      getSeekableScanner(meta, schema, tablet.getProto(), schema);
  scanner.init();

  try {
    for (int i = 0; i < TUPLE_NUM - 1; i++) {
      // Use a dedicated key tuple: the scanned row must never double as the search key.
      Tuple searchKey = new VTuple(keySchema.size());
      searchKey.put(0, DatumFactory.createInt8(i));
      searchKey.put(1, DatumFactory.createFloat8(i));
      long offsets = reader.find(searchKey);
      scanner.seek(offsets);
      tuple = scanner.next();
      assertEquals("seek check [" + (i) + " ," + (tuple.getInt8(1)) + "]", i, tuple.getInt8(1));
      assertEquals("seek check [" + (i) + " ," + (tuple.getFloat8(2)) + "]", i, tuple.getFloat8(2), 0.0);

      offsets = reader.next();
      if (offsets == -1) {
        continue;
      }
      scanner.seek(offsets);
      tuple = scanner.next();
      assertEquals("[seek check " + (i + 1) + " ]", i + 1, tuple.getInt4(0));
      assertEquals("[seek check " + (i + 1) + " ]", i + 1, tuple.getInt8(1));
    }
  } finally {
    // Release both handles even when an assertion fails.
    try {
      reader.close();
    } finally {
      scanner.close();
    }
  }
}
}