// blob: d5828a1aa03e3aef29c236281dc89854e19fd49b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query.h2.opt;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
import org.apache.lucene.store.BufferedChecksum;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Accountables;
/**
 * A memory-resident {@link IndexOutput} implementation backed by the buffers of a
 * {@link GridLuceneFile}.
 * <p>
 * Bytes are appended into fixed-size buffers of {@link #BUFFER_SIZE} bytes that are obtained
 * from the file on demand. Every byte written through this stream, including bytes transferred
 * with {@link #copyBytes(DataInput, long)}, is also fed into a running CRC32 that is exposed
 * through {@link #getChecksum()}.
 * <p>
 * The length of the underlying file is only raised when the stream is closed.
 */
public class GridLuceneOutputStream extends IndexOutput implements Accountable {
    /** Off-heap page size. */
    static final int BUFFER_SIZE = 32 * 1024;

    /** File this stream writes to. */
    private final GridLuceneFile file;

    /** Address of the buffer currently being written, {@code 0} while no buffer is selected. */
    private long currBuf;

    /** Index of the current buffer inside the file, {@code -1} while no buffer is selected. */
    private int currBufIdx;

    /** Write position inside the current buffer. */
    private int bufPosition;

    /** Absolute file offset at which the current buffer starts. */
    private long bufStart;

    /** Capacity of the current buffer, {@code 0} while no buffer is selected. */
    private int bufLength;

    /** Memory used to write into the buffers. */
    private final GridUnsafeMemory mem;

    /** Running checksum of all bytes written so far. */
    private final Checksum crc;

    /** Set once {@link #close()} has run, so that the file reference is released only once. */
    private boolean closed;

    /**
     * Constructor.
     *
     * @param f File.
     */
    public GridLuceneOutputStream(GridLuceneFile f) {
        super("RAMOutputStream(name=\"" + f.getName() + "\")", f.getName());

        file = f;

        mem = f.getDirectory().memory();

        // make sure that we switch to the
        // first needed buffer lazily
        currBufIdx = -1;
        currBuf = 0;

        crc = new BufferedChecksum(new CRC32());
    }

    /**
     * Resets this to an empty file: forgets the current buffer, sets the file length to zero
     * and clears the checksum.
     */
    public void reset() {
        currBuf = 0;
        currBufIdx = -1;
        bufPosition = 0;
        bufStart = 0;
        bufLength = 0;

        file.setLength(0);

        crc.reset();
    }

    /**
     * Publishes the final length to the file and releases this stream's reference to it.
     * Calling this method more than once has no further effect.
     *
     * @throws IOException If flushing fails.
     */
    @Override public void close() throws IOException {
        // Closeable requires repeated close() calls to be harmless; without this guard
        // a second call would release the file reference a second time.
        if (closed)
            return;

        closed = true;

        flush();

        file.releaseRef();
    }

    /** {@inheritDoc} */
    @Override public long getChecksum() throws IOException {
        return crc.getValue();
    }

    /** {@inheritDoc} */
    @Override public void writeByte(byte b) throws IOException {
        nextBufferIfFull();

        crc.update(b);

        mem.writeByte(currBuf + bufPosition++, b);
    }

    /** {@inheritDoc} */
    @Override public void writeBytes(byte[] b, int offset, int len) throws IOException {
        assert b != null;

        crc.update(b, offset, len);

        while (len > 0) {
            nextBufferIfFull();

            int remainInBuf = BUFFER_SIZE - bufPosition;
            int bytesToCp = len < remainInBuf ? len : remainInBuf;

            mem.writeBytes(currBuf + bufPosition, b, offset, bytesToCp);

            offset += bytesToCp;
            len -= bytesToCp;
            bufPosition += bytesToCp;
        }
    }

    /**
     * Moves on to the next buffer when the current one is full, or when no buffer
     * has been selected yet.
     */
    private void nextBufferIfFull() {
        if (bufPosition == bufLength) {
            currBufIdx++;

            switchCurrentBuffer();
        }
    }

    /**
     * Makes the buffer with index {@code currBufIdx} the current one, asking the file for a
     * new buffer when the index is one past the buffers it already has.
     */
    private void switchCurrentBuffer() {
        currBuf = currBufIdx == file.numBuffers() ? file.addBuffer() : file.getBuffer(currBufIdx);

        bufPosition = 0;
        bufStart = (long)BUFFER_SIZE * (long)currBufIdx;
        bufLength = BUFFER_SIZE;
    }

    /**
     * Raises the file length to the current write pointer if the pointer is beyond it.
     */
    private void setFileLength() {
        long pointer = bufStart + bufPosition;

        if (pointer > file.getLength())
            file.setLength(pointer);
    }

    /**
     * Forces any buffered output to be written.
     *
     * @throws IOException Declared for callers; the current implementation only updates the file length.
     */
    private void flush() throws IOException {
        setFileLength();
    }

    /** {@inheritDoc} */
    @Override public long getFilePointer() {
        return currBufIdx < 0 ? 0 : bufStart + bufPosition;
    }

    /**
     * Copies {@code numBytes} bytes from {@code input} to the end of this stream.
     * <p>
     * The bytes are staged through a heap array and handed to
     * {@link #writeBytes(byte[], int, int)}, so that they are included in the checksum
     * returned by {@link #getChecksum()} exactly like any other written bytes.
     *
     * @param input Source to read from.
     * @param numBytes Number of bytes to copy; values less than or equal to zero copy nothing.
     * @throws IOException If reading from {@code input} fails.
     */
    @Override public void copyBytes(DataInput input, long numBytes) throws IOException {
        assert numBytes >= 0 : "numBytes=" + numBytes;

        if (numBytes <= 0)
            return;

        // One reusable chunk, never larger than a single buffer or than the total amount.
        byte[] chunk = new byte[(int)Math.min(numBytes, (long)BUFFER_SIZE)];

        long left = numBytes;

        while (left > 0) {
            int toCp = (int)Math.min(left, (long)chunk.length);

            input.readBytes(chunk, 0, toCp, false);

            // writeBytes() updates the checksum and handles buffer switching.
            writeBytes(chunk, 0, toCp);

            left -= toCp;
        }
    }

    /** {@inheritDoc} */
    @Override public long ramBytesUsed() {
        return file.getSizeInBytes();
    }

    /** {@inheritDoc} */
    @Override public Collection<Accountable> getChildResources() {
        return Collections.singleton(Accountables.namedAccountable("file", file));
    }
}