// blob: 15568766817452da36ff90472b5dc1bfec1e81e7 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.amqp_1_0.transport;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.qpid.amqp_1_0.codec.DescribedTypeConstructorRegistry;
import org.apache.qpid.amqp_1_0.codec.ValueWriter;
import org.apache.qpid.amqp_1_0.framing.AMQFrame;
import org.apache.qpid.amqp_1_0.framing.SASLFrame;
import org.apache.qpid.amqp_1_0.type.*;
import org.apache.qpid.amqp_1_0.type.security.SaslChallenge;
import org.apache.qpid.amqp_1_0.type.security.SaslCode;
import org.apache.qpid.amqp_1_0.type.security.SaslInit;
import org.apache.qpid.amqp_1_0.type.security.SaslMechanisms;
import org.apache.qpid.amqp_1_0.type.security.SaslOutcome;
import org.apache.qpid.amqp_1_0.type.security.SaslResponse;
import org.apache.qpid.amqp_1_0.type.transport.*;
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Endpoint representing one end of an AMQP 1.0 connection.
 * <p>
 * The endpoint tracks the connection state, maps incoming and outgoing channel numbers to
 * {@link SessionEndpoint}s, runs the SASL exchange as either client or server, and writes
 * frames through the configured {@link FrameOutputHandler}s. Most state-changing methods are
 * synchronized on this instance, which is also the object returned by {@link #getLock()}.
 */
public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Source, ValueWriter.Registry.Source,
ErrorHandler, SASLEndpoint
{
// Channel on which the connection-level Open and Close frames are sent.
private static final short CONNECTION_CONTROL_CHANNEL = (short) 0;
// Zero-length buffer handed to ValueWriter.writeToBuffer when only the returned size is wanted (see send).
private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new byte[0]);
// SASL mechanisms considered by receiveSaslMechanisms, tried in this order.
private static final Symbol SASL_PLAIN = Symbol.valueOf("PLAIN");
private static final Symbol SASL_ANONYMOUS = Symbol.valueOf("ANONYMOUS");
private static final Symbol SASL_EXTERNAL = Symbol.valueOf("EXTERNAL");
private final Container _container;
// Principal supplied by the client side, or established by the SASL server exchange.
private Principal _user;
// Defaults, overridable through the system properties named in each initializer.
private static final short DEFAULT_CHANNEL_MAX = Integer.getInteger("amqp.channel_max", 255).shortValue();
private static final int DEFAULT_MAX_FRAME = Integer.getInteger("amqp.max_frame_size", 1 << 15);
// Milliseconds; used as the timeout of waitUntil(Predicate).
private static final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqp.connection_sync_timeout",5000l);
private ConnectionState _state = ConnectionState.UNOPENED;
private short _channelMax = DEFAULT_CHANNEL_MAX;
// Frame size used by send(); starts at 4096 and is replaced by the negotiated value in receiveOpen.
private int _maxFrameSize = 4096;
private String _remoteContainerId;
private SocketAddress _remoteAddress;
// positioned by the *outgoing* channel
private SessionEndpoint[] _sendingSessions;
// positioned by the *incoming* channel
private SessionEndpoint[] _receivingSessions;
private boolean _closedForInput;
private boolean _closedForOutput;
// Idle timeout taken from the peer's Open frame, when it supplied one.
private long _idleTimeout;
private AMQPDescribedTypeRegistry _describedTypeRegistry = AMQPDescribedTypeRegistry.newInstance()
.registerTransportLayer()
.registerMessagingLayer()
.registerTransactionLayer()
.registerSecurityLayer();
private FrameOutputHandler<FrameBody> _frameOutputHandler;
// Protocol version recorded by protocolHeaderReceived.
private byte _majorVersion;
private byte _minorVersion;
private byte _revision;
// Handle-max advertised in the Begin frames sent by createSession.
private UnsignedInteger _handleMax = UnsignedInteger.MAX_VALUE;
private ConnectionEventListener _connectionEventListener = ConnectionEventListener.DEFAULT;
private String _password;
// True when a principal has been supplied, in which case open() waits for the SASL exchange.
private boolean _requiresSASLClient;
// True when a SaslServerProvider was supplied at construction.
private final boolean _requiresSASLServer;
private FrameOutputHandler<SaslFrameBody> _saslFrameOutput;
private boolean _saslComplete;
// Max frame size advertised in our Open frame and used as the upper bound in receiveOpen.
private UnsignedInteger _desiredMaxFrameSize = UnsignedInteger.valueOf(DEFAULT_MAX_FRAME);
// Optional callback run by the SASL handlers once they have marked the exchange complete.
private Runnable _onSaslCompleteTask;
private SaslServerProvider _saslServerProvider;
private SaslServer _saslServer;
private boolean _authenticated;
private String _remoteHostname;
// Error carried by the peer's Close frame, if any.
private Error _remoteError;
// Properties placed in our Open frame when non-null.
private Map _properties;
private long _syncTimeout = DEFAULT_SYNC_TIMEOUT;
/**
 * Creates an endpoint that acts as the SASL server when a provider is given.
 *
 * @param container the container this connection belongs to
 * @param cbs provider of {@link SaslServer} instances; when {@code null} the endpoint does not
 *            require SASL on the server side
 */
public ConnectionEndpoint(Container container, SaslServerProvider cbs)
{
_container = container;
_saslServerProvider = cbs;
_requiresSASLClient = false;
_requiresSASLServer = cbs != null;
}
/**
 * Creates an endpoint that acts as the SASL client when a principal is given.
 *
 * @param container the container this connection belongs to
 * @param user the principal to authenticate as; when {@code null} no SASL exchange is required
 * @param password the password offered with the PLAIN mechanism, may be {@code null}
 */
public ConnectionEndpoint(Container container, Principal user, String password)
{
_container = container;
_user = user;
_password = password;
_requiresSASLClient = user != null;
_requiresSASLServer = false;
}
/**
 * Supplies the principal for this connection if none has been set so far.
 * A call made while a principal is already present is ignored.
 *
 * @param user the principal to use; a non-null value also makes the endpoint require client-side SASL
 */
public void setPrincipal(Principal user)
{
    if (_user != null)
    {
        // a principal is already in place - leave everything untouched
        return;
    }
    _user = user;
    _requiresSASLClient = (user != null);
}
/**
 * Opens the connection from the local side.
 * <p>
 * When client-side SASL is required this blocks until the SASL exchange has been marked
 * complete, then fails if authentication did not succeed. If the connection is still
 * {@code UNOPENED} an Open frame is sent and the state becomes {@code AWAITING_OPEN}.
 * <p>
 * An interrupt received while waiting does not abort the wait (the connection cannot be opened
 * before SASL finishes), but the thread's interrupt status is restored before returning so the
 * caller can still observe it.
 *
 * @throws RuntimeException if the SASL exchange completed without authenticating
 */
public synchronized void open()
{
    if (_requiresSASLClient)
    {
        boolean interrupted = false;
        synchronized (getLock())
        {
            try
            {
                while (!_saslComplete)
                {
                    try
                    {
                        getLock().wait();
                    }
                    catch (InterruptedException e)
                    {
                        // keep waiting, but remember the interrupt so it is not lost
                        interrupted = true;
                    }
                }
            }
            finally
            {
                if (interrupted)
                {
                    Thread.currentThread().interrupt();
                }
            }
        }
        if (!_authenticated)
        {
            throw new RuntimeException("Could not connect - authentication error");
        }
    }
    if (_state == ConnectionState.UNOPENED)
    {
        sendOpen(_channelMax, DEFAULT_MAX_FRAME);
        _state = ConnectionState.AWAITING_OPEN;
    }
}
/**
 * Sets the handler through which AMQP frames are written by {@code send}.
 *
 * @param frameOutputHandler the handler that receives outgoing frames
 */
public void setFrameOutputHandler(final FrameOutputHandler<FrameBody> frameOutputHandler)
{
_frameOutputHandler = frameOutputHandler;
}
/**
 * Sets the connection properties that are placed in the Open frame sent by this endpoint.
 *
 * @param properties the properties to advertise; when {@code null} none are set on the Open frame
 */
public void setProperties(Map<Symbol,Object> properties)
{
_properties = properties;
}
/**
 * Starts a new session on the first free outgoing channel and sends the matching Begin frame.
 *
 * @param name not used by this implementation
 * @return the new session endpoint, or {@code null} when no outgoing channel is free
 */
public synchronized SessionEndpoint createSession(String name)
{
    // todo assert connection state
    final short sendingChannel = getFirstFreeChannel();
    if (sendingChannel == -1)
    {
        // TODO - report error
        return null;
    }

    final SessionEndpoint session = new SessionEndpoint(this);
    _sendingSessions[sendingChannel] = session;
    session.setSendingChannel(sendingChannel);

    final Begin beginFrame = new Begin();
    beginFrame.setNextOutgoingId(session.getNextOutgoingId());
    beginFrame.setOutgoingWindow(session.getOutgoingWindowSize());
    beginFrame.setIncomingWindow(session.getIncomingWindowSize());
    beginFrame.setHandleMax(_handleMax);
    send(sendingChannel, beginFrame);

    return session;
}
/**
 * @return the container given at construction
 */
public Container getContainer()
{
return _container;
}
/**
 * @return the principal associated with this connection, or {@code null} if none has been set
 */
public Principal getUser()
{
return _user;
}
/**
 * @return the highest channel number currently in effect for this connection
 */
public short getChannelMax()
{
return _channelMax;
}
/**
 * @return the frame size used when sending; 4096 until an Open frame has been received
 */
public int getMaxFrameSize()
{
return _maxFrameSize;
}
/**
 * @return the container id from the peer's Open frame, or {@code null} before it has been received
 */
public String getRemoteContainerId()
{
return _remoteContainerId;
}
/**
 * Builds and sends the Open frame on the connection control channel.
 * The session tables are allocated here if they do not exist yet, and the local channel
 * maximum is lowered to {@code channelMax} when that is smaller.
 *
 * @param channelMax the channel maximum to advertise
 * @param maxFrameSize not used; the advertised frame size comes from {@link #getDesiredMaxFrameSize()}
 */
private void sendOpen(final short channelMax, final int maxFrameSize)
{
    final Open openFrame = new Open();

    if (_receivingSessions == null)
    {
        final int channelCount = channelMax + 1;
        _receivingSessions = new SessionEndpoint[channelCount];
        _sendingSessions = new SessionEndpoint[channelCount];
    }

    // never raise the limit already in place
    _channelMax = (short) Math.min(channelMax, _channelMax);

    openFrame.setChannelMax(UnsignedShort.valueOf(channelMax));
    openFrame.setContainerId(_container.getId());
    openFrame.setMaxFrameSize(getDesiredMaxFrameSize());
    openFrame.setHostname(getRemoteHostname());
    if (_properties != null)
    {
        openFrame.setProperties(_properties);
    }

    send(CONNECTION_CONTROL_CHANNEL, openFrame);
}
/**
 * @return the max frame size this endpoint advertises in its Open frame
 */
public UnsignedInteger getDesiredMaxFrameSize()
{
return _desiredMaxFrameSize;
}
/**
 * Sets the max frame size this endpoint advertises in its Open frame.
 *
 * @param size the desired max frame size
 */
public void setDesiredMaxFrameSize(UnsignedInteger size)
{
_desiredMaxFrameSize = size;
}
/**
 * Marks the endpoint closed for output and closes the frame output handler.
 */
private void closeSender()
{
setClosedForOutput(true);
_frameOutputHandler.close();
}
/**
 * Finds the lowest outgoing channel number that has no session assigned.
 *
 * @return the free channel number, or {@code -1} when every channel up to the channel maximum is taken
 */
short getFirstFreeChannel()
{
    int candidate = 0;
    while (candidate <= _channelMax)
    {
        if (_sendingSessions[candidate] == null)
        {
            return (short) candidate;
        }
        candidate++;
    }
    return -1;
}
/**
 * Looks up the session bound to an incoming channel.
 * <p>
 * The channel number comes from the peer, so it is range-checked before it is used as an index.
 * A channel outside the session table, a lookup before the table exists, and a channel with no
 * session are all reported to the peer as a framing error through {@link #handleError(Error)}.
 *
 * @param channel the incoming channel number taken from the received frame
 * @return the session for that channel, or {@code null} when there is none (an error has then been raised)
 */
private SessionEndpoint getSession(final short channel)
{
    final SessionEndpoint[] sessions = _receivingSessions;
    SessionEndpoint session = null;
    if (sessions != null && channel >= 0 && channel < sessions.length)
    {
        session = sessions[channel];
    }
    if (session == null)
    {
        Error error = new Error();
        error.setCondition(ConnectionError.FRAMING_ERROR);
        error.setDescription("Frame received on channel " + channel + " which is not known as a begun session.");
        this.handleError(error);
    }
    return session;
}
/**
 * Handles the peer's Open frame and completes the open handshake.
 * <p>
 * The channel maximum is lowered to the peer's value when that is smaller. The frame size
 * becomes the smaller of the peer's max-frame-size ({@code DEFAULT_MAX_FRAME} when absent) and
 * our desired max frame size. If we have not sent our own Open yet it is sent now, and the
 * state moves to {@code OPEN}. Threads waiting on this endpoint are woken.
 *
 * @param channel the channel the frame arrived on (not used)
 * @param open the received Open frame
 */
public synchronized void receiveOpen(short channel, Open open)
{
    if (open.getChannelMax() != null)
    {
        // channel-max is unsigned on the wire: widen before comparing, otherwise values above
        // Short.MAX_VALUE look negative and would wrongly replace our own limit
        final int remoteChannelMax = open.getChannelMax().shortValue() & 0xFFFF;
        if (remoteChannelMax < _channelMax)
        {
            _channelMax = (short) remoteChannelMax;
        }
    }
    if (_receivingSessions == null)
    {
        _receivingSessions = new SessionEndpoint[_channelMax + 1];
        _sendingSessions = new SessionEndpoint[_channelMax + 1];
    }
    UnsignedInteger remoteDesiredMaxFrameSize =
            open.getMaxFrameSize() == null ? UnsignedInteger.valueOf(DEFAULT_MAX_FRAME) : open.getMaxFrameSize();
    _maxFrameSize = (remoteDesiredMaxFrameSize.compareTo(_desiredMaxFrameSize) < 0
            ? remoteDesiredMaxFrameSize
            : _desiredMaxFrameSize).intValue();
    _remoteContainerId = open.getContainerId();
    if (open.getIdleTimeOut() != null)
    {
        _idleTimeout = open.getIdleTimeOut().longValue();
    }
    switch (_state)
    {
        case UNOPENED:
            sendOpen(_channelMax, _maxFrameSize);
            // deliberate fall through: once our Open is out the connection is open
        case AWAITING_OPEN:
            _state = ConnectionState.OPEN;
            break;
        default:
            // TODO bad stuff (connection already open)
    }
    notifyAll();
}
/**
 * Handles the peer's Close frame.
 * <p>
 * Input is marked closed and the connection event listener is told. What happens next depends
 * on the state: before the connection is open a connection-forced error is sent back through
 * {@link #close(Error)}; when open, a Close is sent in reply and the state becomes
 * {@code CLOSED}; when our Close was already sent the state simply becomes {@code CLOSED}.
 * The peer's error is recorded and waiting threads are woken.
 *
 * @param channel the channel the frame arrived on (not used)
 * @param close the received Close frame
 */
public synchronized void receiveClose(short channel, Close close)
{
    setClosedForInput(true);
    _connectionEventListener.closeReceived();

    final ConnectionState stateOnReceipt = _state;
    if (stateOnReceipt == ConnectionState.UNOPENED || stateOnReceipt == ConnectionState.AWAITING_OPEN)
    {
        final Error prematureClose = new Error();
        prematureClose.setCondition(ConnectionError.CONNECTION_FORCED);
        prematureClose.setDescription("Connection close sent before connection was opened");
        close(prematureClose);
    }
    else if (stateOnReceipt == ConnectionState.OPEN)
    {
        _state = ConnectionState.CLOSE_RECEIVED;
        sendClose(new Close());
        _state = ConnectionState.CLOSED;
    }
    else if (stateOnReceipt == ConnectionState.CLOSE_SENT)
    {
        _state = ConnectionState.CLOSED;
    }

    _remoteError = close.getError();
    notifyAll();
}
/**
 * Closes the connection from the local side, reporting the given error to the peer.
 * <p>
 * An unopened connection goes straight to {@code CLOSED} without sending anything. A connection
 * that is open or awaiting the peer's Open sends a Close carrying the error and moves to
 * {@code CLOSE_SENT}. In any other state nothing is done.
 *
 * @param error the error to place in the Close frame
 */
public synchronized void close(Error error)
{
    final Close closeFrame = new Close();
    closeFrame.setError(error);

    final ConnectionState current = _state;
    if (current == ConnectionState.UNOPENED)
    {
        _state = ConnectionState.CLOSED;
    }
    else if (current == ConnectionState.AWAITING_OPEN || current == ConnectionState.OPEN)
    {
        sendClose(closeFrame);
        _state = ConnectionState.CLOSE_SENT;
    }
    // CLOSE_SENT / CLOSED: already sent our close - too late to do anything more
    // TODO Unknown state
}
/**
 * Reacts to the input side of the transport being closed.
 * <p>
 * On the first call the state is advanced: a connection that was never opened, was awaiting
 * the peer's Open, or had already sent its Close becomes {@code CLOSED}; an open connection
 * becomes {@code CLOSE_RECEIVED}. Every session still registered for an incoming channel is
 * ended and removed. Waiting threads are woken on every call.
 */
public synchronized void inputClosed()
{
    if (!_closedForInput)
    {
        _closedForInput = true;
        switch (_state)
        {
            case UNOPENED:
            case AWAITING_OPEN:
            case CLOSE_SENT:
                _state = ConnectionState.CLOSED;
                // without this break the state was immediately overwritten with CLOSE_RECEIVED
                break;
            case OPEN:
                _state = ConnectionState.CLOSE_RECEIVED;
                break;
            case CLOSED:
                // already sent our close - too late to do anything more
                break;
            default:
        }
        if (_receivingSessions != null)
        {
            for (int i = 0; i < _receivingSessions.length; i++)
            {
                if (_receivingSessions[i] != null)
                {
                    _receivingSessions[i].end();
                    _receivingSessions[i] = null;
                }
            }
        }
    }
    notifyAll();
}
/**
 * Sends the given Close frame on the connection control channel and then closes the sender.
 *
 * @param closeToSend the Close frame to send
 */
private void sendClose(Close closeToSend)
{
send(CONNECTION_CONTROL_CHANNEL, closeToSend);
closeSender();
}
/**
 * Records whether the endpoint is closed for input and wakes any waiting threads.
 *
 * @param closed the new closed-for-input value
 */
private synchronized void setClosedForInput(boolean closed)
{
_closedForInput = closed;
notifyAll();
}
/**
 * Handles a Begin frame from the peer.
 * <p>
 * A Begin that carries a remote-channel answers a session this endpoint started: the incoming
 * channel is bound to that session. A Begin without a remote-channel asks for a new session:
 * one is created on the first free outgoing channel, a Begin is sent in reply and the
 * connection event listener is notified. Inconsistent frames close the connection with a
 * framing error.
 * <p>
 * If the peer asks for a session while no outgoing channel is free, the connection is closed
 * with an error instead of indexing the session table with {@code -1}.
 *
 * @param channel the incoming channel the frame arrived on
 * @param begin the received Begin frame
 */
public synchronized void receiveBegin(short channel, Begin begin)
{
    if (begin.getRemoteChannel() != null)
    {
        final short myChannelId = begin.getRemoteChannel().shortValue();
        if (myChannelId < 0 || myChannelId >= _sendingSessions.length)
        {
            closeWithFramingError("BEGIN received on channel " + channel + " with given remote-channel "
                                  + begin.getRemoteChannel() + " which is outside the valid range of 0 to "
                                  + _channelMax + ".");
            return;
        }

        final SessionEndpoint endpoint = _sendingSessions[myChannelId];
        if (endpoint == null)
        {
            closeWithFramingError("BEGIN received on channel " + channel + " with given remote-channel "
                                  + begin.getRemoteChannel() + " which is not known as a begun session.");
        }
        else if (_receivingSessions[channel] != null)
        {
            closeWithFramingError("BEGIN received on channel " + channel + " which is already in use.");
        }
        else
        {
            _receivingSessions[channel] = endpoint;
            endpoint.setReceivingChannel(channel);
            endpoint.setNextIncomingId(begin.getNextOutgoingId());
            endpoint.setOutgoingSessionCredit(begin.getIncomingWindow());
            if (endpoint.getState() == SessionState.END_SENT)
            {
                _sendingSessions[myChannelId] = null;
            }
        }
    }
    else // Peer requesting session creation
    {
        if (_receivingSessions[channel] != null)
        {
            closeWithFramingError("BEGIN received on channel " + channel + " which is already in use.");
            return;
        }

        final short myChannelId = getFirstFreeChannel();
        if (myChannelId == -1)
        {
            // NOTE(review): a resource-limit condition would describe this more precisely;
            // CONNECTION_FORCED is used because it is the only other condition this class relies on.
            final Error error = new Error();
            error.setCondition(ConnectionError.CONNECTION_FORCED);
            error.setDescription("BEGIN received on channel " + channel
                                 + " but no outgoing channel is free to establish the session.");
            close(error);
            return;
        }

        SessionEndpoint endpoint = new SessionEndpoint(this, begin);
        _receivingSessions[channel] = endpoint;
        _sendingSessions[myChannelId] = endpoint;
        Begin beginToSend = new Begin();
        endpoint.setReceivingChannel(channel);
        endpoint.setSendingChannel(myChannelId);
        beginToSend.setRemoteChannel(UnsignedShort.valueOf(channel));
        beginToSend.setNextOutgoingId(endpoint.getNextOutgoingId());
        beginToSend.setOutgoingWindow(endpoint.getOutgoingWindowSize());
        beginToSend.setIncomingWindow(endpoint.getIncomingWindowSize());
        send(myChannelId, beginToSend);
        _connectionEventListener.remoteSessionCreation(endpoint);
    }
}

/** Closes the connection with a framing error carrying the given description. */
private void closeWithFramingError(final String description)
{
    final Error error = new Error();
    error.setCondition(ConnectionError.FRAMING_ERROR);
    error.setDescription(description);
    close(error);
}
/**
 * Handles an End frame: unbinds the session from the incoming channel and passes the frame on to it.
 * A frame for a channel without a session is ignored.
 *
 * @param channel the incoming channel the frame arrived on
 * @param end the received End frame
 */
public synchronized void receiveEnd(short channel, End end)
{
    final SessionEndpoint session = _receivingSessions[channel];
    if (session == null)
    {
        // TODO error
        return;
    }
    _receivingSessions[channel] = null;
    session.receiveEnd(end);
}
/**
 * Sends an End frame on the given outgoing channel.
 *
 * @param channel the outgoing channel of the session being ended
 * @param end the End frame to send
 * @param remove whether to free the outgoing channel once the frame has been sent
 */
public synchronized void sendEnd(short channel, End end, boolean remove)
{
    send(channel, end);
    if (!remove)
    {
        return;
    }
    _sendingSessions[channel] = null;
}
/**
 * Routes an Attach frame to the session bound to the incoming channel.
 * Nothing is delivered when {@code getSession} finds no session.
 *
 * @param channel the incoming channel the frame arrived on
 * @param attach the received Attach frame
 */
public synchronized void receiveAttach(short channel, Attach attach)
{
    final SessionEndpoint session = getSession(channel);
    if (session == null)
    {
        return;
    }
    session.receiveAttach(attach);
}
/**
 * Routes a Detach frame to the session bound to the incoming channel.
 * Nothing is delivered when {@code getSession} finds no session.
 *
 * @param channel the incoming channel the frame arrived on
 * @param detach the received Detach frame
 */
public synchronized void receiveDetach(short channel, Detach detach)
{
    final SessionEndpoint session = getSession(channel);
    if (session == null)
    {
        return;
    }
    session.receiveDetach(detach);
}
/**
 * Routes a Transfer frame to the session bound to the incoming channel.
 * Nothing is delivered when {@code getSession} finds no session.
 *
 * @param channel the incoming channel the frame arrived on
 * @param transfer the received Transfer frame
 */
public synchronized void receiveTransfer(short channel, Transfer transfer)
{
    final SessionEndpoint session = getSession(channel);
    if (session == null)
    {
        return;
    }
    session.receiveTransfer(transfer);
}
/**
 * Routes a Disposition frame to the session bound to the incoming channel.
 * Nothing is delivered when {@code getSession} finds no session.
 *
 * @param channel the incoming channel the frame arrived on
 * @param disposition the received Disposition frame
 */
public synchronized void receiveDisposition(short channel, Disposition disposition)
{
    final SessionEndpoint session = getSession(channel);
    if (session == null)
    {
        return;
    }
    session.receiveDisposition(disposition);
}
/**
 * Routes a Flow frame to the session bound to the incoming channel.
 * Nothing is delivered when {@code getSession} finds no session.
 *
 * @param channel the incoming channel the frame arrived on
 * @param flow the received Flow frame
 */
public synchronized void receiveFlow(short channel, Flow flow)
{
    final SessionEndpoint session = getSession(channel);
    if (session == null)
    {
        return;
    }
    session.receiveFlow(flow);
}
/**
 * Sends a frame that carries no payload.
 *
 * @param channel the outgoing channel
 * @param body the frame body to send
 */
public synchronized void send(short channel, FrameBody body)
{
send(channel, body, null);
}
/**
 * Sends one frame, with as much of the payload as fits in the current max frame size.
 * <p>
 * The space left for payload is the max frame size minus the encoded body size and 9 further
 * bytes. When the payload does not fit, a Transfer body is flagged as having more to come, the
 * body size is measured again, and only the leading part of the payload is sent. The caller's
 * buffer is never modified; a duplicate is handed to the frame.
 *
 * @param channel the outgoing channel
 * @param body the frame body to send
 * @param payload the payload to attach, or {@code null} for none
 * @return the number of payload bytes included in the frame, or {@code -1} if the endpoint is
 *         closed for output and nothing was sent
 */
public synchronized int send(short channel, FrameBody body, ByteBuffer payload)
{
    if (_closedForOutput)
    {
        return -1;
    }

    ValueWriter<FrameBody> bodyWriter = _describedTypeRegistry.getValueWriter(body);
    int bodySize = bodyWriter.writeToBuffer(EMPTY_BYTE_BUFFER);
    final ByteBuffer framePayload = (payload == null) ? null : payload.duplicate();
    final int payloadAvailable = (payload == null) ? 0 : payload.remaining();

    int payloadSent = getMaxFrameSize() - (bodySize + 9);
    if (payloadSent >= payloadAvailable)
    {
        // everything fits in this frame
        payloadSent = payloadAvailable;
    }
    else
    {
        if (body instanceof Transfer)
        {
            ((Transfer) body).setMore(Boolean.TRUE);
        }
        // the body may have changed, so its encoded size is measured again
        bodyWriter = _describedTypeRegistry.getValueWriter(body);
        bodySize = bodyWriter.writeToBuffer(EMPTY_BYTE_BUFFER);
        payloadSent = getMaxFrameSize() - (bodySize + 9);
        framePayload.limit(framePayload.position() + payloadSent);
    }

    _frameOutputHandler.send(AMQFrame.createAMQFrame(channel, body, framePayload));
    return payloadSent;
}
/**
 * Called when an invalid protocol header has been received; marks the endpoint closed for input.
 * Unlike {@code setClosedForInput} this does not synchronize or wake waiting threads.
 */
public void invalidHeaderReceived()
{
// TODO
_closedForInput = true;
}
/**
 * @return {@code true} if the endpoint has been marked closed for input
 */
public synchronized boolean closedForInput()
{
return _closedForInput;
}
/**
 * Records the protocol version announced in the received protocol header.
 *
 * @param major the major version
 * @param minorVersion the minor version
 * @param revision the revision
 */
public synchronized void protocolHeaderReceived(final byte major, final byte minorVersion, final byte revision)
{
// Placeholder: a header arriving on a SASL-server endpoint that is no longer UNOPENED is not handled yet.
if (_requiresSASLServer && _state != ConnectionState.UNOPENED)
{
// TODO - bad stuff
}
_majorVersion = major;
_minorVersion = minorVersion;
_revision = revision;
}
/**
 * Reports an error to the peer by sending a Close frame that carries it, then marks the
 * endpoint closed for output. Does nothing when output is already closed.
 *
 * @param error the error to report
 */
public synchronized void handleError(final Error error)
{
    if (closedForOutput())
    {
        return;
    }
    final Close closeFrame = new Close();
    closeFrame.setError(error);
    send(CONNECTION_CONTROL_CHANNEL, closeFrame);
    setClosedForOutput(true);
}
// Logger for received frames.
private final Logger _logger = Logger.getLogger("FRM");

/**
 * Entry point for a decoded incoming frame: logs it at FINE level and dispatches it.
 * AMQP frame bodies are invoked with the channel; SASL frame bodies are invoked without one.
 * Any other object is ignored.
 *
 * @param channel the channel the frame arrived on
 * @param frame the decoded frame body
 */
public synchronized void receive(final short channel, final Object frame)
{
    if (_logger.isLoggable(Level.FINE))
    {
        final String logLine = "RECV[" + _remoteAddress + "|" + channel + "] : " + frame;
        _logger.fine(logLine);
    }

    if (frame instanceof FrameBody)
    {
        final FrameBody amqpBody = (FrameBody) frame;
        amqpBody.invoke(channel, this);
        return;
    }
    if (frame instanceof SaslFrameBody)
    {
        final SaslFrameBody saslBody = (SaslFrameBody) frame;
        saslBody.invoke(this);
    }
}
/**
 * @return the described type registry used by this connection
 */
public AMQPDescribedTypeRegistry getDescribedTypeRegistry()
{
return _describedTypeRegistry;
}
/**
 * Records whether the endpoint is closed for output and wakes any waiting threads.
 * <p>
 * The argument is now honoured; previously the flag was set to {@code true} whatever was
 * passed, unlike {@code setClosedForInput}.
 *
 * @param b the new closed-for-output value
 */
public synchronized void setClosedForOutput(boolean b)
{
    _closedForOutput = b;
    notifyAll();
}
/**
 * @return {@code true} if the endpoint has been marked closed for output
 */
public synchronized boolean closedForOutput()
{
return _closedForOutput;
}
/**
 * @return the object used for waiting and notification; this endpoint itself
 */
public Object getLock()
{
return this;
}
/**
 * @return the idle timeout taken from the peer's Open frame, or 0 if none has been recorded
 */
public synchronized long getIdleTimeout()
{
return _idleTimeout;
}
/**
 * Closes the connection from the local side without an error.
 * A Close frame is sent only when the connection is open or awaiting the peer's Open, after
 * which the state is {@code CLOSE_SENT}. In every other state the call has no effect.
 */
public synchronized void close()
{
    final ConnectionState current = _state;
    final boolean closeCanBeSent =
            current == ConnectionState.AWAITING_OPEN || current == ConnectionState.OPEN;
    if (closeCanBeSent)
    {
        sendClose(new Close());
        _state = ConnectionState.CLOSE_SENT;
    }
}
/**
 * Sets the listener told about received Close frames and sessions created by the peer.
 *
 * @param connectionEventListener the listener to use
 */
public void setConnectionEventListener(final ConnectionEventListener connectionEventListener)
{
_connectionEventListener = connectionEventListener;
}
/**
 * @return the current connection event listener
 */
public ConnectionEventListener getConnectionEventListener()
{
return _connectionEventListener;
}
/**
 * @return the minor version recorded by {@code protocolHeaderReceived}
 */
public byte getMinorVersion()
{
return _minorVersion;
}
/**
 * @return the revision recorded by {@code protocolHeaderReceived}
 */
public byte getRevision()
{
return _revision;
}
/**
 * @return the major version recorded by {@code protocolHeaderReceived}
 */
public byte getMajorVersion()
{
return _majorVersion;
}
/**
 * Server side: handles the client's SASL init frame.
 * <p>
 * A {@link SaslServer} for the requested mechanism is obtained from the provider and fed the
 * initial response (an empty array when none was supplied). If the exchange is complete an OK
 * outcome is sent, the authenticated principal is recorded and waiters are notified;
 * otherwise the next challenge is sent. A {@link SaslException} results in an AUTH outcome
 * and the exchange being marked complete but unauthenticated. The on-complete task, if set,
 * runs whenever the exchange is marked complete.
 *
 * @param saslInit the received init frame
 */
public void receiveSaslInit(final SaslInit saslInit)
{
    final Symbol requestedMechanism = saslInit.getMechanism();
    final Binary initialResponse = saslInit.getInitialResponse();

    byte[] clientResponse = null;
    if (initialResponse != null)
    {
        clientResponse = initialResponse.getArray();
    }
    if (clientResponse == null)
    {
        clientResponse = new byte[0];
    }

    try
    {
        _saslServer = _saslServerProvider.getSaslServer(requestedMechanism.toString(), "localhost");
        // Process response from the client
        final byte[] nextChallenge = _saslServer.evaluateResponse(clientResponse);
        if (!_saslServer.isComplete())
        {
            final SaslChallenge challengeBody = new SaslChallenge();
            challengeBody.setChallenge(new Binary(nextChallenge));
            _saslFrameOutput.send(new SASLFrame(challengeBody), null);
        }
        else
        {
            final SaslOutcome success = new SaslOutcome();
            success.setCode(SaslCode.OK);
            _saslFrameOutput.send(new SASLFrame(success), null);
            synchronized (getLock())
            {
                _saslComplete = true;
                _authenticated = true;
                _user = _saslServerProvider.getAuthenticatedPrincipal(_saslServer);
                getLock().notifyAll();
            }
            if (_onSaslCompleteTask != null)
            {
                _onSaslCompleteTask.run();
            }
        }
    }
    catch (SaslException e)
    {
        final SaslOutcome failure = new SaslOutcome();
        failure.setCode(SaslCode.AUTH);
        _saslFrameOutput.send(new SASLFrame(failure), null);
        synchronized (getLock())
        {
            _saslComplete = true;
            _authenticated = false;
            getLock().notifyAll();
        }
        if (_onSaslCompleteTask != null)
        {
            _onSaslCompleteTask.run();
        }
    }
}
/**
 * Client side: picks a mechanism from those offered by the server and sends the SASL init frame.
 * <p>
 * PLAIN is chosen when it is offered and a password is available; its initial response is a
 * zero byte, the UTF-8 user name, a zero byte and the UTF-8 password. Otherwise ANONYMOUS is
 * chosen if offered, then EXTERNAL. When none of these is offered the init frame is sent
 * without a mechanism.
 *
 * @param saslMechanisms the received mechanisms frame
 */
public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
{
    final SaslInit initFrame = new SaslInit();
    initFrame.setHostname(_remoteHostname);

    final Set<Symbol> offered =
            new HashSet<Symbol>(Arrays.asList(saslMechanisms.getSaslServerMechanisms()));

    final boolean usePlain = offered.contains(SASL_PLAIN) && _password != null;
    if (usePlain)
    {
        initFrame.setMechanism(SASL_PLAIN);
        final byte[] nameBytes = _user.getName().getBytes(Charset.forName("UTF-8"));
        final byte[] secretBytes = _password.getBytes(Charset.forName("UTF-8"));
        // layout: 0x00, user name, 0x00, password (new arrays are zero-filled)
        final int secretOffset = nameBytes.length + 2;
        final byte[] plainResponse = new byte[secretOffset + secretBytes.length];
        System.arraycopy(nameBytes, 0, plainResponse, 1, nameBytes.length);
        System.arraycopy(secretBytes, 0, plainResponse, secretOffset, secretBytes.length);
        initFrame.setInitialResponse(new Binary(plainResponse));
    }
    else if (offered.contains(SASL_ANONYMOUS))
    {
        initFrame.setMechanism(SASL_ANONYMOUS);
    }
    else if (offered.contains(SASL_EXTERNAL))
    {
        initFrame.setMechanism(SASL_EXTERNAL);
    }

    _saslFrameOutput.send(new SASLFrame(initFrame), null);
}
/**
 * Client side: called when a SASL challenge frame arrives. Not implemented; the challenge is ignored.
 *
 * @param saslChallenge the received challenge frame
 */
public void receiveSaslChallenge(final SaslChallenge saslChallenge)
{
//To change body of implemented methods use File | Settings | File Templates.
}
/**
 * Server side: handles the client's answer to a SASL challenge.
 * <p>
 * The response (an empty array when none was supplied) is fed to the current
 * {@link SaslServer}. If the exchange is complete an OK outcome is sent, the authenticated
 * principal is recorded and waiters are notified; otherwise the next challenge is sent. A
 * {@link SaslException} results in an AUTH outcome and the exchange being marked complete but
 * unauthenticated. The on-complete task, if set, runs whenever the exchange is marked complete.
 *
 * @param saslResponse the received response frame
 */
public void receiveSaslResponse(final SaslResponse saslResponse)
{
    final Binary responseBinary = saslResponse.getResponse();

    byte[] clientResponse = null;
    if (responseBinary != null)
    {
        clientResponse = responseBinary.getArray();
    }
    if (clientResponse == null)
    {
        clientResponse = new byte[0];
    }

    try
    {
        // Process response from the client
        final byte[] nextChallenge = _saslServer.evaluateResponse(clientResponse);
        if (!_saslServer.isComplete())
        {
            final SaslChallenge challengeBody = new SaslChallenge();
            challengeBody.setChallenge(new Binary(nextChallenge));
            _saslFrameOutput.send(new SASLFrame(challengeBody), null);
        }
        else
        {
            final SaslOutcome success = new SaslOutcome();
            success.setCode(SaslCode.OK);
            _saslFrameOutput.send(new SASLFrame(success), null);
            synchronized (getLock())
            {
                _saslComplete = true;
                _authenticated = true;
                _user = _saslServerProvider.getAuthenticatedPrincipal(_saslServer);
                getLock().notifyAll();
            }
            if (_onSaslCompleteTask != null)
            {
                _onSaslCompleteTask.run();
            }
        }
    }
    catch (SaslException e)
    {
        final SaslOutcome failure = new SaslOutcome();
        failure.setCode(SaslCode.AUTH);
        _saslFrameOutput.send(new SASLFrame(failure), null);
        synchronized (getLock())
        {
            _saslComplete = true;
            _authenticated = false;
            getLock().notifyAll();
        }
        if (_onSaslCompleteTask != null)
        {
            _onSaslCompleteTask.run();
        }
    }
}
/**
 * Client side: handles the server's SASL outcome.
 * <p>
 * In both cases the exchange is marked complete, waiters are notified and the SASL frame
 * output is closed. On an OK outcome the endpoint is marked authenticated and the on-complete
 * task, if set, is run. On any other outcome the endpoint is marked unauthenticated and closed
 * for input; the on-complete task is not run.
 *
 * @param saslOutcome the received outcome frame
 */
public void receiveSaslOutcome(final SaslOutcome saslOutcome)
{
    final boolean accepted = saslOutcome.getCode() == SaslCode.OK;

    if (accepted)
    {
        // on success the SASL output is closed before the result is published
        _saslFrameOutput.close();
    }

    synchronized (getLock())
    {
        _saslComplete = true;
        _authenticated = accepted;
        getLock().notifyAll();
    }

    if (accepted)
    {
        if (_onSaslCompleteTask != null)
        {
            _onSaslCompleteTask.run();
        }
    }
    else
    {
        setClosedForInput(true);
        _saslFrameOutput.close();
    }
}
/**
 * @return {@code true} if this endpoint requires a SASL exchange, as either client or server
 */
public boolean requiresSASL()
{
return _requiresSASLClient || _requiresSASLServer;
}
/**
 * Sets the handler through which SASL frames are written.
 *
 * @param saslFrameOutput the handler that receives outgoing SASL frames
 */
public void setSaslFrameOutput(final FrameOutputHandler<SaslFrameBody> saslFrameOutput)
{
_saslFrameOutput = saslFrameOutput;
}
/**
 * Sets the task run by the SASL handlers after they have marked the exchange complete.
 *
 * @param task the task to run, or {@code null} for none
 */
public void setOnSaslComplete(Runnable task)
{
_onSaslCompleteTask = task;
}
/**
 * @return {@code true} if the SASL exchange finished successfully
 */
public boolean isAuthenticated()
{
return _authenticated;
}
/**
 * Server side: starts the SASL exchange by sending the list of supported mechanisms.
 *
 * @param mechanismNames the names of the mechanisms to offer, in the order they are sent
 */
public void initiateSASL(String[] mechanismNames)
{
    final Symbol[] offeredMechanisms = new Symbol[mechanismNames.length];
    for (int i = 0; i < mechanismNames.length; i++)
    {
        offeredMechanisms[i] = Symbol.valueOf(mechanismNames[i]);
    }

    final SaslMechanisms mechanismsFrame = new SaslMechanisms();
    mechanismsFrame.setSaslServerMechanisms(offeredMechanisms);
    _saslFrameOutput.send(new SASLFrame(mechanismsFrame), null);
}
/**
 * @return {@code true} once the SASL exchange has finished, whether or not it succeeded
 */
public boolean isSASLComplete()
{
return _saslComplete;
}
/**
 * @return the remote address set through {@code setRemoteAddress}, or {@code null} if none
 */
public SocketAddress getRemoteAddress()
{
return _remoteAddress;
}
/**
 * Sets the remote address, which is included in the log line written for received frames.
 *
 * @param remoteAddress the address of the peer
 */
public void setRemoteAddress(SocketAddress remoteAddress)
{
_remoteAddress = remoteAddress;
}
/**
 * @return the hostname placed in the Open and SASL init frames sent by this endpoint
 */
public String getRemoteHostname()
{
return _remoteHostname;
}
/**
 * Sets the hostname placed in the Open and SASL init frames sent by this endpoint.
 *
 * @param remoteHostname the hostname to send
 */
public void setRemoteHostname(final String remoteHostname)
{
_remoteHostname = remoteHostname;
}
/**
 * @return {@code true} if the connection state is {@code OPEN}
 */
public boolean isOpen()
{
return _state == ConnectionState.OPEN;
}
/**
 * Tells whether the connection is closed or in the process of closing.
 * <p>
 * The original condition tested {@code CLOSE_RECEIVED} twice; the repeated operand is replaced
 * by {@code CLOSE_SENT} so that a locally initiated close is reported as well.
 *
 * @return {@code true} if the state is {@code CLOSED}, {@code CLOSE_RECEIVED} or {@code CLOSE_SENT}
 */
public boolean isClosed()
{
    return _state == ConnectionState.CLOSED
           || _state == ConnectionState.CLOSE_RECEIVED
           || _state == ConnectionState.CLOSE_SENT;
}
/**
 * @return the error carried by the peer's Close frame, or {@code null} if there was none
 */
public Error getRemoteError()
{
return _remoteError;
}
/**
 * Sets the channel maximum used when the connection is opened.
 *
 * @param channelMax the highest channel number to allow
 */
public void setChannelMax(final short channelMax)
{
_channelMax = channelMax;
}
/**
 * @return the timeout, in milliseconds, used by {@code waitUntil(Predicate)}
 */
public long getSyncTimeout()
{
return _syncTimeout;
}
/**
 * Sets the timeout used by {@code waitUntil(Predicate)}.
 *
 * @param syncTimeout the timeout in milliseconds
 */
public void setSyncTimeout(final long syncTimeout)
{
_syncTimeout = syncTimeout;
}
/**
 * Waits until the predicate is satisfied, using the configured sync timeout.
 *
 * @param predicate the condition to wait for
 * @throws InterruptedException if the thread is interrupted while waiting
 * @throws TimeoutException if the timeout elapses before the predicate is satisfied
 */
public void waitUntil(Predicate predicate) throws InterruptedException, TimeoutException
{
waitUntil(predicate, _syncTimeout);
}
/**
 * Waits on the endpoint lock until the predicate is satisfied or the timeout elapses.
 * The predicate is checked before waiting and again after every wake-up; the time left is
 * recomputed against a fixed deadline so repeated wake-ups do not extend the overall wait.
 *
 * @param predicate the condition to wait for
 * @param timeout the maximum time to wait, in milliseconds
 * @throws InterruptedException if the thread is interrupted while waiting
 * @throws TimeoutException if the deadline passes while the predicate is still unsatisfied
 */
public void waitUntil(Predicate predicate, long timeout) throws InterruptedException, TimeoutException
{
    final long deadline = System.currentTimeMillis() + timeout;
    long remaining = timeout;
    synchronized (getLock())
    {
        while (!predicate.isSatisfied())
        {
            getLock().wait(remaining);
            if (predicate.isSatisfied())
            {
                // satisfied after wake-up; the loop condition ends the wait
                continue;
            }
            remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0L)
            {
                throw new TimeoutException();
            }
        }
    }
}
}