| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.fs.cosn; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.RandomAccessFile; |
| import java.nio.ByteBuffer; |
| import java.nio.MappedByteBuffer; |
| import java.nio.channels.FileChannel; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hadoop.conf.Configuration; |
| |
| /** |
| * BufferPool class is used to manage the buffers during program execution. |
| * It is provided in a thread-safe singleton mode,and |
| * keeps the program's memory and disk consumption at a stable value. |
| */ |
| public final class BufferPool { |
| private static final Logger LOG = |
| LoggerFactory.getLogger(BufferPool.class); |
| |
| private static BufferPool ourInstance = new BufferPool(); |
| |
| /** |
| * Use this method to get the instance of BufferPool. |
| * |
| * @return the instance of BufferPool |
| */ |
| public static BufferPool getInstance() { |
| return ourInstance; |
| } |
| |
| private BlockingQueue<ByteBuffer> bufferPool = null; |
| private long singleBufferSize = 0; |
| private File diskBufferDir = null; |
| |
| private AtomicBoolean isInitialize = new AtomicBoolean(false); |
| |
| private BufferPool() { |
| } |
| |
| private File createDir(String dirPath) throws IOException { |
| File dir = new File(dirPath); |
| if (!dir.exists()) { |
| LOG.debug("Buffer dir: [{}] does not exists. create it first.", |
| dirPath); |
| if (dir.mkdirs()) { |
| if (!dir.setWritable(true) || !dir.setReadable(true) |
| || !dir.setExecutable(true)) { |
| LOG.warn("Set the buffer dir: [{}]'s permission [writable," |
| + "readable, executable] failed.", dir.getAbsolutePath()); |
| } |
| LOG.debug("Buffer dir: [{}] is created successfully.", |
| dir.getAbsolutePath()); |
| } else { |
| // Once again, check if it has been created successfully. |
| // Prevent problems created by multiple processes at the same time. |
| if (!dir.exists()) { |
| throw new IOException("buffer dir:" + dir.getAbsolutePath() |
| + " is created unsuccessfully"); |
| } |
| } |
| } else { |
| LOG.debug("buffer dir: {} already exists.", dirPath); |
| } |
| |
| return dir; |
| } |
| |
| /** |
| * Create buffers correctly by reading the buffer file directory, |
| * buffer pool size,and file block size in the configuration. |
| * |
| * @param conf Provides configurations for the Hadoop runtime |
| * @throws IOException Configuration errors, |
| * insufficient or no access for memory or |
| * disk space may cause this exception |
| */ |
| public synchronized void initialize(Configuration conf) |
| throws IOException { |
| if (this.isInitialize.get()) { |
| return; |
| } |
| this.singleBufferSize = conf.getLong(CosNConfigKeys.COSN_BLOCK_SIZE_KEY, |
| CosNConfigKeys.DEFAULT_BLOCK_SIZE); |
| |
| // The block size of CosN can only support up to 2GB. |
| if (this.singleBufferSize < Constants.MIN_PART_SIZE |
| || this.singleBufferSize > Constants.MAX_PART_SIZE) { |
| String exceptionMsg = String.format( |
| "The block size of CosN is limited to %d to %d", |
| Constants.MIN_PART_SIZE, Constants.MAX_PART_SIZE); |
| throw new IOException(exceptionMsg); |
| } |
| |
| long memoryBufferLimit = conf.getLong( |
| CosNConfigKeys.COSN_UPLOAD_BUFFER_SIZE_KEY, |
| CosNConfigKeys.DEFAULT_UPLOAD_BUFFER_SIZE); |
| |
| this.diskBufferDir = this.createDir(conf.get( |
| CosNConfigKeys.COSN_BUFFER_DIR_KEY, |
| CosNConfigKeys.DEFAULT_BUFFER_DIR)); |
| |
| int bufferPoolSize = (int) (memoryBufferLimit / this.singleBufferSize); |
| if (0 == bufferPoolSize) { |
| throw new IOException( |
| String.format("The total size of the buffer [%d] is " + |
| "smaller than a single block [%d]." |
| + "please consider increase the buffer size " + |
| "or decrease the block size", |
| memoryBufferLimit, this.singleBufferSize)); |
| } |
| this.bufferPool = new LinkedBlockingQueue<>(bufferPoolSize); |
| for (int i = 0; i < bufferPoolSize; i++) { |
| this.bufferPool.add(ByteBuffer.allocateDirect( |
| (int) this.singleBufferSize)); |
| } |
| |
| this.isInitialize.set(true); |
| } |
| |
| /** |
| * Check if the buffer pool has been initialized. |
| * |
| * @throws IOException if the buffer pool is not initialized |
| */ |
| private void checkInitialize() throws IOException { |
| if (!this.isInitialize.get()) { |
| throw new IOException( |
| "The buffer pool has not been initialized yet"); |
| } |
| } |
| |
| /** |
| * Obtain a buffer from this buffer pool through the method. |
| * |
| * @param bufferSize expected buffer size to get |
| * @return a buffer wrapper that satisfies the bufferSize. |
| * @throws IOException if the buffer pool not initialized, |
| * or the bufferSize parameter is not within |
| * the range[1MB to the single buffer size] |
| */ |
| public ByteBufferWrapper getBuffer(int bufferSize) throws IOException { |
| this.checkInitialize(); |
| if (bufferSize > 0 && bufferSize <= this.singleBufferSize) { |
| ByteBufferWrapper byteBufferWrapper = this.getByteBuffer(); |
| if (null == byteBufferWrapper) { |
| // Use a disk buffer when the memory buffer is not enough |
| byteBufferWrapper = this.getMappedBuffer(); |
| } |
| return byteBufferWrapper; |
| } else { |
| String exceptionMsg = String.format( |
| "Parameter buffer size out of range: 1048576 to %d", |
| this.singleBufferSize |
| ); |
| throw new IOException(exceptionMsg); |
| } |
| } |
| |
| /** |
| * Get a ByteBufferWrapper from the buffer pool. |
| * |
| * @return a new byte buffer wrapper |
| * @throws IOException if the buffer pool is not initialized |
| */ |
| private ByteBufferWrapper getByteBuffer() throws IOException { |
| this.checkInitialize(); |
| ByteBuffer buffer = this.bufferPool.poll(); |
| return buffer == null ? null : new ByteBufferWrapper(buffer); |
| } |
| |
| /** |
| * Get a mapped buffer from the buffer pool. |
| * |
| * @return a new mapped buffer |
| * @throws IOException If the buffer pool is not initialized. |
| * or some I/O error occurs |
| */ |
| private ByteBufferWrapper getMappedBuffer() throws IOException { |
| this.checkInitialize(); |
| File tmpFile = File.createTempFile(Constants.BLOCK_TMP_FILE_PREFIX, |
| Constants.BLOCK_TMP_FILE_SUFFIX, this.diskBufferDir); |
| tmpFile.deleteOnExit(); |
| RandomAccessFile raf = new RandomAccessFile(tmpFile, "rw"); |
| raf.setLength(this.singleBufferSize); |
| MappedByteBuffer buf = raf.getChannel().map( |
| FileChannel.MapMode.READ_WRITE, 0, this.singleBufferSize); |
| return new ByteBufferWrapper(buf, raf, tmpFile); |
| } |
| |
| /** |
| * return the byte buffer wrapper to the buffer pool. |
| * |
| * @param byteBufferWrapper the byte buffer wrapper getting from the pool |
| * @throws InterruptedException if interrupted while waiting |
| * @throws IOException some io error occurs |
| */ |
| public void returnBuffer(ByteBufferWrapper byteBufferWrapper) |
| throws InterruptedException, IOException { |
| if (null == this.bufferPool || null == byteBufferWrapper) { |
| return; |
| } |
| |
| if (byteBufferWrapper.isDiskBuffer()) { |
| byteBufferWrapper.close(); |
| } else { |
| ByteBuffer byteBuffer = byteBufferWrapper.getByteBuffer(); |
| if (null != byteBuffer) { |
| byteBuffer.clear(); |
| LOG.debug("Return the buffer to the buffer pool."); |
| if (!this.bufferPool.offer(byteBuffer)) { |
| LOG.error("Return the buffer to buffer pool failed."); |
| } |
| } |
| } |
| } |
| } |