blob: 18b9eda26c88ab18b61de391d4cd11c15cb92ac7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.blur.store.blockcache_v2;
import java.io.EOFException;
import java.io.IOException;
import org.apache.blur.store.blockcache_v2.cachevalue.ByteArrayCacheValue;
import org.apache.blur.store.buffer.BufferStore;
import org.apache.blur.store.buffer.Store;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IndexInput;
public class CacheIndexInput extends IndexInput {
private final long _fileLength;
private final long _fileId;
private final int _cacheBlockSize;
private final int _bufferSize;
private final CacheDirectory _directory;
private final String _fileName;
private final Cache _cache;
private final Store _store;
private final IndexInputCache _indexInputCache;
private IndexInput _indexInput;
private CacheKey _key = new CacheKey();
private CacheValue _cacheValue;
private CacheValue _cacheValueQuietRefCannotBeReleased;
private long _position;
private int _blockPosition;
private boolean _quiet;
private boolean _isClosed;
public CacheIndexInput(CacheDirectory directory, String fileName, IndexInput indexInput, Cache cache)
throws IOException {
super("CacheIndexInput(" + indexInput.toString() + ")");
_directory = directory;
_fileName = fileName;
_indexInput = indexInput;
_fileLength = indexInput.length();
_cache = cache;
_fileId = _cache.getFileId(_directory, _fileName);
_cacheBlockSize = _cache.getCacheBlockSize(_directory, _fileName);
_indexInputCache = _cache.createIndexInputCache(_directory, _fileName, _fileLength);
_bufferSize = _cache.getFileBufferSize(_directory, _fileName);
_quiet = _cache.shouldBeQuiet(_directory, _fileName);
_key.setFileId(_fileId);
_isClosed = false;
_store = BufferStore.instance(_bufferSize);
}
@Override
public int readVInt() throws IOException {
if (isCacheValueValid() && remaining() >= 5) {
byte b;
try {
b = readByteFromCache();
} catch (EvictionException e) {
b = readByte();
}
if (b >= 0)
return b;
int i = b & 0x7F;
try {
b = readByteFromCache();
} catch (EvictionException e) {
b = readByte();
}
i |= (b & 0x7F) << 7;
if (b >= 0)
return i;
try {
b = readByteFromCache();
} catch (EvictionException e) {
b = readByte();
}
i |= (b & 0x7F) << 14;
if (b >= 0)
return i;
try {
b = readByteFromCache();
} catch (EvictionException e) {
b = readByte();
}
i |= (b & 0x7F) << 21;
if (b >= 0)
return i;
try {
b = readByteFromCache();
} catch (EvictionException e) {
b = readByte();
}
// Warning: the next ands use 0x0F / 0xF0 - beware copy/paste errors:
i |= (b & 0x0F) << 28;
if ((b & 0xF0) == 0)
return i;
throw new IOException("Invalid vInt detected (too many bits)");
}
return super.readVInt();
}
private boolean isCacheValueValid() {
if (_cacheValue != null && !_cacheValue.isEvicted()) {
return true;
}
return false;
}
@Override
public long readVLong() throws IOException {
if (isCacheValueValid() && remaining() >= 9) {
byte b;
try {
b = readByteFromCache();
} catch (EvictionException e) {
b = readByte();
}
if (b >= 0)
return b;
long i = b & 0x7FL;
try {
b = readByteFromCache();
} catch (EvictionException e) {
b = readByte();
}
i |= (b & 0x7FL) << 7;
if (b >= 0)
return i;
try {
b = readByteFromCache();
} catch (EvictionException e) {
b = readByte();
}
i |= (b & 0x7FL) << 14;
if (b >= 0)
return i;
try {
b = readByteFromCache();
} catch (EvictionException e) {
b = readByte();
}
i |= (b & 0x7FL) << 21;
if (b >= 0)
return i;
try {
b = readByteFromCache();
} catch (EvictionException e) {
b = readByte();
}
i |= (b & 0x7FL) << 28;
if (b >= 0)
return i;
try {
b = readByteFromCache();
} catch (EvictionException e) {
b = readByte();
}
i |= (b & 0x7FL) << 35;
if (b >= 0)
return i;
try {
b = readByteFromCache();
} catch (EvictionException e2) {
b = readByte();
}
i |= (b & 0x7FL) << 42;
if (b >= 0)
return i;
try {
b = readByteFromCache();
} catch (EvictionException e1) {
b = readByte();
}
i |= (b & 0x7FL) << 49;
if (b >= 0)
return i;
try {
b = readByteFromCache();
} catch (EvictionException e) {
b = readByte();
}
i |= (b & 0x7FL) << 56;
if (b >= 0)
return i;
throw new IOException("Invalid vLong detected (negative values disallowed)");
}
return super.readVLong();
}
@Override
public byte readByte() throws IOException {
ensureOpen();
byte b;
tryToFill();
try {
b = _cacheValue.read(_blockPosition);
} catch (EvictionException e) {
releaseCache();
return readByte();
}
_position++;
_blockPosition++;
return b;
}
@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
ensureOpen();
while (len > 0) {
tryToFill();
int remaining = remaining();
int length = Math.min(len, remaining);
try {
_cacheValue.read(_blockPosition, b, offset, length);
} catch (EvictionException e) {
releaseCache();
readBytes(b, offset, len);
return;
}
offset += length;
len -= length;
_position += length;
_blockPosition += length;
}
}
@Override
public short readShort() throws IOException {
ensureOpen();
if (isCacheValueValid() && remaining() >= 2) {
short s;
try {
s = _cacheValue.readShort(_blockPosition);
} catch (EvictionException e) {
return super.readShort();
}
_blockPosition += 2;
_position += 2;
return s;
}
return super.readShort();
}
@Override
public int readInt() throws IOException {
ensureOpen();
if (isCacheValueValid() && remaining() >= 4) {
int i;
try {
i = _cacheValue.readInt(_blockPosition);
} catch (EvictionException e) {
return super.readInt();
}
_blockPosition += 4;
_position += 4;
return i;
}
return super.readInt();
}
@Override
public long readLong() throws IOException {
ensureOpen();
if (isCacheValueValid() && remaining() >= 8) {
long l;
try {
l = _cacheValue.readLong(_blockPosition);
} catch (EvictionException e) {
return super.readLong();
}
_blockPosition += 8;
_position += 8;
return l;
}
return super.readLong();
}
@Override
public void close() throws IOException {
if (!_isClosed) {
_isClosed = true;
_indexInput.close();
releaseCache();
_indexInputCache.close();
}
}
@Override
public long getFilePointer() {
ensureOpen();
return _position;
}
private void checkEOF() throws EOFException {
if (_position >= _fileLength) {
throw new EOFException("read past EOF: " + this.toString());
}
}
@Override
public void seek(long pos) throws IOException {
ensureOpen();
if (pos >= _fileLength) {
_position = pos;
releaseCache();
return;
}
if (_position == pos) {
// Seeking to same position
return;
}
long oldBlockId = getBlockId();
if (_blockPosition == _cacheBlockSize) {
// If we are at the end of the current block, but haven't actually fetched
// the next block then we are really on the previous.
oldBlockId--;
}
_position = pos;
long newBlockId = getBlockId(_position);
if (newBlockId == oldBlockId) {
// need to set new block position
_blockPosition = getBlockPosition();
} else {
releaseCache();
}
}
@Override
public long length() {
ensureOpen();
return _fileLength;
}
@Override
public IndexInput clone() {
ensureOpen();
CacheIndexInput clone = (CacheIndexInput) super.clone();
clone._key = _key.clone();
clone._indexInput = _indexInput.clone();
clone._quiet = _cache.shouldBeQuiet(_directory, _fileName);
clone._cacheValueQuietRefCannotBeReleased = null;
return clone;
}
private byte readByteFromCache() throws EvictionException {
byte b = _cacheValue.read(_blockPosition);
_position++;
_blockPosition++;
return b;
}
private int remaining() {
try {
return _cacheValue.length() - _blockPosition;
} catch (EvictionException e) {
return 0;
}
}
private void tryToFill() throws IOException {
checkEOF();
if (!isCacheValueValid() || remaining() == 0) {
releaseCache();
fill();
} else {
return;
}
}
private void releaseCache() {
if (_cacheValue != null) {
_cacheValue = null;
}
}
private void fillQuietly() throws IOException {
_key.setBlockId(getBlockId());
_cacheValue = lookup(true);
if (_cacheValue == null) {
if (_cacheValueQuietRefCannotBeReleased == null) {
// @TODO this could be improved.
int cacheBlockSize = _cache.getCacheBlockSize(_directory, _fileName);
_cacheValueQuietRefCannotBeReleased = new ByteArrayCacheValue(cacheBlockSize);
}
_cacheValue = _cacheValueQuietRefCannotBeReleased;
long filePosition = getFilePosition();
_indexInput.seek(filePosition);
byte[] buffer = _store.takeBuffer(_bufferSize);
int len = (int) Math.min(_cacheBlockSize, _fileLength - filePosition);
int cachePosition = 0;
while (len > 0) {
int length = Math.min(_bufferSize, len);
_indexInput.readBytes(buffer, 0, length);
_cacheValue.write(cachePosition, buffer, 0, length);
len -= length;
cachePosition += length;
}
_store.putBuffer(buffer);
}
_blockPosition = getBlockPosition();
}
private void fillNormally() throws IOException {
_key.setBlockId(getBlockId());
_cacheValue = lookup(false);
if (_cacheValue == null) {
_cacheValue = _cache.newInstance(_directory, _fileName);
long filePosition = getFilePosition();
_indexInput.seek(filePosition);
byte[] buffer = _store.takeBuffer(_bufferSize);
int len = (int) Math.min(_cacheBlockSize, _fileLength - filePosition);
int cachePosition = 0;
while (len > 0) {
int length = Math.min(_bufferSize, len);
_indexInput.readBytes(buffer, 0, length);
_cacheValue.write(cachePosition, buffer, 0, length);
len -= length;
cachePosition += length;
}
_store.putBuffer(buffer);
_cache.put(_directory, _fileName, _key.clone(), _cacheValue);
}
_blockPosition = getBlockPosition();
}
private CacheValue lookup(boolean quietly) {
CacheValue cacheValue = _indexInputCache.get(_key.getBlockId());
if (cacheValue == null) {
if (quietly) {
cacheValue = _cache.getQuietly(_directory, _fileName, _key);
} else {
cacheValue = _cache.get(_directory, _fileName, _key);
}
}
if (cacheValue != null) {
_indexInputCache.put(_key.getBlockId(), cacheValue);
}
return cacheValue;
}
private void fill() throws IOException {
if (_quiet) {
fillQuietly();
} else {
fillNormally();
}
}
private int getBlockPosition() {
return (int) (_position % _cacheBlockSize);
}
private long getFilePosition() {
// make this a mask...?
return getBlockId() * _cacheBlockSize;
}
private long getBlockId(long pos) {
return pos / _cacheBlockSize;
}
private long getBlockId() {
return _position / _cacheBlockSize;
}
private void ensureOpen() {
if (_isClosed) {
throw new AlreadyClosedException("Already closed: " + this);
}
}
}