blob: 9a672d5c027e3acc8ca7e054b24d75ab2e5aabe3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.proton.hawtdispatch.api;
import org.apache.qpid.proton.hawtdispatch.impl.Defer;
import org.apache.qpid.proton.hawtdispatch.impl.Watch;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.fusesource.hawtbuf.Buffer;
import java.io.UnsupportedEncodingException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class AmqpSender extends AmqpLink {
private byte[] EMPTY_BYTE_ARRAY = new byte[]{};
long nextTagId = 0;
HashSet<byte[]> tagCache = new HashSet<byte[]>();
final AmqpSession parent;
private final QoS qos;
final Sender sender;
public AmqpSender(AmqpSession parent, Sender sender2, QoS qos) {
this.parent = parent;
this.sender = sender2;
this.qos = qos;
attach();
getConnection().senders.add(this);
}
@Override
public void close() {
super.close();
getConnection().senders.remove(this);
}
@Override
protected Sender getEndpoint() {
return sender;
}
@Override
protected AmqpSession getParent() {
return parent;
}
final LinkedList<MessageDelivery> outbound = new LinkedList<MessageDelivery>();
long outboundBufferSize;
public MessageDelivery send(Message message) {
assertExecuting();
MessageDelivery rc = new MessageDelivery(message) {
@Override
AmqpLink link() {
return AmqpSender.this;
}
@Override
public void redeliver(boolean incrementDeliveryCounter) {
super.redeliver(incrementDeliveryCounter);
outbound.add(this);
outboundBufferSize += initialSize;
defer(deferedPumpDeliveries);
}
};
outbound.add(rc);
outboundBufferSize += rc.initialSize;
pumpDeliveries();
pumpOut();
return rc;
}
Buffer currentBuffer;
Delivery currentDelivery;
Defer deferedPumpDeliveries = new Defer() {
public void run() {
pumpDeliveries();
}
};
public long getOverflowBufferSize() {
return outboundBufferSize;
}
protected void pumpDeliveries() {
assertExecuting();
try {
while(true) {
while( currentBuffer !=null ) {
if( sender.getCredit() > 0 ) {
int sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
currentBuffer.moveHead(sent);
if( currentBuffer.length == 0 ) {
Delivery current = currentDelivery;
MessageDelivery md = (MessageDelivery) current.getContext();
currentBuffer = null;
currentDelivery = null;
if( qos == QoS.AT_MOST_ONCE ) {
current.settle();
} else {
sender.advance();
}
md.fireWatches();
}
} else {
return;
}
}
if( outbound.isEmpty() ) {
return;
}
final MessageDelivery md = outbound.removeFirst();
outboundBufferSize -= md.initialSize;
currentBuffer = md.encoded();
if( qos == QoS.AT_MOST_ONCE ) {
currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0);
} else {
final byte[] tag = nextTag();
currentDelivery = sender.delivery(tag, 0, tag.length);
}
md.delivery = currentDelivery;
currentDelivery.setContext(md);
}
} finally {
fireWatches();
}
}
@Override
protected void processDelivery(Delivery delivery) {
final MessageDelivery md = (MessageDelivery) delivery.getContext();
if( delivery.remotelySettled() ) {
if( delivery.getTag().length > 0 ) {
checkinTag(delivery.getTag());
}
final DeliveryState state = delivery.getRemoteState();
if( state==null || state instanceof Accepted) {
if( !delivery.remotelySettled() ) {
delivery.disposition(new Accepted());
}
} else if( state instanceof Rejected) {
// re-deliver /w incremented delivery counter.
md.delivery = null;
md.incrementDeliveryCount();
outbound.addLast(md);
} else if( state instanceof Released) {
// re-deliver && don't increment the counter.
md.delivery = null;
outbound.addLast(md);
} else if( state instanceof Modified) {
Modified modified = (Modified) state;
if ( modified.getDeliveryFailed() ) {
// increment delivery counter..
md.incrementDeliveryCount();
}
}
delivery.settle();
}
md.fireWatches();
}
byte[] nextTag() {
byte[] rc;
if (tagCache != null && !tagCache.isEmpty()) {
final Iterator<byte[]> iterator = tagCache.iterator();
rc = iterator.next();
iterator.remove();
} else {
try {
rc = Long.toHexString(nextTagId++).getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
return rc;
}
void checkinTag(byte[] data) {
if( tagCache.size() < 1024 ) {
tagCache.add(data);
}
}
public void onOverflowBufferDrained(final Callback<Void> cb) {
addWatch(new Watch() {
@Override
public boolean execute() {
if (outboundBufferSize==0) {
cb.onSuccess(null);
return true;
}
return false;
}
});
}
}