blob: d89c212ec92a2ceae4c887946d55279cc48cadf5 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.persistence.soplog;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.EnumMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.internal.cache.persistence.soplog.AppendLog.AppendLogWriter;
import com.gemstone.gemfire.internal.logging.LogService;
public class RecoverableSortedOplogSet extends AbstractSortedReader implements SortedOplogSet {
private static final Logger logger = LogService.getLogger();
private final SortedOplogSet sos;
private final long bufferSize;
private final long maxBufferMemory;
private final Lock rollLock;
private AtomicReference<AppendLogWriter> writer;
private final String logPrefix;
public RecoverableSortedOplogSet(SortedOplogSet sos, long bufferSize, double memLimit) throws IOException {
this.sos = sos;
this.bufferSize = bufferSize;
this.logPrefix = "<" + sos.getFactory().getConfiguration().getName() + "> ";
rollLock = new ReentrantLock();
writer = new AtomicReference<AppendLogWriter>(AppendLog.create(nextLogFile()));
maxBufferMemory = Math.round(memLimit * ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax());
}
@Override
public boolean mightContain(byte[] key) throws IOException {
return sos.mightContain(key);
}
@Override
public ByteBuffer read(byte[] key) throws IOException {
return sos.read(key);
}
@Override
public SortedIterator<ByteBuffer> scan(byte[] from, boolean fromInclusive, byte[] to, boolean toInclusive) throws IOException {
return sos.scan(from, fromInclusive, to, toInclusive);
}
@Override
public SortedIterator<ByteBuffer> scan(
byte[] from,
boolean fromInclusive,
byte[] to,
boolean toInclusive,
boolean ascending,
MetadataFilter filter) throws IOException {
return sos.scan(from, fromInclusive, to, toInclusive, ascending, filter);
}
@Override
public SerializedComparator getComparator() {
return sos.getComparator();
}
@Override
public SortedOplogFactory getFactory() {
return sos.getFactory();
}
@Override
public SortedStatistics getStatistics() throws IOException {
return sos.getStatistics();
}
@Override
public void close() throws IOException {
rollLock.lock();
try {
writer.get().close();
writer.set(null);
sos.close();
} finally {
rollLock.unlock();
}
}
@Override
public void put(byte[] key, byte[] value) throws IOException {
throttle();
if (sos.bufferSize() > bufferSize) {
roll(false);
}
writer.get().append(key, value);
sos.put(key, value);
}
@Override
public long bufferSize() {
return sos.bufferSize();
}
@Override
public long unflushedSize() {
return sos.unflushedSize();
}
@Override
public void flush(EnumMap<Metadata, byte[]> metadata, FlushHandler handler) throws IOException {
roll(true);
}
@Override
public void flushAndClose(EnumMap<Metadata, byte[]> metadata) throws IOException {
throw new RuntimeException("Not implemented");
}
@Override
public Compactor getCompactor() {
return sos.getCompactor();
}
@Override
public void clear() throws IOException {
rollLock.lock();
try {
roll(true);
sos.clear();
} finally {
rollLock.unlock();
}
}
@Override
public void destroy() throws IOException {
roll(true);
sos.destroy();
}
@Override
public boolean isClosed() {
return sos.isClosed();
}
private void throttle() {
int n = 0;
while (sos.bufferSize() + sos.unflushedSize() > maxBufferMemory) {
try {
Thread.sleep(1 << n++);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
private void roll(boolean wait) throws IOException {
boolean locked = true;
if (wait) {
rollLock.lock();
} else {
locked = rollLock.tryLock();
}
if (locked) {
try {
AppendLogWriter next = AppendLog.create(nextLogFile());
final AppendLogWriter old = writer.getAndSet(next);
old.close();
if (logger.isDebugEnabled()) {
logger.debug("{}Rolling from {} to {}", this.logPrefix, old.getFile(), next.getFile());
}
sos.flush(null, new FlushHandler() {
@Override
public void complete() {
old.getFile().delete();
}
@Override
public void error(Throwable t) {
}
});
} finally {
rollLock.unlock();
}
}
}
private File nextLogFile() {
return new File(sos.getFactory().getConfiguration().getName()
+ "-" + UUID.randomUUID().toString() + ".aolog");
}
}