| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.synapse.transport.passthru; |
| |
| import org.apache.http.nio.IOControl; |
| import org.apache.http.nio.ContentDecoder; |
| import org.apache.http.nio.ContentEncoder; |
| import org.apache.synapse.transport.passthru.config.BaseConfiguration; |
| import org.apache.synapse.transport.passthru.util.ControlledByteBuffer; |
| |
| import java.io.*; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| /** |
| * This is a buffer shared by both producers and consumers. |
| */ |
| public class Pipe { |
| |
| /** IOControl of the reader */ |
| private IOControl producerIoControl; |
| |
| /** IOControl of the consumer */ |
| private IOControl consumerIoControl; |
| |
| /** Fixed size buffer to read and write data */ |
| private ControlledByteBuffer buffer; |
| |
| private ControlledByteBuffer outputBuffer; |
| |
| private boolean producerCompleted = false; |
| |
| /** Lock to synchronize the producers and consumers */ |
| private Lock lock = new ReentrantLock(); |
| |
| private Condition readCondition = lock.newCondition(); |
| private Condition writeCondition = lock.newCondition(); |
| |
| /** Name to identify the buffer */ |
| private String name = "Buffer"; |
| |
| private boolean consumerError = false; |
| |
| private boolean producerError = false; |
| |
| private BaseConfiguration baseConfig; |
| |
| private boolean serializationComplete = false; |
| |
| private boolean rawSerializationComplete = false; |
| |
| private boolean hasHttpProducer = true; |
| |
| private ByteBufferInputStream inputStream; |
| private ByteBufferOutputStream outputStream; |
| |
| public Pipe(IOControl producerIoControl, ControlledByteBuffer buffer, |
| String name, BaseConfiguration baseConfig) { |
| this.producerIoControl = producerIoControl; |
| this.buffer = buffer; |
| this.name += "_" + name; |
| this.baseConfig = baseConfig; |
| } |
| |
| public Pipe(ControlledByteBuffer buffer, String name, BaseConfiguration baseConfig) { |
| this.buffer = buffer; |
| this.name += "_" + name; |
| this.baseConfig = baseConfig; |
| this.hasHttpProducer = false; |
| } |
| |
| /** |
| * Set the consumers IOControl |
| * @param consumerIoControl IOControl of the consumer |
| */ |
| public void attachConsumer(IOControl consumerIoControl) { |
| this.consumerIoControl = consumerIoControl; |
| } |
| |
| /** |
| * Consume the data from the buffer. Before calling this method attachConsumer |
| * method must be called with a valid IOControl. |
| * |
| * @param encoder encoder used to write the data means there will not be any data |
| * written in to this buffer |
| * @return number of bytes written (consumed) |
| * @throws IOException if an error occurred while consuming data |
| */ |
| public int consume(final ContentEncoder encoder) throws IOException { |
| if (consumerIoControl == null) { |
| throw new IllegalStateException("Consumer cannot be null when calling consume"); |
| } |
| |
| if (hasHttpProducer && producerIoControl == null) { |
| throw new IllegalStateException("Producer cannot be null when calling consume"); |
| } |
| |
| lock.lock(); |
| ControlledByteBuffer consumerBuffer; |
| if (outputBuffer != null) { |
| consumerBuffer = outputBuffer; |
| } else { |
| consumerBuffer = buffer; |
| } |
| try { |
| // if producer at error we have to stop the encoding and return immediately |
| if (producerError) { |
| encoder.complete(); |
| return -1; |
| } |
| |
| setOutputMode(consumerBuffer); |
| int bytesWritten = encoder.write(consumerBuffer.getByteBuffer()); |
| setInputMode(consumerBuffer); |
| |
| if (consumerBuffer.position() == 0) { |
| if (outputBuffer == null) { |
| if (producerCompleted) { |
| encoder.complete(); |
| } else { |
| // buffer is empty. Wait until the producer fills up |
| // the buffer |
| consumerIoControl.suspendOutput(); |
| } |
| } else if (serializationComplete || rawSerializationComplete) { |
| encoder.complete(); |
| } |
| } |
| |
| if (bytesWritten > 0) { |
| if (!encoder.isCompleted() && !producerCompleted && hasHttpProducer) { |
| producerIoControl.requestInput(); |
| } |
| writeCondition.signalAll(); |
| } |
| |
| return bytesWritten; |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * Produce data in to the buffer. |
| * |
| * @param decoder decoder to read bytes from the underlying stream |
| * @return bytes read (consumed) |
| * @throws IOException if an error occurs while reading data |
| */ |
| public int produce(final ContentDecoder decoder) throws IOException { |
| if (producerIoControl == null) { |
| throw new IllegalStateException("Producer cannot be null when calling produce"); |
| } |
| |
| lock.lock(); |
| try { |
| setInputMode(buffer); |
| int bytesRead = decoder.read(buffer.getByteBuffer()); |
| |
| // if consumer is at error we have to let the producer complete |
| if (consumerError) { |
| buffer.clear(); |
| } |
| |
| if (!buffer.hasRemaining()) { |
| // Input buffer is full. Suspend client input |
| // until the origin handler frees up some space in the buffer |
| producerIoControl.suspendInput(); |
| } |
| |
| // If there is some content in the input buffer make sure consumer output is active |
| if (buffer.position() > 0 || decoder.isCompleted()) { |
| if (consumerIoControl != null) { |
| consumerIoControl.requestOutput(); |
| } |
| readCondition.signalAll(); |
| } |
| |
| if (decoder.isCompleted()) { |
| producerCompleted = true; |
| } |
| return bytesRead; |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return name; |
| } |
| |
| public void consumerError() { |
| lock.lock(); |
| try { |
| this.consumerError = true; |
| writeCondition.signalAll(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| public void producerError() { |
| lock.lock(); |
| try { |
| this.producerError = true; |
| readCondition.signalAll(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * Creates an InputStream object on the underlying ByteBuffer. The returned |
| * InputStream can be used to read bytes from the underlying buffer which |
| * is being filled by the producer. |
| * |
| * @return An InputStream object |
| */ |
| public synchronized InputStream getInputStream() { |
| if (inputStream == null) { |
| inputStream = new ByteBufferInputStream(); |
| } |
| return inputStream; |
| } |
| |
| /** |
| * Creates a separate ByteBuffer for the output data and returns an OutputStream |
| * on top of it. |
| * |
| * @return An OutputStream object |
| */ |
| public synchronized OutputStream getOutputStream() { |
| if (outputStream == null) { |
| outputBuffer = baseConfig.getBufferFactory().getBuffer(); |
| outputStream = new ByteBufferOutputStream(); |
| } |
| return outputStream; |
| } |
| |
| public synchronized void setSerializationComplete(boolean serializationComplete) { |
| if (!this.serializationComplete) { |
| this.serializationComplete = serializationComplete; |
| if (consumerIoControl != null && hasData(outputBuffer)) { |
| consumerIoControl.requestOutput(); |
| } |
| } |
| } |
| |
| public synchronized void setSerializationCompleteWithoutData(boolean serializationComplete) { |
| if (!this.serializationComplete) { |
| this.serializationComplete = serializationComplete; |
| consumerIoControl.requestOutput(); |
| } |
| } |
| |
| public void setRawSerializationComplete(boolean rawSerializationComplete) { |
| this.rawSerializationComplete = rawSerializationComplete; |
| } |
| |
| public ControlledByteBuffer getBuffer() { |
| return buffer; |
| } |
| |
| public synchronized boolean isSerializationComplete(){ |
| return serializationComplete; |
| } |
| |
| private void setInputMode(ControlledByteBuffer buffer) { |
| if (buffer.setInputMode()) { |
| if (buffer.hasRemaining()) { |
| buffer.compact(); |
| } else { |
| buffer.clear(); |
| } |
| } |
| } |
| |
| private void setOutputMode(ControlledByteBuffer buffer) { |
| if (buffer.setOutputMode()) { |
| buffer.flip(); |
| } |
| } |
| |
| private boolean hasData(ControlledByteBuffer buffer) { |
| lock.lock(); |
| try { |
| setOutputMode(buffer); |
| return buffer.hasRemaining(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * This method returns whether buffer consumption is required or not. |
| * |
| * @return boolean returns whether buffer consumption is required or not |
| * @throws IOException when there is an error |
| */ |
| public boolean isConsumeRequired() throws IOException { |
| lock.lock(); |
| boolean isInputMode = buffer.isInputMode(); |
| try { |
| if (isInputMode) { |
| setOutputMode(buffer); |
| } |
| int readRemaining = buffer.remaining(); |
| int readPosition = buffer.position(); |
| setInputMode(buffer); |
| int writePosition = buffer.position(); |
| int writeRemaining = buffer.remaining(); |
| // in this method we will return true when the buffer is full when reading didn't happened : writePosition == buffer.capacity() && readPosition == 0 |
| // we will return true if we have consumed the message partially and when there is remaining to read |
| return (readRemaining == 0 && writeRemaining == readPosition) || (writePosition == buffer.capacity() && readPosition == 0); |
| } finally { |
| if (isInputMode) { |
| setInputMode(buffer); |
| } |
| lock.unlock(); |
| } |
| } |
| |
| private class ByteBufferInputStream extends InputStream { |
| |
| @Override |
| public int read() throws IOException { |
| lock.lock(); |
| try { |
| if (!hasData(buffer)) { |
| waitForData(); |
| if (producerError) { |
| return -1; |
| } |
| } |
| if (isEndOfStream()) { |
| return -1; |
| } |
| return buffer.get() & 0xff; |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| public int read(byte[] b, int off, int len) throws IOException { |
| if (b == null) { |
| return 0; |
| } |
| |
| lock.lock(); |
| try { |
| if (!hasData(buffer)) { |
| waitForData(); |
| } |
| if (isEndOfStream()) { |
| return -1; |
| } |
| setOutputMode(buffer); |
| int chunk = len; |
| if (chunk > buffer.remaining()) { |
| chunk = buffer.remaining(); |
| } |
| buffer.get(b, off, chunk); |
| return chunk; |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| private void waitForData() throws IOException { |
| lock.lock(); |
| try { |
| try { |
| while (!hasData(buffer) && !producerCompleted) { |
| if (producerError) { |
| break; |
| } |
| producerIoControl.requestInput(); |
| readCondition.await(); |
| } |
| } catch (InterruptedException e) { |
| throw new IOException("Interrupted while waiting for data"); |
| } |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| private boolean isEndOfStream() { |
| return !hasData(buffer) && producerCompleted; |
| } |
| } |
| |
| private class ByteBufferOutputStream extends OutputStream { |
| |
| @Override |
| public void write(int b) throws IOException { |
| lock.lock(); |
| try { |
| setInputMode(outputBuffer); |
| if (!outputBuffer.hasRemaining()) { |
| flushContent(); |
| setInputMode(outputBuffer); |
| } |
| outputBuffer.put((byte) b); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| public void write(final byte[] b, int off, int len) throws IOException { |
| if (b == null) { |
| return; |
| } |
| lock.lock(); |
| try { |
| setInputMode(outputBuffer); |
| int remaining = len; |
| while (remaining > 0) { |
| if (!outputBuffer.hasRemaining()) { |
| flushContent(); |
| if (consumerError) { |
| buffer.clear(); |
| break; |
| } |
| setInputMode(outputBuffer); |
| } |
| int chunk = Math.min(remaining, outputBuffer.remaining()); |
| outputBuffer.put(b, off, chunk); |
| remaining -= chunk; |
| off += chunk; |
| } |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| private void flushContent() throws IOException { |
| lock.lock(); |
| |
| if(rawSerializationComplete){ |
| return; |
| } |
| |
| try { |
| try { |
| while (hasData(outputBuffer)) { |
| if(consumerError) { |
| break; |
| } |
| if (consumerIoControl != null && writeCondition != null) { |
| consumerIoControl.requestOutput(); |
| writeCondition.await(); |
| } |
| } |
| |
| } catch (InterruptedException ex) { |
| throw new IOException("Interrupted while flushing the content buffer"); |
| } |
| } finally { |
| lock.unlock(); |
| } |
| } |
| } |
| |
| } |
| |