blob: 091c899efe5dd4ee2a6e05f3b1ab10ae50d5c040 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl.async;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
* An AsyncDataFileAppender that uses NIO ByteBuffers and file channels to more
* efficiently copy data to files.
*
*
*/
class NIODataFileAppender extends DataFileAppender {

    /**
     * Creates an appender bound to the given data manager.
     *
     * @param fileManager handed unchanged to the DataFileAppender constructor
     */
    public NIODataFileAppender(AsyncDataManager fileManager) {
        super(fileManager);
    }

    /**
     * The async processing loop that writes to the data files and does the
     * force calls.
     *
     * Since the file sync() call is the slowest of all the operations, this
     * algorithm tries to 'batch' or group together several file sync() requests
     * into a single file sync() call. The batching is accomplished by attaching
     * the same CountDownLatch instance to every force request in a group.
     *
     * The loop runs until the shutdown flag is observed with no pending batch,
     * until an IOException occurs (it is recorded in firstAsyncException and on
     * the affected batches), or until the thread is interrupted while waiting
     * for work. In every case the open file is closed, shutdownDone is counted
     * down and running is set to false before the method returns.
     */
    protected void processQueue() {
        DataFile dataFile = null;
        RandomAccessFile file = null;
        FileChannel channel = null;
        WriteBatch wb = null;
        boolean interrupted = false;
        try {
            ByteBuffer header = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_HEAD_SPACE);
            ByteBuffer footer = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_FOOT_SPACE);
            ByteBuffer buffer = ByteBuffer.allocateDirect(maxWriteBatchSize);

            // Populate the static parts of the headers and footers..
            header.putInt(0); // size
            header.put((byte)0); // type
            header.put(RESERVED_SPACE); // reserved
            header.put(AsyncDataManager.ITEM_HEAD_SOR);
            footer.put(AsyncDataManager.ITEM_HEAD_EOR);

            while (true) {
                // Block till we get a command.
                synchronized (enqueueMutex) {
                    while (true) {
                        if (nextWriteBatch != null) {
                            wb = nextWriteBatch;
                            nextWriteBatch = null;
                            break;
                        }
                        if (shutdown) {
                            return;
                        }
                        enqueueMutex.wait();
                    }
                    // Wake a thread waiting on the mutex now that the
                    // pending-batch slot has been emptied.
                    enqueueMutex.notify();
                }

                if (dataFile != wb.dataFile) {
                    if (file != null) {
                        dataFile.closeRandomAccessFile(file);
                        // Forget the closed handles so that a failure while
                        // opening the next file cannot make the cleanup code
                        // close them again against the wrong DataFile.
                        file = null;
                        channel = null;
                    }
                    dataFile = wb.dataFile;
                    file = dataFile.openRandomAccessFile(true);
                    channel = file.getChannel();
                }

                WriteCommand write = wb.first;

                // Write all the data.
                // Only need to seek to first location.. all others
                // are in sequence.
                file.seek(write.location.getOffset());

                boolean forceToDisk = false;

                // is it just 1 big write?
                if (wb.size == write.location.getSize()) {
                    forceToDisk = write.sync || write.onComplete != null;

                    fillHeader(header, write);
                    transfer(header, channel);

                    ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(),
                                                        write.data.getLength());
                    transfer(source, channel);

                    footer.clear();
                    transfer(footer, channel);
                } else {
                    // Combine the smaller writes into 1 big buffer
                    while (write != null) {
                        forceToDisk |= (write.sync || write.onComplete != null);

                        fillHeader(header, write);
                        copy(header, buffer);
                        assert !header.hasRemaining();

                        ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(),
                                                            write.data.getLength());
                        copy(source, buffer);
                        assert !source.hasRemaining();

                        footer.clear();
                        copy(footer, buffer);
                        assert !footer.hasRemaining();

                        write = (WriteCommand)write.getNext();
                    }

                    // Fully write out the buffer..
                    buffer.flip();
                    transfer(buffer, channel);
                    buffer.clear();
                }

                if (forceToDisk) {
                    channel.force(false);
                }

                WriteCommand lastWrite = (WriteCommand)wb.first.getTailNode();
                dataManager.setLastAppendLocation(lastWrite.location);

                // Now that the data is on disk, remove the writes from the in
                // flight cache.
                write = wb.first;
                while (write != null) {
                    if (!write.sync) {
                        inflightWrites.remove(new WriteKey(write.location));
                    }
                    if (write.onComplete != null) {
                        try {
                            write.onComplete.run();
                        } catch (Throwable e) {
                            // A failing callback must not abort the writer loop.
                            e.printStackTrace();
                        }
                    }
                    write = (WriteCommand)write.getNext();
                }

                // Signal any waiting threads that the write is on disk.
                wb.latch.countDown();
            }
        } catch (IOException e) {
            synchronized (enqueueMutex) {
                firstAsyncException = e;
                // Record the failure BEFORE releasing the latch; otherwise a
                // thread released by countDown() could inspect the exception
                // holder while it is still empty and treat the write as
                // successful.
                if (wb != null) {
                    wb.exception.set(e);
                    wb.latch.countDown();
                }
                if (nextWriteBatch != null) {
                    nextWriteBatch.exception.set(e);
                    nextWriteBatch.latch.countDown();
                }
            }
        } catch (InterruptedException e) {
            // Remember the interrupt; the flag is restored once cleanup is done.
            interrupted = true;
        } finally {
            // Each close is attempted independently so that one failure does
            // not prevent the remaining handles from being released.
            if (file != null) {
                try {
                    dataFile.closeRandomAccessFile(file);
                } catch (IOException ignored) {
                    // best effort: the appender is going away
                }
                try {
                    file.close();
                } catch (IOException ignored) {
                    // best effort: the appender is going away
                }
            }
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException ignored) {
                    // best effort: the appender is going away
                }
            }
            shutdownDone.countDown();
            running = false;
            if (interrupted) {
                // Preserve the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Rewrites the per-record fields (size and type) at the start of the
     * header buffer and rewinds it so that the whole header, including the
     * static bytes written once during setup, is ready to be read.
     *
     * @param header the reusable header buffer
     * @param write the command whose location supplies the size and type
     */
    private void fillHeader(ByteBuffer header, WriteCommand write) {
        header.clear();
        header.putInt(write.location.getSize());
        header.put(write.location.getType());
        // clear() again: position back to 0, limit back to capacity.
        header.clear();
    }

    /**
     * Copy the bytes in header to the channel.
     *
     * @param header - source of data
     * @param channel - destination where the data will be written.
     * @throws IOException if the channel write fails
     */
    private void transfer(ByteBuffer header, FileChannel channel) throws IOException {
        // A single write() may be partial, so loop until the buffer is drained.
        while (header.hasRemaining()) {
            channel.write(header);
        }
    }

    /**
     * Copies as many bytes as fit from src into dest.
     *
     * @param src - source of data; its limit is restored before returning
     * @param dest - destination buffer
     * @return the number of bytes copied, which is the smaller of the two
     *         buffers' remaining counts
     */
    private int copy(ByteBuffer src, ByteBuffer dest) {
        int rc = Math.min(dest.remaining(), src.remaining());
        if (rc > 0) {
            // Adjust our limit so that we don't overflow the dest buffer.
            int limit = src.limit();
            src.limit(src.position() + rc);
            dest.put(src);
            // restore the limit.
            src.limit(limit);
        }
        return rc;
    }
}