| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.olingo.server.core.serializer.utils; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.nio.ByteBuffer; |
| import java.util.Queue; |
| import java.util.concurrent.LinkedBlockingQueue; |
| |
| /** |
| * Circular stream buffer to write/read into/from one single buffer. |
| * With support of {@link InputStream} and {@link OutputStream} access to buffered data. |
| * |
| * |
| */ |
| public class CircleStreamBuffer { |
| |
| private static final int NEW_BUFFER_RESIZE_FACTOR = 2; |
| private static final int READ_EOF = -1; |
| private static final int DEFAULT_CAPACITY = 8192; |
| private static final int MAX_CAPACITY = DEFAULT_CAPACITY * 32; |
| |
| private int currentAllocateCapacity = DEFAULT_CAPACITY; |
| |
| private boolean writeMode = true; |
| private boolean writeClosed = false; |
| private boolean readClosed = false; |
| |
| private Queue<ByteBuffer> bufferQueue = new LinkedBlockingQueue<ByteBuffer>(); |
| private ByteBuffer currentWriteBuffer; |
| |
| private InternalInputStream inStream; |
| private InternalOutputStream outStream; |
| |
| /** |
| * Creates a {@link CircleStreamBuffer} with default buffer size. |
| */ |
| public CircleStreamBuffer() { |
| this(DEFAULT_CAPACITY); |
| } |
| |
| /** |
| * Create a {@link CircleStreamBuffer} with given buffer size in bytes. |
| * |
| * @param bufferSize |
| */ |
| public CircleStreamBuffer(final int bufferSize) { |
| currentAllocateCapacity = bufferSize; |
| createNewWriteBuffer(); |
| inStream = new InternalInputStream(this); |
| outStream = new InternalOutputStream(this); |
| } |
| |
| /** |
| * Get {@link InputStream} for data read access. |
| * |
| * @return the stream |
| */ |
| public InputStream getInputStream() { |
| return inStream; |
| } |
| |
| /** |
| * Get {@link OutputStream} for write data. |
| * |
| * @return the stream |
| */ |
| public OutputStream getOutputStream() { |
| return outStream; |
| } |
| |
| // ############################################# |
| // # |
| // # Common parts |
| // # |
| // ############################################# |
| |
| /** |
| * Closes the write (input) part of the {@link CircleStreamBuffer}. |
| * After this call the buffer can only be read out. |
| */ |
| public void closeWrite() { |
| writeClosed = true; |
| } |
| |
| /** |
| * Closes the read (output) part of the {@link CircleStreamBuffer}. |
| * After this call it is possible to write into the buffer (but can never be read out). |
| */ |
| public void closeRead() { |
| readClosed = true; |
| // clear references to byte buffers |
| ByteBuffer buffer = bufferQueue.poll(); |
| while (buffer != null) { |
| buffer.clear(); |
| buffer = bufferQueue.poll(); |
| } |
| } |
| |
| /** |
| * Closes write and read part (and hence the complete buffer). |
| */ |
| public void close() { |
| closeWrite(); |
| closeRead(); |
| } |
| |
| private int remaining() throws IOException { |
| if (writeMode) { |
| return currentWriteBuffer.remaining(); |
| } else { |
| ByteBuffer toRead = getReadBuffer(); |
| if (toRead == null) { |
| return 0; |
| } |
| return toRead.remaining(); |
| } |
| } |
| |
| // ############################################# |
| // # |
| // # Reading parts |
| // # |
| // ############################################# |
| |
| private ByteBuffer getReadBuffer() throws IOException { |
| if (readClosed) { |
| throw new IOException("Tried to read from closed stream."); |
| } |
| |
| boolean next = false; |
| ByteBuffer tmp = null; |
| if (writeMode) { |
| writeMode = false; |
| next = true; |
| } else { |
| tmp = bufferQueue.peek(); |
| if (tmp != null && !tmp.hasRemaining()) { |
| tmp = bufferQueue.poll(); |
| next = true; |
| } |
| } |
| |
| if (next) { |
| tmp = bufferQueue.peek(); |
| if (tmp != null) { |
| tmp.flip(); |
| } |
| tmp = getReadBuffer(); |
| } |
| |
| return tmp; |
| } |
| |
| private int read(final byte[] b, final int off, final int len) throws IOException { |
| ByteBuffer readBuffer = getReadBuffer(); |
| if (readBuffer == null) { |
| return READ_EOF; |
| } |
| |
| int toReadLength = readBuffer.remaining(); |
| if (len < toReadLength) { |
| toReadLength = len; |
| } |
| readBuffer.get(b, off, toReadLength); |
| return toReadLength; |
| } |
| |
| private int read() throws IOException { |
| ByteBuffer readBuffer = getReadBuffer(); |
| if (readBuffer == null) { |
| return READ_EOF; |
| } |
| |
| return readBuffer.get(); |
| } |
| |
| // ############################################# |
| // # |
| // # Writing parts |
| // # |
| // ############################################# |
| |
| private void write(final byte[] data, final int off, final int len) throws IOException { |
| ByteBuffer writeBuffer = getWriteBuffer(len); |
| writeBuffer.put(data, off, len); |
| } |
| |
| private ByteBuffer getWriteBuffer(final int size) throws IOException { |
| if (writeClosed) { |
| throw new IOException("Tried to write into closed stream."); |
| } |
| |
| if (writeMode) { |
| if (remaining() < size) { |
| createNewWriteBuffer(size); |
| } |
| } else { |
| writeMode = true; |
| createNewWriteBuffer(); |
| } |
| |
| return currentWriteBuffer; |
| } |
| |
| private void write(final int b) throws IOException { |
| ByteBuffer writeBuffer = getWriteBuffer(1); |
| writeBuffer.put((byte) b); |
| } |
| |
| private void createNewWriteBuffer() { |
| createNewWriteBuffer(currentAllocateCapacity); |
| } |
| |
| /** |
| * Creates a new buffer (per {@link #allocateBuffer(int)}) with the requested capacity as minimum capacity, add the |
| * new allocated |
| * buffer to the {@link #bufferQueue} and set it as {@link #currentWriteBuffer}. |
| * |
| * @param requestedCapacity minimum capacity for new allocated buffer |
| */ |
| private void createNewWriteBuffer(final int requestedCapacity) { |
| ByteBuffer b = allocateBuffer(requestedCapacity); |
| bufferQueue.add(b); |
| currentWriteBuffer = b; |
| } |
| |
| /** |
| * |
| * @param requestedCapacity |
| * @return the buffer |
| */ |
| private ByteBuffer allocateBuffer(final int requestedCapacity) { |
| int allocateCapacity = requestedCapacity; |
| if (allocateCapacity < currentAllocateCapacity) { |
| allocateCapacity = currentAllocateCapacity * NEW_BUFFER_RESIZE_FACTOR; |
| } |
| if (allocateCapacity > MAX_CAPACITY) { |
| allocateCapacity = MAX_CAPACITY; |
| } |
| // update current |
| currentAllocateCapacity = allocateCapacity; |
| return ByteBuffer.allocate(allocateCapacity); |
| } |
| |
| // ############################################# |
| // # |
| // # Inner classes (streams) |
| // # |
| // ############################################# |
| |
| /** |
| * |
| */ |
| private static class InternalInputStream extends InputStream { |
| |
| private final CircleStreamBuffer inBuffer; |
| |
| public InternalInputStream(final CircleStreamBuffer csBuffer) { |
| inBuffer = csBuffer; |
| } |
| |
| @Override |
| public int available() throws IOException { |
| return inBuffer.remaining(); |
| } |
| |
| @Override |
| public int read() throws IOException { |
| return inBuffer.read(); |
| } |
| |
| @Override |
| public int read(final byte[] b, final int off, final int len) throws IOException { |
| return inBuffer.read(b, off, len); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| inBuffer.closeRead(); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class InternalOutputStream extends OutputStream { |
| private final CircleStreamBuffer outBuffer; |
| |
| public InternalOutputStream(final CircleStreamBuffer csBuffer) { |
| outBuffer = csBuffer; |
| } |
| |
| @Override |
| public void write(final int b) throws IOException { |
| outBuffer.write(b); |
| } |
| |
| @Override |
| public void write(final byte[] b, final int off, final int len) throws IOException { |
| outBuffer.write(b, off, len); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| outBuffer.closeWrite(); |
| } |
| } |
| } |