blob: b5d86ccce735aca8dd19d291b465eaad011e507e [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.protocol.v0_8.federation;
import java.security.AccessControlContext;
import java.util.Collection;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeoutException;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.BasicGetOkBody;
import org.apache.qpid.framing.ConfirmSelectBody;
import org.apache.qpid.framing.ConfirmSelectOkBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.MessageContentSourceBody;
import org.apache.qpid.server.transfer.TransferQueueConsumer;
import org.apache.qpid.server.transfer.TransferQueueEntry;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
class TransferSession_0_8 implements OutboundChannel
{
private static final Logger LOGGER = LoggerFactory.getLogger(TransferSession_0_8.class);
private static final Runnable NO_OP = new Runnable()
{
@Override
public void run()
{
}
};
private final OutboundConnection_0_8 _connection;
private final int _channelId;
private final MethodRegistry _methodRegistry;
private final LocalTransaction _txn;
private Collection<String> _remoteHostGlobalDomains;
private TransferTarget_0_8 _target;
private TransferQueueConsumer _consumer;
private final SortedMap<Long, TransferQueueEntry> _unconfirmed = new TreeMap<>();
private volatile boolean _flowControlled;
private volatile long _lastSent = -0L;
private int _maxUnconfirmed = 200;
enum Operation {
EXCHANGE_DECLARE,
EXCHANGE_DELETE,
EXCHANGE_BOUND,
QUEUE_BIND,
QUEUE_UNBIND,
QUEUE_DECLARE,
QUEUE_DELETE,
QUEUE_PURGE,
BASIC_RECOVER_SYNC,
BAISC_QOS,
BASIC_CONSUME,
BASIC_CANCEL,
TX_SELECT,
TX_COMMIT,
TX_ROLLBACK,
CHANNEL_FLOW,
CONFIRM_SELECT,
ACK,
BASIC_GET
}
private static final class TimedSettableFuture<V> extends AbstractFuture<V>
{
private final long _timeoutTime;
private TimedSettableFuture(final long timeoutTime)
{
_timeoutTime = timeoutTime;
}
public boolean set(V value)
{
return super.set(value);
}
@Override
public boolean setException(Throwable throwable)
{
return super.setException(throwable);
}
public long getTimeoutTime()
{
return _timeoutTime;
}
public boolean checkTimeout(long time)
{
if(time > _timeoutTime)
{
setException(new TimeoutException("Timed out waiting for response"));
return true;
}
else
{
return false;
}
}
}
private final Map<Operation, Queue<TimedSettableFuture<AMQMethodBody>>> _synchronousOperationListeners = new EnumMap<>(Operation.class);
{
for(Operation op : Operation.values())
{
_synchronousOperationListeners.put(op, new LinkedList<TimedSettableFuture<AMQMethodBody>>());
}
}
enum State { AWAITING_OPEN, OPEN, AWAITING_CLOSE_OK }
private State _state = State.AWAITING_OPEN;
private interface MessageHandler
{
void receiveMessageHeader(BasicContentHeaderProperties properties, long bodySize);
void receiveMessageContent(QpidByteBuffer data);
}
private final MessageHandler _unexpectedMessageHandler = new MessageHandler()
{
@Override
public void receiveMessageHeader(final BasicContentHeaderProperties properties, final long bodySize)
{
throw new ConnectionScopedRuntimeException("Unexpected frame");
}
@Override
public void receiveMessageContent(final QpidByteBuffer data)
{
throw new ConnectionScopedRuntimeException("Unexpected frame");
}
};
private volatile MessageHandler _messageHandler = _unexpectedMessageHandler;
TransferSession_0_8(final int channelId, final OutboundConnection_0_8 outboundConnection)
{
_connection = outboundConnection;
_channelId = channelId;
_methodRegistry = outboundConnection.getMethodRegistry();
writeMethod(_methodRegistry.createChannelOpenBody(AMQShortString.EMPTY_STRING));
_txn = new LocalTransaction(outboundConnection.getVirtualHost().getMessageStore());
}
private synchronized void changeState(final State currentState, final State newState)
{
if(_state != currentState)
{
throw new ConnectionScopedRuntimeException("Incorrect state");
}
_state = newState;
}
private synchronized void assertState(final State currentState)
{
if (_state != currentState)
{
throw new ConnectionScopedRuntimeException("Incorrect state");
}
}
@Override
public void receiveChannelOpenOk()
{
changeState(State.AWAITING_OPEN, State.OPEN);
writeMethod(new ConfirmSelectBody(true));
writeMethod(_methodRegistry.createBasicGetBody(0, AMQShortString.valueOf("$virtualhostProperties"), true));
final FutureCallback<AMQMethodBody> callback = new FutureCallback<AMQMethodBody>()
{
@Override
public void onSuccess(final AMQMethodBody result)
{
if (result instanceof BasicGetOkBody)
{
_messageHandler = new VirtualHostPropertiesHandler();
}
else
{
throw new ConnectionScopedRuntimeException("Unable to determine virtual host properties from remote host");
}
}
@Override
public void onFailure(final Throwable t)
{
if(t instanceof RuntimeException)
{
throw ((RuntimeException)t);
}
else
{
throw new ConnectionScopedRuntimeException(t);
}
}
};
addOperationResponse(Operation.BASIC_GET, callback);
}
private void addOperationResponse(Operation operation, FutureCallback<AMQMethodBody> callback)
{
final TimedSettableFuture<AMQMethodBody> future =
new TimedSettableFuture<>(System.currentTimeMillis() + 30000L);
Futures.addCallback(future, callback);
_synchronousOperationListeners.get(operation).add(future);
}
private void performOperationResponse(Operation operation, AMQMethodBody body)
{
final TimedSettableFuture<AMQMethodBody> future = _synchronousOperationListeners.get(operation).poll();
if(future == null)
{
throw new ConnectionScopedRuntimeException("Unexpected frame ");
}
else
{
future.set(body);
}
}
private void setRemoteHostGlobalDomains(final Collection<String> remoteHostGlobalDomains)
{
LOGGER.debug("Setting remote host global domains: {}", remoteHostGlobalDomains);
_remoteHostGlobalDomains = remoteHostGlobalDomains;
_target = new TransferTarget_0_8(this, remoteHostGlobalDomains);
_consumer = _connection.getVirtualHost().getTransferQueue().addConsumer(_target, "consumer");
}
@Override
public void receiveChannelAlert(final int replyCode, final AMQShortString replyText, final FieldTable details)
{
}
@Override
public void receiveAccessRequestOk(final int ticket)
{
}
@Override
public void receiveExchangeDeclareOk()
{
performOperationResponse(Operation.EXCHANGE_DECLARE, _methodRegistry.createExchangeDeclareOkBody());
}
@Override
public void receiveExchangeDeleteOk()
{
performOperationResponse(Operation.EXCHANGE_DELETE, _methodRegistry.createExchangeDeleteOkBody());
}
@Override
public void receiveExchangeBoundOk(final int replyCode, final AMQShortString replyText)
{
performOperationResponse(Operation.EXCHANGE_BOUND, _methodRegistry.createExchangeBoundOkBody(replyCode, replyText));
}
@Override
public void receiveQueueBindOk()
{
performOperationResponse(Operation.QUEUE_BIND, _methodRegistry.createQueueBindOkBody());
}
@Override
public void receiveQueueUnbindOk()
{
performOperationResponse(Operation.QUEUE_UNBIND, _methodRegistry.createQueueUnbindOkBody());
}
@Override
public void receiveQueueDeclareOk(final AMQShortString queue, final long messageCount, final long consumerCount)
{
performOperationResponse(Operation.QUEUE_DECLARE, _methodRegistry.createQueueDeclareOkBody(queue, messageCount, consumerCount));
}
@Override
public void receiveQueuePurgeOk(final long messageCount)
{
performOperationResponse(Operation.QUEUE_PURGE, _methodRegistry.createQueuePurgeOkBody(messageCount));
}
@Override
public void receiveQueueDeleteOk(final long messageCount)
{
performOperationResponse(Operation.QUEUE_DELETE, _methodRegistry.createQueueDeleteOkBody(messageCount));
}
@Override
public void receiveBasicRecoverSyncOk()
{
performOperationResponse(Operation.BASIC_RECOVER_SYNC, _methodRegistry.createBasicRecoverSyncOkBody());
}
@Override
public void receiveBasicQosOk()
{
performOperationResponse(Operation.BAISC_QOS, _methodRegistry.createBasicQosOkBody());
}
@Override
public void receiveBasicConsumeOk(final AMQShortString consumerTag)
{
performOperationResponse(Operation.BASIC_CONSUME, _methodRegistry.createBasicConsumeOkBody(consumerTag));
}
@Override
public void receiveBasicCancelOk(final AMQShortString consumerTag)
{
performOperationResponse(Operation.BASIC_CANCEL, _methodRegistry.createBasicCancelOkBody(consumerTag));
}
@Override
public void receiveBasicReturn(final int replyCode,
final AMQShortString replyText,
final AMQShortString exchange,
final AMQShortString routingKey)
{
}
@Override
public void receiveBasicDeliver(final AMQShortString consumerTag,
final long deliveryTag,
final boolean redelivered,
final AMQShortString exchange,
final AMQShortString routingKey)
{
}
@Override
public void receiveBasicGetOk(final long deliveryTag,
final boolean redelivered,
final AMQShortString exchange,
final AMQShortString routingKey,
final long messageCount)
{
performOperationResponse(Operation.BASIC_GET, _methodRegistry.createBasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount));
}
@Override
public void receiveBasicGetEmpty()
{
performOperationResponse(Operation.BASIC_GET, _methodRegistry.createBasicGetEmptyBody(AMQShortString.EMPTY_STRING));
}
@Override
public void receiveTxSelectOk()
{
performOperationResponse(Operation.TX_SELECT, _methodRegistry.createTxSelectOkBody());
}
@Override
public void receiveTxCommitOk()
{
performOperationResponse(Operation.TX_COMMIT, _methodRegistry.createTxCommitOkBody());
}
@Override
public void receiveTxRollbackOk()
{
performOperationResponse(Operation.TX_ROLLBACK, _methodRegistry.createTxRollbackOkBody());
}
@Override
public void receiveConfirmSelectOk()
{
performOperationResponse(Operation.CONFIRM_SELECT, ConfirmSelectOkBody.INSTANCE);
}
@Override
public void receiveChannelFlow(final boolean active)
{
_flowControlled = !active;
writeMethod(_methodRegistry.createChannelFlowOkBody(active));
}
@Override
public void receiveChannelFlowOk(final boolean active)
{
performOperationResponse(Operation.CHANNEL_FLOW, _methodRegistry.createChannelFlowOkBody(active));
}
@Override
public void receiveChannelClose(final int replyCode,
final AMQShortString replyText,
final int classId,
final int methodId)
{
}
@Override
public void receiveChannelCloseOk()
{
}
@Override
public void receiveMessageContent(final QpidByteBuffer data)
{
_messageHandler.receiveMessageContent(data);
}
@Override
public void receiveMessageHeader(final BasicContentHeaderProperties properties, final long bodySize)
{
_messageHandler.receiveMessageHeader(properties, bodySize);
}
@Override
public boolean ignoreAllButCloseOk()
{
return false;
}
@Override
public void receiveBasicNack(final long deliveryTag, final boolean multiple, final boolean requeue)
{
if(multiple)
{
Iterator<Map.Entry<Long, TransferQueueEntry>> iter = _unconfirmed.entrySet().iterator();
while(iter.hasNext())
{
Map.Entry<Long, TransferQueueEntry> pair = iter.next();
if(pair.getKey() > deliveryTag)
{
break;
}
else
{
final TransferQueueEntry entry = pair.getValue();
if(requeue)
{
entry.release();
}
else
{
_txn.dequeue(entry.getEnqueueRecord(), new ServerTransaction.Action()
{
@Override
public void postCommit()
{
entry.delete();
}
@Override
public void onRollback()
{
entry.release();
}
});
}
}
}
}
else
{
final TransferQueueEntry entry = _unconfirmed.remove(deliveryTag);
if(requeue)
{
entry.release();
}
else
{
_txn.dequeue(entry.getEnqueueRecord(), new ServerTransaction.Action()
{
@Override
public void postCommit()
{
entry.delete();
}
@Override
public void onRollback()
{
entry.release();
}
});
}
}
}
@Override
public void receiveBasicAck(final long deliveryTag, final boolean multiple)
{
if(multiple)
{
Iterator<Map.Entry<Long, TransferQueueEntry>> iter = _unconfirmed.entrySet().iterator();
while(iter.hasNext())
{
Map.Entry<Long, TransferQueueEntry> pair = iter.next();
if(pair.getKey() > deliveryTag)
{
break;
}
else
{
final TransferQueueEntry entry = pair.getValue();
_txn.dequeue(entry.getEnqueueRecord(), new ServerTransaction.Action()
{
@Override
public void postCommit()
{
entry.delete();
}
@Override
public void onRollback()
{
entry.release();
}
});
}
}
}
else
{
final TransferQueueEntry entry = _unconfirmed.remove(deliveryTag);
_txn.dequeue(entry.getEnqueueRecord(), new ServerTransaction.Action()
{
@Override
public void postCommit()
{
entry.delete();
}
@Override
public void onRollback()
{
entry.release();
}
});
}
}
boolean isSuspended()
{
return _flowControlled || _unconfirmed.size() > _maxUnconfirmed;
}
@Override
public AccessControlContext getAccessControllerContext()
{
return null;
}
@Override
public boolean processPending()
{
if(_consumer != null)
{
return _consumer.processPending();
}
return false;
}
void notifyWork()
{
_connection.notifyWork();
}
@Override
public void receivedComplete()
{
_txn.commitAsync(NO_OP);
}
void writeMethod(AMQMethodBody body)
{
writeFrame(body.generateFrame(_channelId));
}
void writeFrame(AMQFrame frame)
{
_connection.writeFrame(frame);
}
private class VirtualHostPropertiesHandler implements MessageHandler
{
private long _remaining;
private BasicContentHeaderProperties _properties;
@Override
public void receiveMessageHeader(final BasicContentHeaderProperties properties, final long bodySize)
{
_remaining = bodySize;
_properties = properties;
receiveVirtualHostProperties(_properties);
}
@Override
public void receiveMessageContent(final QpidByteBuffer data)
{
if((_remaining -= data.remaining()) == 0l)
{
_messageHandler = _unexpectedMessageHandler;
}
}
}
private void receiveVirtualHostProperties(final BasicContentHeaderProperties properties)
{
Collection<String> remoteHostGlobalDomains =
properties.getHeaders().getFieldArray("virtualhost.globalDomains");
setRemoteHostGlobalDomains(remoteHostGlobalDomains);
}
void transfer(final TransferQueueEntry entry)
{
ServerMessage message = entry.getMessage();
_unconfirmed.put(++_lastSent, entry);
writeMethod(_methodRegistry.createBasicPublishBody(0, "", message.getInitialRoutingAddress(), false, false));
if(!(message instanceof AMQMessage))
{
final MessageConverter converter =
MessageConverterRegistry.getConverter(message.getClass(), AMQMessage.class);
message = converter.convert(message, _connection.getVirtualHost());
}
AMQMessage amqMessage = (AMQMessage) message;
ContentHeaderBody contentHeaderBody = amqMessage.getContentHeaderBody();
writeHeader(contentHeaderBody);
int bodySize = (int) contentHeaderBody.getBodySize();
if (bodySize > 0)
{
int maxBodySize = (int) _connection.getMaxFrameSize() - AMQFrame.getFrameOverhead();
int writtenSize = 0;
while (writtenSize < bodySize)
{
int capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
AMQBody body = new MessageContentSourceBody(message, writtenSize, capacity);
writtenSize += capacity;
writeFrame(new AMQFrame(_channelId, body));
}
}
}
private void writeHeader(final ContentHeaderBody headerBody)
{
_connection.writeFrame(new AMQFrame(_channelId, headerBody));
}
}