blob: 70ced2e36642b7707cf6a60879cd3efe38f7e9d8 [file] [log] [blame]
package org.apache.solr.store.blockcache;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BufferStore {
public static Logger LOG = LoggerFactory.getLogger(BufferStore.class);
private static BlockingQueue<byte[]> _1024 = setupBuffers(1024, 1);
private static BlockingQueue<byte[]> _8192 = setupBuffers(8192, 1);
public static AtomicLong shardBuffercacheLost = new AtomicLong();
public static AtomicLong shardBuffercacheAllocate1024 = new AtomicLong();
public static AtomicLong shardBuffercacheAllocate8192 = new AtomicLong();
public static AtomicLong shardBuffercacheAllocateOther = new AtomicLong();
public static void init(int _1024Size, int _8192Size, Metrics metrics) {
LOG.info("Initializing the 1024 buffers with [{}] buffers.", _1024Size);
_1024 = setupBuffers(1024, _1024Size);
LOG.info("Initializing the 8192 buffers with [{}] buffers.", _8192Size);
_8192 = setupBuffers(8192, _8192Size);
shardBuffercacheLost = metrics.shardBuffercacheLost;
shardBuffercacheAllocate1024 = metrics.shardBuffercacheAllocate1024;
shardBuffercacheAllocate8192 = metrics.shardBuffercacheAllocate8192;
shardBuffercacheAllocateOther = metrics.shardBuffercacheAllocateOther;
}
private static BlockingQueue<byte[]> setupBuffers(int bufferSize, int count) {
BlockingQueue<byte[]> queue = new ArrayBlockingQueue<byte[]>(count);
for (int i = 0; i < count; i++) {
queue.add(new byte[bufferSize]);
}
return queue;
}
public static byte[] takeBuffer(int bufferSize) {
switch (bufferSize) {
case 1024:
return newBuffer1024(_1024.poll());
case 8192:
return newBuffer8192(_8192.poll());
default:
return newBuffer(bufferSize);
}
}
public static void putBuffer(byte[] buffer) {
if (buffer == null) {
return;
}
int bufferSize = buffer.length;
switch (bufferSize) {
case 1024:
checkReturn(_1024.offer(buffer));
return;
case 8192:
checkReturn(_8192.offer(buffer));
return;
}
}
private static void checkReturn(boolean offer) {
if (!offer) {
shardBuffercacheLost.incrementAndGet();
}
}
private static byte[] newBuffer1024(byte[] buf) {
if (buf != null) {
return buf;
}
shardBuffercacheAllocate1024.incrementAndGet();
return new byte[1024];
}
private static byte[] newBuffer8192(byte[] buf) {
if (buf != null) {
return buf;
}
shardBuffercacheAllocate8192.incrementAndGet();
return new byte[8192];
}
private static byte[] newBuffer(int size) {
shardBuffercacheAllocateOther.incrementAndGet();
return new byte[size];
}
}