blob: 409c9cb42f9660f0760c774bd43a9569552f05c0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.cosn;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
/**
* BufferPool class is used to manage the buffers during program execution.
* It is provided in a thread-safe singleton mode,and
* keeps the program's memory and disk consumption at a stable value.
*/
public final class BufferPool {
private static final Logger LOG =
LoggerFactory.getLogger(BufferPool.class);
private static BufferPool ourInstance = new BufferPool();
/**
* Use this method to get the instance of BufferPool.
*
* @return the instance of BufferPool
*/
public static BufferPool getInstance() {
return ourInstance;
}
private BlockingQueue<ByteBuffer> bufferPool = null;
private long singleBufferSize = 0;
private File diskBufferDir = null;
private AtomicBoolean isInitialize = new AtomicBoolean(false);
private BufferPool() {
}
private File createDir(String dirPath) throws IOException {
File dir = new File(dirPath);
if (!dir.exists()) {
LOG.debug("Buffer dir: [{}] does not exists. create it first.",
dirPath);
if (dir.mkdirs()) {
if (!dir.setWritable(true) || !dir.setReadable(true)
|| !dir.setExecutable(true)) {
LOG.warn("Set the buffer dir: [{}]'s permission [writable,"
+ "readable, executable] failed.", dir.getAbsolutePath());
}
LOG.debug("Buffer dir: [{}] is created successfully.",
dir.getAbsolutePath());
} else {
// Once again, check if it has been created successfully.
// Prevent problems created by multiple processes at the same time.
if (!dir.exists()) {
throw new IOException("buffer dir:" + dir.getAbsolutePath()
+ " is created unsuccessfully");
}
}
} else {
LOG.debug("buffer dir: {} already exists.", dirPath);
}
return dir;
}
/**
* Create buffers correctly by reading the buffer file directory,
* buffer pool size,and file block size in the configuration.
*
* @param conf Provides configurations for the Hadoop runtime
* @throws IOException Configuration errors,
* insufficient or no access for memory or
* disk space may cause this exception
*/
public synchronized void initialize(Configuration conf)
throws IOException {
if (this.isInitialize.get()) {
return;
}
this.singleBufferSize = conf.getLong(CosNConfigKeys.COSN_BLOCK_SIZE_KEY,
CosNConfigKeys.DEFAULT_BLOCK_SIZE);
// The block size of CosN can only support up to 2GB.
if (this.singleBufferSize < Constants.MIN_PART_SIZE
|| this.singleBufferSize > Constants.MAX_PART_SIZE) {
String exceptionMsg = String.format(
"The block size of CosN is limited to %d to %d",
Constants.MIN_PART_SIZE, Constants.MAX_PART_SIZE);
throw new IOException(exceptionMsg);
}
long memoryBufferLimit = conf.getLong(
CosNConfigKeys.COSN_UPLOAD_BUFFER_SIZE_KEY,
CosNConfigKeys.DEFAULT_UPLOAD_BUFFER_SIZE);
this.diskBufferDir = this.createDir(conf.get(
CosNConfigKeys.COSN_BUFFER_DIR_KEY,
CosNConfigKeys.DEFAULT_BUFFER_DIR));
int bufferPoolSize = (int) (memoryBufferLimit / this.singleBufferSize);
if (0 == bufferPoolSize) {
throw new IOException(
String.format("The total size of the buffer [%d] is " +
"smaller than a single block [%d]."
+ "please consider increase the buffer size " +
"or decrease the block size",
memoryBufferLimit, this.singleBufferSize));
}
this.bufferPool = new LinkedBlockingQueue<>(bufferPoolSize);
for (int i = 0; i < bufferPoolSize; i++) {
this.bufferPool.add(ByteBuffer.allocateDirect(
(int) this.singleBufferSize));
}
this.isInitialize.set(true);
}
/**
* Check if the buffer pool has been initialized.
*
* @throws IOException if the buffer pool is not initialized
*/
private void checkInitialize() throws IOException {
if (!this.isInitialize.get()) {
throw new IOException(
"The buffer pool has not been initialized yet");
}
}
/**
* Obtain a buffer from this buffer pool through the method.
*
* @param bufferSize expected buffer size to get
* @return a buffer wrapper that satisfies the bufferSize.
* @throws IOException if the buffer pool not initialized,
* or the bufferSize parameter is not within
* the range[1MB to the single buffer size]
*/
public ByteBufferWrapper getBuffer(int bufferSize) throws IOException {
this.checkInitialize();
if (bufferSize > 0 && bufferSize <= this.singleBufferSize) {
ByteBufferWrapper byteBufferWrapper = this.getByteBuffer();
if (null == byteBufferWrapper) {
// Use a disk buffer when the memory buffer is not enough
byteBufferWrapper = this.getMappedBuffer();
}
return byteBufferWrapper;
} else {
String exceptionMsg = String.format(
"Parameter buffer size out of range: 1048576 to %d",
this.singleBufferSize
);
throw new IOException(exceptionMsg);
}
}
/**
* Get a ByteBufferWrapper from the buffer pool.
*
* @return a new byte buffer wrapper
* @throws IOException if the buffer pool is not initialized
*/
private ByteBufferWrapper getByteBuffer() throws IOException {
this.checkInitialize();
ByteBuffer buffer = this.bufferPool.poll();
return buffer == null ? null : new ByteBufferWrapper(buffer);
}
/**
* Get a mapped buffer from the buffer pool.
*
* @return a new mapped buffer
* @throws IOException If the buffer pool is not initialized.
* or some I/O error occurs
*/
private ByteBufferWrapper getMappedBuffer() throws IOException {
this.checkInitialize();
File tmpFile = File.createTempFile(Constants.BLOCK_TMP_FILE_PREFIX,
Constants.BLOCK_TMP_FILE_SUFFIX, this.diskBufferDir);
tmpFile.deleteOnExit();
RandomAccessFile raf = new RandomAccessFile(tmpFile, "rw");
raf.setLength(this.singleBufferSize);
MappedByteBuffer buf = raf.getChannel().map(
FileChannel.MapMode.READ_WRITE, 0, this.singleBufferSize);
return new ByteBufferWrapper(buf, raf, tmpFile);
}
/**
* return the byte buffer wrapper to the buffer pool.
*
* @param byteBufferWrapper the byte buffer wrapper getting from the pool
* @throws InterruptedException if interrupted while waiting
* @throws IOException some io error occurs
*/
public void returnBuffer(ByteBufferWrapper byteBufferWrapper)
throws InterruptedException, IOException {
if (null == this.bufferPool || null == byteBufferWrapper) {
return;
}
if (byteBufferWrapper.isDiskBuffer()) {
byteBufferWrapper.close();
} else {
ByteBuffer byteBuffer = byteBufferWrapper.getByteBuffer();
if (null != byteBuffer) {
byteBuffer.clear();
LOG.debug("Return the buffer to the buffer pool.");
if (!this.bufferPool.offer(byteBuffer)) {
LOG.error("Return the buffer to buffer pool failed.");
}
}
}
}
}