blob: bb5bf86ff622147b3b5636a3b07404459af05bd4 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.proton.examples;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import java.util.HashMap;
import java.util.Map;
/**
* Pool
*
*/
public class Pool
{
final private Collector collector;
final private Map<String,Connection> connections;
final private LinkConstructor<Sender> outgoingConstructor = new LinkConstructor<Sender> () {
public Sender create(Session ssn, String remote, String local) {
return newOutgoing(ssn, remote, local);
}
};
final private LinkConstructor<Receiver> incomingConstructor = new LinkConstructor<Receiver> () {
public Receiver create(Session ssn, String remote, String local) {
return newIncoming(ssn, remote, local);
}
};
final private LinkResolver<Sender> outgoingResolver;
final private LinkResolver<Receiver> incomingResolver;
public Pool(Collector collector, final Router router) {
this.collector = collector;
connections = new HashMap<String,Connection>();
if (router != null) {
outgoingResolver = new LinkResolver<Sender>() {
public Sender resolve(String address) {
return router.getOutgoing(address).choose();
}
};
incomingResolver = new LinkResolver<Receiver>() {
public Receiver resolve(String address) {
return router.getIncoming(address).choose();
}
};
} else {
outgoingResolver = new LinkResolver<Sender>() {
public Sender resolve(String address) { return null; }
};
incomingResolver = new LinkResolver<Receiver>() {
public Receiver resolve(String address) { return null; }
};
}
}
public Pool(Collector collector) {
this(collector, null);
}
private <T extends Link> T resolve(String remote, String local,
LinkResolver<T> resolver,
LinkConstructor<T> constructor) {
String host = remote.substring(2).split("/", 2)[0];
T link = resolver.resolve(remote);
if (link == null) {
Connection conn = connections.get(host);
if (conn == null) {
conn = Connection.Factory.create();
conn.collect(collector);
conn.setHostname(host);
conn.open();
connections.put(host, conn);
}
Session ssn = conn.session();
ssn.open();
link = constructor.create(ssn, remote, local);
link.open();
}
return link;
}
public Sender outgoing(String target, String source) {
return resolve(target, source, outgoingResolver, outgoingConstructor);
}
public Receiver incoming(String source, String target) {
return resolve(source, target, incomingResolver, incomingConstructor);
}
public Sender newOutgoing(Session ssn, String remote, String local) {
Sender snd = ssn.sender(String.format("%s-%s", local, remote));
Source src = new Source();
src.setAddress(local);
snd.setSource(src);
Target tgt = new Target();
tgt.setAddress(remote);
snd.setTarget(tgt);
return snd;
}
public Receiver newIncoming(Session ssn, String remote, String local) {
Receiver rcv = ssn.receiver(String.format("%s-%s", remote, local));
Source src = new Source();
src.setAddress(remote);
rcv.setSource(src);
Target tgt = new Target();
tgt.setAddress(remote);
rcv.setTarget(tgt);
return rcv;
}
public static interface LinkConstructor<T extends Link> {
T create(Session session, String remote, String local);
}
public static interface LinkResolver<T extends Link> {
T resolve(String remote);
}
}