blob: 9a7c16a9035daa0ab52cd09ac5ad197dc89af8c0 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crail.memory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.crail.*;
import org.apache.crail.CrailStatistics.StatisticsProvider;
import org.apache.crail.conf.CrailConstants;
import org.apache.crail.utils.CrailUtils;
import org.slf4j.Logger;
public abstract class BufferCache implements CrailStatistics.StatisticsProvider, CrailBufferCache {
private static final Logger LOG = CrailUtils.getLogger();
private LinkedBlockingQueue<CrailBuffer> cache;
private AtomicLong cacheGet;
private AtomicLong cachePut;
private AtomicLong cacheMisses;
private AtomicLong cacheOut;
private AtomicLong cacheMax;
private AtomicLong cacheMissesMap;
private AtomicLong cacheMissesHeap;
public abstract CrailBuffer allocateRegion() throws IOException;
public BufferCache() throws IOException {
this.cache = new LinkedBlockingQueue<CrailBuffer>();
this.cacheGet = new AtomicLong(0);
this.cachePut = new AtomicLong(0);
this.cacheMisses = new AtomicLong(0);
this.cacheOut = new AtomicLong(0);
this.cacheMax = new AtomicLong(0);
this.cacheMissesMap = new AtomicLong(0);
this.cacheMissesHeap = new AtomicLong(0);
}
@Override
public String providerName() {
return "cache/buffer";
}
@Override
public String printStatistics() {
return "cacheGet " + cacheGet.get() + ", cachePut " + cachePut.get() + ", cacheMiss " + cacheMisses.get() + ", cacheSize " + cache.size() + ", cacheMax " + cacheMax.get() + ", mapMiss " + cacheMissesMap.get() + ", mapHeap " + cacheMissesHeap.get();
}
public void resetStatistics(){
this.cacheGet.set(0);
this.cachePut.set(0);
this.cacheMisses.set(0);
this.cacheOut.set(0);
this.cacheMax.set(0);
this.cacheMissesMap.set(0);
this.cacheMissesHeap.set(0);
}
public void mergeStatistics(StatisticsProvider provider){
}
public CrailBuffer allocateBuffer() throws IOException {
cacheGet.incrementAndGet();
cacheOut.incrementAndGet();
cacheMax.updateAndGet(x -> Math.max(x, cacheOut.get()));
CrailBuffer buffer = cache.poll();
if (buffer == null){
synchronized(this){
buffer = cache.poll();
if (buffer == null){
cacheMisses.incrementAndGet();
buffer = allocateRegion();
if (buffer == null){
buffer = OffHeapBuffer.wrap(ByteBuffer.allocateDirect(CrailConstants.BUFFER_SIZE));
cacheMissesHeap.incrementAndGet();
} else {
cacheMissesMap.incrementAndGet();
}
}
}
}
buffer.clear();
return buffer;
}
public void freeBuffer(CrailBuffer buffer) throws IOException{
if (buffer != null){
cachePut.incrementAndGet();
cacheOut.decrementAndGet();
putBufferInternal(buffer);
}
}
public void putBufferInternal(CrailBuffer buffer) throws IOException{
cache.add(buffer);
}
public void close(){
cache.clear();
}
@SuppressWarnings("unchecked")
public static BufferCache createInstance(String name) throws Exception {
Class<?> nodeClass = Class.forName(name);
if (BufferCache.class.isAssignableFrom(nodeClass)){
Class<? extends BufferCache> bufferCacheClass = (Class<? extends BufferCache>) nodeClass;
BufferCache bufferCache = bufferCacheClass.newInstance();
return bufferCache;
} else {
throw new Exception("Cannot instantiate storage client of type " + name);
}
}
}