blob: db84b83adb49fda3f6c8568214131f7c43b94f37 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid;
import org.apache.qpid.transport.*;
import org.apache.qpid.transport.network.mina.MinaHandler;
import static org.apache.qpid.transport.util.Functions.str;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
/**
* ToyBroker
*
* @author Rafael H. Schloming
*/
class ToyBroker extends SessionDelegate
{
private ToyExchange exchange;
private Map<String,Consumer> consumers = new ConcurrentHashMap<String,Consumer>();
public ToyBroker(ToyExchange exchange)
{
this.exchange = exchange;
}
public void messageAcquire(Session context, MessageAcquire struct)
{
System.out.println("\n==================> messageAcquire " );
context.executionResult((int) struct.getId(), new Acquired(struct.getTransfers()));
}
@Override public void queueDeclare(Session ssn, QueueDeclare qd)
{
exchange.createQueue(qd.getQueue());
System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n");
}
@Override public void exchangeBind(Session ssn, ExchangeBind qb)
{
exchange.bindQueue(qb.getExchange(), qb.getBindingKey(),qb.getQueue());
System.out.println("\n==================> bound queue: " + qb.getQueue() + " with binding key " + qb.getBindingKey() + "\n");
}
@Override public void queueQuery(Session ssn, QueueQuery qq)
{
QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue());
ssn.executionResult((int) qq.getId(), result);
}
@Override public void messageSubscribe(Session ssn, MessageSubscribe ms)
{
Consumer c = new Consumer();
c._queueName = ms.getQueue();
consumers.put(ms.getDestination(),c);
System.out.println("\n==================> message subscribe : " + ms.getDestination() + " queue: " + ms.getQueue() + "\n");
}
@Override public void messageFlow(Session ssn,MessageFlow struct)
{
Consumer c = consumers.get(struct.getDestination());
c._credit = struct.getValue();
System.out.println("\n==================> message flow : " + struct.getDestination() + " credit: " + struct.getValue() + "\n");
}
@Override public void messageFlush(Session ssn,MessageFlush struct)
{
System.out.println("\n==================> message flush for consumer : " + struct.getDestination() + "\n");
checkAndSendMessagesToConsumer(ssn,struct.getDestination());
}
@Override public void messageTransfer(Session ssn, MessageTransfer xfr)
{
String dest = xfr.getDestination();
System.out.println("received transfer " + dest);
Header header = xfr.getHeader();
DeliveryProperties props = header.get(DeliveryProperties.class);
if (props != null)
{
System.out.println("received headers routing_key " + props.getRoutingKey());
}
MessageProperties mp = header.get(MessageProperties.class);
System.out.println("MP: " + mp);
if (mp != null)
{
System.out.println(mp.getApplicationHeaders());
}
if (exchange.route(dest,props.getRoutingKey(),xfr))
{
System.out.println("queued " + xfr);
dispatchMessages(ssn);
}
else
{
if (props == null || !props.getDiscardUnroutable())
{
RangeSet ranges = new RangeSet();
ranges.add(xfr.getId());
ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE,
"no such destination");
}
}
ssn.processed(xfr);
}
private void transferMessageToPeer(Session ssn,String dest, MessageTransfer m)
{
System.out.println("\n==================> Transfering message to: " +dest + "\n");
ssn.messageTransfer(m.getDestination(), MessageAcceptMode.EXPLICIT,
MessageAcquireMode.PRE_ACQUIRED,
m.getHeader(), m.getBody());
}
private void dispatchMessages(Session ssn)
{
for (String dest: consumers.keySet())
{
checkAndSendMessagesToConsumer(ssn,dest);
}
}
private void checkAndSendMessagesToConsumer(Session ssn,String dest)
{
Consumer c = consumers.get(dest);
LinkedBlockingQueue<MessageTransfer> queue = exchange.getQueue(c._queueName);
MessageTransfer m = queue.poll();
while (m != null && c._credit>0)
{
transferMessageToPeer(ssn,dest,m);
c._credit--;
m = queue.poll();
}
}
// ugly, but who cares :)
// assumes unit is always no of messages, not bytes
// assumes it's credit mode and not window
private class Consumer
{
long _credit;
String _queueName;
}
public static final void main(String[] args) throws IOException
{
final ToyExchange exchange = new ToyExchange();
ConnectionDelegate delegate = new ServerDelegate()
{
public SessionDelegate getSessionDelegate()
{
return new ToyBroker(exchange);
}
};
MinaHandler.accept("0.0.0.0", 5672, delegate);
}
}