blob: 644f72ab45b065a469ef5e4020452f90f75a61bf [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.proton.hawtdispatch.api;
import org.apache.qpid.proton.hawtdispatch.impl.Defer;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.ByteArrayOutputStream;
import java.util.LinkedList;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class AmqpReceiver extends AmqpLink {
final AmqpSession parent;
final Receiver receiver;
public AmqpReceiver(AmqpSession parent, Receiver receiver2, QoS qos) {
this.parent = parent;
this.receiver = receiver2;
attach();
}
@Override
protected Receiver getEndpoint() {
return receiver;
}
@Override
protected AmqpSession getParent() {
return parent;
}
ByteArrayOutputStream current = new ByteArrayOutputStream();
@Override
protected void processDelivery(Delivery delivery) {
if( !delivery.isReadable() ) {
System.out.println("it was not readable!");
return;
}
if( current==null ) {
current = new ByteArrayOutputStream();
}
int count;
byte data[] = new byte[1024*4];
while( (count = receiver.recv(data, 0, data.length)) > 0 ) {
current.write(data, 0, count);
}
// Expecting more deliveries..
if( count == 0 ) {
return;
}
receiver.advance();
Buffer buffer = current.toBuffer();
current = null;
onMessage(delivery, buffer);
}
LinkedList<MessageDelivery> inbound = new LinkedList<MessageDelivery>();
protected void onMessage(Delivery delivery, Buffer buffer) {
MessageDelivery md = new MessageDelivery(buffer) {
@Override
AmqpLink link() {
return AmqpReceiver.this;
}
@Override
public void settle() {
if( !delivery.isSettled() ) {
delivery.disposition(new Accepted());
delivery.settle();
}
drain();
}
};
md.delivery = delivery;
delivery.setContext(md);
inbound.add(md);
drainInbound();
}
public void drain() {
defer(deferedDrain);
}
Defer deferedDrain = new Defer(){
public void run() {
drainInbound();
}
};
int resumed = 0;
public void resume() {
resumed++;
}
public void suspend() {
resumed--;
}
AmqpDeliveryListener deliveryListener;
private void drainInbound() {
while( deliveryListener!=null && !inbound.isEmpty() && resumed>0) {
deliveryListener.onMessageDelivery(inbound.removeFirst());
receiver.flow(1);
}
}
public AmqpDeliveryListener getDeliveryListener() {
return deliveryListener;
}
public void setDeliveryListener(AmqpDeliveryListener deliveryListener) {
this.deliveryListener = deliveryListener;
drainInbound();
}
}