blob: ca873a67b0a2f82b00d5119b36cc5ef52671c7c2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.commons.net.telnet;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
final class TelnetInputStream extends BufferedInputStream implements Runnable
{
/** End of file has been reached */
private static final int EOF = -1;
/** Read would block */
private static final int WOULD_BLOCK = -2;
// TODO should these be private enums?
static final int STATE_DATA = 0, STATE_IAC = 1, STATE_WILL = 2,
STATE_WONT = 3, STATE_DO = 4, STATE_DONT = 5,
STATE_SB = 6, STATE_SE = 7, STATE_CR = 8, STATE_IAC_SB = 9;
private boolean hasReachedEOF; // @GuardedBy("__queue")
private volatile boolean isClosed;
private boolean readIsWaiting;
private int receiveState, queueHead, queueTail, bytesAvailable;
private final int[] queue;
private final TelnetClient client;
private final Thread thread;
private IOException ioException;
/* TERMINAL-TYPE option (start)*/
private final int suboption[];
private int suboptionCount;
/* TERMINAL-TYPE option (end)*/
private volatile boolean threaded;
TelnetInputStream(final InputStream input, final TelnetClient client) {
this(input, client, true);
}
TelnetInputStream(final InputStream input, final TelnetClient client,
final boolean readerThread)
{
super(input);
this.client = client;
this.receiveState = STATE_DATA;
this.isClosed = true;
this.hasReachedEOF = false;
// Make it 2049, because when full, one slot will go unused, and we
// want a 2048 byte buffer just to have a round number (base 2 that is)
this.queue = new int[2049];
this.queueHead = 0;
this.queueTail = 0;
this.suboption = new int[client.maxSubnegotiationLength];
this.bytesAvailable = 0;
this.ioException = null;
this.readIsWaiting = false;
this.threaded = false;
if(readerThread) {
this.thread = new Thread(this);
} else {
this.thread = null;
}
}
@Override
public int available() throws IOException
{
// Critical section because run() may change __bytesAvailable
synchronized (queue)
{
if (threaded) { // Must not call super.available when running threaded: NET-466
return bytesAvailable;
}
return bytesAvailable + super.available();
}
}
// Cannot be synchronized. Will cause deadlock if run() is blocked
// in read because BufferedInputStream read() is synchronized.
@Override
public void close() throws IOException
{
// Completely disregard the fact thread may still be running.
// We can't afford to block on this close by waiting for
// thread to terminate because few if any JVM's will actually
// interrupt a system read() from the interrupt() method.
super.close();
synchronized (queue)
{
hasReachedEOF = true;
isClosed = true;
if (thread != null && thread.isAlive())
{
thread.interrupt();
}
queue.notifyAll();
}
}
/** Returns false. Mark is not supported. */
@Override
public boolean markSupported()
{
return false;
}
// synchronized(__client) critical sections are to protect against
// TelnetOutputStream writing through the telnet client at same time
// as a processDo/Will/etc. command invoked from TelnetInputStream
// tries to write. Returns true if buffer was previously empty.
private boolean processChar(final int ch) throws InterruptedException
{
// Critical section because we're altering __bytesAvailable,
// __queueTail, and the contents of _queue.
final boolean bufferWasEmpty;
synchronized (queue)
{
bufferWasEmpty = bytesAvailable == 0;
while (bytesAvailable >= queue.length - 1)
{
// The queue is full. We need to wait before adding any more data to it. Hopefully the stream owner
// will consume some data soon!
if(!threaded) {
// We've been asked to add another character to the queue, but it is already full and there's
// no other thread to drain it. This should not have happened!
throw new IllegalStateException("Queue is full! Cannot process another character.");
}
queue.notify();
try
{
queue.wait();
}
catch (final InterruptedException e)
{
throw e;
}
}
// Need to do this in case we're not full, but block on a read
if (readIsWaiting && threaded)
{
queue.notify();
}
queue[queueTail] = ch;
++bytesAvailable;
if (++queueTail >= queue.length) {
queueTail = 0;
}
}
return bufferWasEmpty;
}
@Override
public int read() throws IOException
{
// Critical section because we're altering __bytesAvailable,
// __queueHead, and the contents of _queue in addition to
// testing value of __hasReachedEOF.
synchronized (queue)
{
while (true)
{
if (ioException != null)
{
final IOException e;
e = ioException;
ioException = null;
throw e;
}
if (bytesAvailable == 0)
{
// Return EOF if at end of file
if (hasReachedEOF) {
return EOF;
}
// Otherwise, we have to wait for queue to get something
if(threaded)
{
queue.notify();
try
{
readIsWaiting = true;
queue.wait();
readIsWaiting = false;
}
catch (final InterruptedException e)
{
throw new InterruptedIOException("Fatal thread interruption during read.");
}
}
else
{
//__alreadyread = false;
readIsWaiting = true;
int ch;
boolean mayBlock = true; // block on the first read only
do
{
try
{
if ((ch = read(mayBlock)) < 0) { // must be EOF
if(ch != WOULD_BLOCK) {
return ch;
}
}
}
catch (final InterruptedIOException e)
{
synchronized (queue)
{
ioException = e;
queue.notifyAll();
try
{
queue.wait(100);
}
catch (final InterruptedException interrupted)
{
// Ignored
}
}
return EOF;
}
try
{
if(ch != WOULD_BLOCK)
{
processChar(ch);
}
}
catch (final InterruptedException e)
{
if (isClosed) {
return EOF;
}
}
// Reads should not block on subsequent iterations. Potentially, this could happen if the
// remaining buffered socket data consists entirely of Telnet command sequence and no "user" data.
mayBlock = false;
}
// Continue reading as long as there is data available and the queue is not full.
while (super.available() > 0 && bytesAvailable < queue.length - 1);
readIsWaiting = false;
}
continue;
}
final int ch;
ch = queue[queueHead];
if (++queueHead >= queue.length) {
queueHead = 0;
}
--bytesAvailable;
// Need to explicitly notify() so available() works properly
if(bytesAvailable == 0 && threaded) {
queue.notify();
}
return ch;
}
}
}
// synchronized(__client) critical sections are to protect against
// TelnetOutputStream writing through the telnet client at same time
// as a processDo/Will/etc. command invoked from TelnetInputStream
// tries to write.
/**
* Get the next byte of data.
* IAC commands are processed internally and do not return data.
*
* @param mayBlock true if method is allowed to block
* @return the next byte of data,
* or -1 (EOF) if end of stread reached,
* or -2 (WOULD_BLOCK) if mayBlock is false and there is no data available
*/
private int read(final boolean mayBlock) throws IOException
{
int ch;
while (true)
{
// If there is no more data AND we were told not to block,
// just return WOULD_BLOCK (-2). (More efficient than exception.)
if(!mayBlock && super.available() == 0) {
return WOULD_BLOCK;
}
// Otherwise, exit only when we reach end of stream.
if ((ch = super.read()) < 0) {
return EOF;
}
ch = ch & 0xff;
/* Code Section added for supporting AYT (start)*/
synchronized (client)
{
client.processAYTResponse();
}
/* Code Section added for supporting AYT (end)*/
/* Code Section added for supporting spystreams (start)*/
client.spyRead(ch);
/* Code Section added for supporting spystreams (end)*/
switch (receiveState)
{
case STATE_CR:
if (ch == '\0')
{
// Strip null
continue;
}
// How do we handle newline after cr?
// else if (ch == '\n' && _requestedDont(TelnetOption.ECHO) &&
// Handle as normal data by falling through to _STATE_DATA case
//$FALL-THROUGH$
case STATE_DATA:
if (ch == TelnetCommand.IAC)
{
receiveState = STATE_IAC;
continue;
}
if (ch == '\r')
{
synchronized (client)
{
if (client.requestedDont(TelnetOption.BINARY)) {
receiveState = STATE_CR;
} else {
receiveState = STATE_DATA;
}
}
} else {
receiveState = STATE_DATA;
}
break;
case STATE_IAC:
switch (ch)
{
case TelnetCommand.WILL:
receiveState = STATE_WILL;
continue;
case TelnetCommand.WONT:
receiveState = STATE_WONT;
continue;
case TelnetCommand.DO:
receiveState = STATE_DO;
continue;
case TelnetCommand.DONT:
receiveState = STATE_DONT;
continue;
/* TERMINAL-TYPE option (start)*/
case TelnetCommand.SB:
suboptionCount = 0;
receiveState = STATE_SB;
continue;
/* TERMINAL-TYPE option (end)*/
case TelnetCommand.IAC:
receiveState = STATE_DATA;
break; // exit to enclosing switch to return IAC from read
case TelnetCommand.SE: // unexpected byte! ignore it (don't send it as a command)
receiveState = STATE_DATA;
continue;
default:
receiveState = STATE_DATA;
client.processCommand(ch); // Notify the user
continue; // move on the next char
}
break; // exit and return from read
case STATE_WILL:
synchronized (client)
{
client.processWill(ch);
client.flushOutputStream();
}
receiveState = STATE_DATA;
continue;
case STATE_WONT:
synchronized (client)
{
client.processWont(ch);
client.flushOutputStream();
}
receiveState = STATE_DATA;
continue;
case STATE_DO:
synchronized (client)
{
client.processDo(ch);
client.flushOutputStream();
}
receiveState = STATE_DATA;
continue;
case STATE_DONT:
synchronized (client)
{
client.processDont(ch);
client.flushOutputStream();
}
receiveState = STATE_DATA;
continue;
/* TERMINAL-TYPE option (start)*/
case STATE_SB:
switch (ch)
{
case TelnetCommand.IAC:
receiveState = STATE_IAC_SB;
continue;
default:
// store suboption char
if (suboptionCount < suboption.length) {
suboption[suboptionCount++] = ch;
}
break;
}
receiveState = STATE_SB;
continue;
case STATE_IAC_SB: // IAC received during SB phase
switch (ch)
{
case TelnetCommand.SE:
synchronized (client)
{
client.processSuboption(suboption, suboptionCount);
client.flushOutputStream();
}
receiveState = STATE_DATA;
continue;
case TelnetCommand.IAC: // De-dup the duplicated IAC
if (suboptionCount < suboption.length) {
suboption[suboptionCount++] = ch;
}
break;
default: // unexpected byte! ignore it
break;
}
receiveState = STATE_SB;
continue;
/* TERMINAL-TYPE option (end)*/
}
break;
}
return ch;
}
/**
* Reads the next number of bytes from the stream into an array and
* returns the number of bytes read. Returns -1 if the end of the
* stream has been reached.
* <p>
* @param buffer The byte array in which to store the data.
* @return The number of bytes read. Returns -1 if the
* end of the message has been reached.
* @throws IOException If an error occurs in reading the underlying
* stream.
*/
@Override
public int read(final byte buffer[]) throws IOException
{
return read(buffer, 0, buffer.length);
}
/**
* Reads the next number of bytes from the stream into an array and returns
* the number of bytes read. Returns -1 if the end of the
* message has been reached. The characters are stored in the array
* starting from the given offset and up to the length specified.
* <p>
* @param buffer The byte array in which to store the data.
* @param offset The offset into the array at which to start storing data.
* @param length The number of bytes to read.
* @return The number of bytes read. Returns -1 if the
* end of the stream has been reached.
* @throws IOException If an error occurs while reading the underlying
* stream.
*/
@Override
public int read(final byte buffer[], int offset, int length) throws IOException
{
int ch;
final int off;
if (length < 1) {
return 0;
}
// Critical section because run() may change __bytesAvailable
synchronized (queue)
{
if (length > bytesAvailable) {
length = bytesAvailable;
}
}
if ((ch = read()) == EOF) {
return EOF;
}
off = offset;
do
{
buffer[offset++] = (byte)ch;
}
while (--length > 0 && (ch = read()) != EOF);
//__client._spyRead(buffer, off, offset - off);
return offset - off;
}
@Override
public void run()
{
int ch;
try
{
_outerLoop:
while (!isClosed)
{
try
{
if ((ch = read(true)) < 0) {
break;
}
}
catch (final InterruptedIOException e)
{
synchronized (queue)
{
ioException = e;
queue.notifyAll();
try
{
queue.wait(100);
}
catch (final InterruptedException interrupted)
{
if (isClosed) {
break _outerLoop;
}
}
continue;
}
} catch(final RuntimeException re) {
// We treat any runtime exceptions as though the
// stream has been closed. We close the
// underlying stream just to be sure.
super.close();
// Breaking the loop has the effect of setting
// the state to closed at the end of the method.
break _outerLoop;
}
// Process new character
boolean notify = false;
try
{
notify = processChar(ch);
}
catch (final InterruptedException e)
{
if (isClosed) {
break _outerLoop;
}
}
// Notify input listener if buffer was previously empty
if (notify) {
client.notifyInputListener();
}
}
}
catch (final IOException ioe)
{
synchronized (queue)
{
ioException = ioe;
}
client.notifyInputListener();
}
synchronized (queue)
{
isClosed = true; // Possibly redundant
hasReachedEOF = true;
queue.notify();
}
threaded = false;
}
void start()
{
if(thread == null) {
return;
}
int priority;
isClosed = false;
// TODO remove this
// Need to set a higher priority in case JVM does not use pre-emptive
// threads. This should prevent scheduler induced deadlock (rather than
// deadlock caused by a bug in this code).
priority = Thread.currentThread().getPriority() + 1;
if (priority > Thread.MAX_PRIORITY) {
priority = Thread.MAX_PRIORITY;
}
thread.setPriority(priority);
thread.setDaemon(true);
thread.start();
threaded = true; // tell _processChar that we are running threaded
}
}