/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.store.blockcache;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Random;

import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.MergeInfo;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.SolrTestCaseJ4;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

// commented out on: 24-Dec-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
public class BlockDirectoryTest extends SolrTestCaseJ4 {
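
  /**
   * Simple in-memory {@link Cache} implementation backed by a bounded
   * Caffeine map; some test runs use it in place of the real block cache.
   */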
  private static class MapperCache implements Cache {

    public Map<String, byte[]> map = Caffeine.newBuilder()
        .maximumSize(8)
        .<String, byte[]>build()
        .asMap();

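    /**
     * Stores the given bytes for the block, merging them into (and, if
     * necessary, growing) any previously cached copy of the block.
     */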
    @Override
    public void update(String name, long blockId, int blockOffset, byte[] buffer, int offset, int length) {
      byte[] cached = map.get(name + blockId);
      if (cached != null) {
        int newlen = Math.max(cached.length, blockOffset + length);
        byte[] b = new byte[newlen];
        System.arraycopy(cached, 0, b, 0, cached.length);
        System.arraycopy(buffer, offset, b, blockOffset, length);
        cached = b;
      } else {
        cached = copy(blockOffset, buffer, offset, length);
      }
      map.put(name + blockId, cached);
    }

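    /** Copies {@code length} bytes of {@code buffer} into a new array, starting at {@code blockOffset}. */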
    private byte[] copy(int blockOffset, byte[] buffer, int offset, int length) {
      byte[] b = new byte[length + blockOffset];
      System.arraycopy(buffer, offset, b, blockOffset, length);
      return b;
    }

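    /**
     * Copies the cached bytes for the block into {@code b}, or returns
     * false if the block is not present in the cache.
     */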
    @Override
    public boolean fetch(String name, long blockId, int blockOffset, byte[] b, int off, int lengthToReadInBlock) {
      byte[] data = map.get(name + blockId);
      if (data == null) {
        return false;
      }
      System.arraycopy(data, blockOffset, b, off, lengthToReadInBlock);
      return true;
    }

    @Override
    public void delete(String name) {
    }

    @Override
    public long size() {
      return map.size();
    }

    @Override
    public void renameCacheFile(String source, String dest) {
    }

    @Override
    public void releaseResources() {}
  }

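  // Bounds for the randomized file sizes, buffer sizes, and operation counts
  // used by the read/write tests below.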
  private static final int MAX_NUMBER_OF_WRITES = 10000;
  private static final int MIN_FILE_SIZE = 100;
  private static final int MAX_FILE_SIZE = 100000;
  private static final int MIN_BUFFER_SIZE = 1;
  private static final int MAX_BUFFER_SIZE = 12000;
  private static final int MAX_NUMBER_OF_READS = 20000;

  private BlockDirectory directory;
  private File file;
  private Random random;
  private MapperCache mapperCache;

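  /**
   * Randomly backs the {@link BlockDirectory} under test with either a real
   * {@link BlockCache} (a single 128MB slab of 8KB blocks) or the simple
   * {@link MapperCache} above, so both cache paths get exercised.
   */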
  @Override
  @Before
  public void setUp() throws Exception {
    super.setUp();
    file = createTempDir().toFile();
    FSDirectory dir = FSDirectory.open(new File(file, "base").toPath());
    mapperCache = new MapperCache();

    if (random().nextBoolean()) {
      Metrics metrics = new Metrics();
      int blockSize = 8192;
      int slabSize = blockSize * 16384;
      long totalMemory = 1 * slabSize;
      BlockCache blockCache = new BlockCache(metrics, true, totalMemory, slabSize, blockSize);
      BlockDirectoryCache cache = new BlockDirectoryCache(blockCache, "/collection1", metrics, true);
      directory = new BlockDirectory("test", dir, cache, null, true, false);
    } else {
      directory = new BlockDirectory("test", dir, mapperCache, null, true, true);
    }
    random = random();
  }

  @Override
  @After
  public void tearDown() throws Exception {
    super.tearDown();
    if (null != directory) {
      directory.close();
      directory = null;
    }
  }

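  /**
   * Writes the same random file through a plain FSDirectory and through the
   * BlockDirectory, then verifies that both report the same length and that
   * both fail on a read past EOF.
   */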
  @Test
  public void testEOF() throws IOException {
    Directory fsDir = FSDirectory.open(new File(file, "normal").toPath());
    String name = "test.eof";
    createFile(name, fsDir, directory);
    long fsLength = fsDir.fileLength(name);
    long hdfsLength = directory.fileLength(name);
    assertEquals(fsLength, hdfsLength);
    testEof(name, fsDir, fsLength);
    testEof(name, directory, hdfsLength);
    fsDir.close();
  }

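  /** Seeks to the end of the file and asserts that a further read fails. */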
  private void testEof(String name, Directory directory, long length) throws IOException {
    IndexInput input = directory.openInput(name, new IOContext());
    try {
      input.seek(length);
      try {
        input.readByte();
        fail("should throw eof");
      } catch (IOException e) {
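        // expected: reading past the last byte must throw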
      }
    } finally {
      input.close();
    }
  }

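  /**
   * Creates ten random files through both directories and checks that random
   * reads return identical bytes from each.
   */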
  @Test
  public void testRandomAccessWrites() throws IOException {
    long t1 = System.nanoTime();
    int i = 0;
    try {
      for (; i < 10; i++) {
        Directory fsDir = FSDirectory.open(new File(file, "normal").toPath());
        String name = getName();
        createFile(name, fsDir, directory);
        assertInputsEquals(name, fsDir, directory);
      }
    } catch (Exception e) {
      e.printStackTrace();
      fail("Test failed on pass [" + i + "]");
    }
    long t2 = System.nanoTime();
    System.out.println("Total time is " + ((t2 - t1) / 1000000) + "ms");
  }

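  /** Re-runs {@link #testRandomAccessWrites()} with a much larger map cache. */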
  @Test
  public void testRandomAccessWritesLargeCache() throws IOException {
    mapperCache.map = Caffeine.newBuilder()
        .maximumSize(10_000)
        .<String, byte[]>build()
        .asMap();
    testRandomAccessWrites();
  }

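  /**
   * Performs up to {@link #MAX_NUMBER_OF_READS} reads at random positions,
   * offsets, and lengths against both directories and asserts the bytes
   * returned are identical.
   */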
  private void assertInputsEquals(String name, Directory fsDir, Directory hdfs) throws IOException {
    int reads = random.nextInt(MAX_NUMBER_OF_READS);
    IndexInput fsInput = fsDir.openInput(name, new IOContext());
    IndexInput hdfsInput = hdfs.openInput(name, new IOContext());
    assertEquals(fsInput.length(), hdfsInput.length());
    int fileLength = (int) fsInput.length();
    for (int i = 0; i < reads; i++) {
      int rnd;
      if (fileLength == 0) {
        rnd = 0;
      } else {
        rnd = random.nextInt(Math.min(MAX_BUFFER_SIZE - MIN_BUFFER_SIZE, fileLength));
      }
      byte[] fsBuf = new byte[rnd + MIN_BUFFER_SIZE];
      byte[] hdfsBuf = new byte[fsBuf.length];
      int offset = random.nextInt(fsBuf.length);
      int length = random.nextInt(fsBuf.length - offset);
      int pos;
      if (fileLength == 0) {
        pos = 0;
      } else {
        pos = random.nextInt(fileLength - length);
      }
      fsInput.seek(pos);
      fsInput.readBytes(fsBuf, offset, length);
      hdfsInput.seek(pos);
      hdfsInput.readBytes(hdfsBuf, offset, length);
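      // Every byte actually read lands in [offset, offset + length); compare
      // that full range.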
      for (int f = offset; f < offset + length; f++) {
        if (fsBuf[f] != hdfsBuf[f]) {
          fail("read [" + i + "]");
        }
      }
    }
    fsInput.close();
    hdfsInput.close();
  }

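  /**
   * Writes the same random data, at random offsets and lengths, to both
   * directories so their contents can be compared afterwards.
   */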
  private void createFile(String name, Directory fsDir, Directory hdfs) throws IOException {
    int writes = random.nextInt(MAX_NUMBER_OF_WRITES);
    int fileLength = random.nextInt(MAX_FILE_SIZE - MIN_FILE_SIZE) + MIN_FILE_SIZE;
    IndexOutput fsOutput = fsDir.createOutput(name, IOContext.DEFAULT);
    IndexOutput hdfsOutput = hdfs.createOutput(name, IOContext.DEFAULT);
    for (int i = 0; i < writes; i++) {
      byte[] buf = new byte[random.nextInt(Math.min(MAX_BUFFER_SIZE - MIN_BUFFER_SIZE, fileLength)) + MIN_BUFFER_SIZE];
      random.nextBytes(buf);
      int offset = random.nextInt(buf.length);
      int length = random.nextInt(buf.length - offset);
      fsOutput.writeBytes(buf, offset, length);
      hdfsOutput.writeBytes(buf, offset, length);
    }
    fsOutput.close();
    hdfsOutput.close();
  }

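  /** Returns a random, effectively unique file name. */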
  private String getName() {
    return Long.toString(Math.abs(random.nextLong()));
  }

  public static void rm(File file) {
    try {
      IOUtils.rm(file.toPath());
    } catch (Throwable ignored) {
      // TODO: should this class care if a file couldn't be deleted?
      // this just emulates previous behavior, where only SecurityException would be handled.
    }
  }

  /**
   * Verify the configuration options for the block cache are handled
   * appropriately.
   */
  @Test
  public void ensureCacheConfigurable() throws Exception {
    IOContext mergeContext = new IOContext(new MergeInfo(1, 1, false, 1));

    BlockDirectory d = directory;
    assertTrue(d.useReadCache("", IOContext.DEFAULT));
    if (d.getCache() instanceof MapperCache) {
      assertTrue(d.useWriteCache("", IOContext.DEFAULT));
    } else {
      assertFalse(d.useWriteCache("", IOContext.DEFAULT));
    }
    assertFalse(d.useWriteCache("", mergeContext));

    d = new BlockDirectory("test", directory, mapperCache, null, true, false);
    assertTrue(d.useReadCache("", IOContext.DEFAULT));
    assertFalse(d.useWriteCache("", IOContext.DEFAULT));
    assertFalse(d.useWriteCache("", mergeContext));

    d = new BlockDirectory("test", directory, mapperCache, null, false, true);
    assertFalse(d.useReadCache("", IOContext.DEFAULT));
    if (d.getCache() instanceof MapperCache) {
      assertTrue(d.useWriteCache("", IOContext.DEFAULT));
    } else {
      assertFalse(d.useWriteCache("", IOContext.DEFAULT));
    }
    assertFalse(d.useWriteCache("", mergeContext));
  }
}