| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.proton.messenger.impl; |
| |
| import java.io.IOException; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.util.EnumSet; |
| import java.util.Iterator; |
| import java.util.concurrent.TimeoutException; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| import org.apache.qpid.proton.ProtonFactoryLoader; |
| import org.apache.qpid.proton.driver.Connector; |
| import org.apache.qpid.proton.driver.Driver; |
| import org.apache.qpid.proton.driver.DriverFactory; |
| import org.apache.qpid.proton.driver.Listener; |
| import org.apache.qpid.proton.engine.Connection; |
| import org.apache.qpid.proton.engine.Delivery; |
| import org.apache.qpid.proton.engine.EndpointState; |
| import org.apache.qpid.proton.engine.EngineFactory; |
| import org.apache.qpid.proton.engine.Link; |
| import org.apache.qpid.proton.engine.Receiver; |
| import org.apache.qpid.proton.engine.Sasl; |
| import org.apache.qpid.proton.engine.Sender; |
| import org.apache.qpid.proton.engine.Session; |
| import org.apache.qpid.proton.message.Message; |
| import org.apache.qpid.proton.message.MessageFactory; |
| import org.apache.qpid.proton.messenger.Messenger; |
| import org.apache.qpid.proton.messenger.MessengerException; |
| import org.apache.qpid.proton.messenger.MessengerFactory; |
| import org.apache.qpid.proton.messenger.Status; |
| import org.apache.qpid.proton.messenger.Tracker; |
| import org.apache.qpid.proton.amqp.messaging.Accepted; |
| import org.apache.qpid.proton.amqp.messaging.Source; |
| import org.apache.qpid.proton.amqp.messaging.Target; |
| |
| public class MessengerImpl implements Messenger |
| { |
| private static ProtonFactoryLoader protonFactoryLoader = new ProtonFactoryLoader(); |
| |
| private static final EnumSet<EndpointState> UNINIT = EnumSet.of(EndpointState.UNINITIALIZED); |
| private static final EnumSet<EndpointState> ACTIVE = EnumSet.of(EndpointState.ACTIVE); |
| private static final EnumSet<EndpointState> CLOSED = EnumSet.of(EndpointState.CLOSED); |
| private static final EnumSet<EndpointState> ANY = EnumSet.allOf(EndpointState.class); |
| private static final Accepted ACCEPTED = new Accepted(); |
| |
| private final Logger _logger = Logger.getLogger("proton.messenger"); |
| private final String _name; |
| private final EngineFactory _engineFactory; |
| private final DriverFactory _driverFactory; |
| private final MessageFactory _messageFactory; |
| private long _timeout = -1; |
| private long _nextTag = 1; |
| private byte[] _buffer = new byte[5*1024]; |
| private Driver _driver; |
| private int _credit; |
| private int _distributed; |
| private TrackerQueue _incoming = new TrackerQueue(); |
| private TrackerQueue _outgoing = new TrackerQueue(); |
| |
| |
| /** |
| * @deprecated This constructor's visibility will be reduced to the default scope in a future release. |
| * Client code outside this module should use a {@link MessengerFactory} instead |
| */ |
| @Deprecated public MessengerImpl() |
| { |
| this(java.util.UUID.randomUUID().toString()); |
| } |
| |
| /** |
| * @deprecated This constructor's visibility will be reduced to the default scope in a future release. |
| * Client code outside this module should use a {@link MessengerFactory} instead |
| */ |
| @Deprecated public MessengerImpl(String name) |
| { |
| this(name, defaultEngineFactory(), defaultDriverFactory(), defaultMessageFactory()); |
| } |
| |
| /** |
| * @deprecated This constructor's visibility will be reduced to the default scope in a future release. |
| * Client code outside this module should use a {@link MessengerFactory} instead |
| */ |
| @Deprecated public MessengerImpl(String name, EngineFactory engineFactory, DriverFactory driverFactory, MessageFactory messageFactory) |
| { |
| _name = name; |
| _engineFactory = engineFactory; |
| _driverFactory = driverFactory; |
| _messageFactory = messageFactory; |
| } |
| |
| public void setTimeout(long timeInMillis) |
| { |
| _timeout = timeInMillis; |
| } |
| |
| public long getTimeout() |
| { |
| return _timeout; |
| } |
| |
| public void start() throws IOException |
| { |
| _driver = _driverFactory.createDriver(); |
| } |
| |
| public void stop() |
| { |
| //close all connections |
| for (Connector c : _driver.connectors()) |
| { |
| Connection connection = c.getConnection(); |
| connection.close(); |
| try |
| { |
| c.process(); |
| } |
| catch (IOException e) |
| { |
| _logger.log(Level.WARNING, "Error while sending close", e); |
| } |
| } |
| //stop listeners |
| for (Listener l : _driver.listeners()) |
| { |
| try |
| { |
| l.close(); |
| } |
| catch (IOException e) |
| { |
| _logger.log(Level.WARNING, "Error while closing listener", e); |
| } |
| } |
| try |
| { |
| waitUntil(_allClosed); |
| } |
| catch(TimeoutException e) |
| { |
| _logger.log(Level.WARNING, "Timed out while waiting for close", e); |
| } |
| _driver.destroy(); |
| } |
| |
| public void put(Message m) throws MessengerException |
| { |
| try |
| { |
| URI address = new URI(m.getAddress()); |
| if (address.getHost() == null) |
| { |
| throw new MessengerException("unable to send to address: " + m.getAddress()); |
| } |
| int port = address.getPort() < 0 ? defaultPort(address.getScheme()) : address.getPort(); |
| Sender sender = getLink(address.getHost(), port, new SenderFinder(address.getPath())); |
| |
| adjustReplyTo(m); |
| |
| byte[] tag = String.valueOf(_nextTag++).getBytes(); |
| Delivery delivery = sender.delivery(tag); |
| int encoded; |
| while (true) |
| { |
| try |
| { |
| encoded = m.encode(_buffer, 0, _buffer.length); |
| break; |
| } catch (java.nio.BufferOverflowException e) { |
| _buffer = new byte[_buffer.length*2]; |
| } |
| } |
| sender.send(_buffer, 0, encoded); |
| _outgoing.add(delivery); |
| sender.advance(); |
| } |
| catch (URISyntaxException e) |
| { |
| throw new MessengerException("Invalid address: " + m.getAddress(), e); |
| } |
| } |
| |
| public void send() throws java.util.concurrent.TimeoutException |
| { |
| waitUntil(_sentSettled); |
| } |
| |
| public void recv(int n) throws java.util.concurrent.TimeoutException |
| { |
| _credit += n; |
| distributeCredit(); |
| |
| waitUntil(_messageAvailable); |
| } |
| |
| public Message get() |
| { |
| for (Connector c : _driver.connectors()) |
| { |
| Connection connection = c.getConnection(); |
| _logger.log(Level.FINE, "Attempting to get message from " + connection); |
| Delivery delivery = connection.getWorkHead(); |
| while (delivery != null) |
| { |
| if (delivery.isReadable() && !delivery.isPartial()) |
| { |
| _logger.log(Level.FINE, "Readable delivery found: " + delivery); |
| int size = read((Receiver) delivery.getLink()); |
| Message message = _messageFactory.createMessage(); |
| message.decode(_buffer, 0, size); |
| _incoming.add(delivery); |
| _distributed--; |
| delivery.getLink().advance(); |
| return message; |
| } |
| else |
| { |
| _logger.log(Level.FINE, "Delivery not readable: " + delivery); |
| delivery = delivery.getWorkNext(); |
| } |
| } |
| } |
| return null; |
| } |
| |
| public void subscribe(String source) throws MessengerException |
| { |
| //the following is not safe or accurate, but it appears '~' is |
| //invalid as the start of the hostname and URI can't handle |
| //it, so this is a quick hack to avoid rewriting the parsing |
| //logic for URLs right now... |
| boolean listen = source.contains("~"); |
| try |
| { |
| URI address = new URI(listen ? source.replace("~", "") : source); |
| if (address.getHost() == null) throw new MessengerException("Invalid source address (hostname cannot be null): " + source); |
| int port = address.getPort() < 0 ? defaultPort(address.getScheme()) : address.getPort(); |
| if (listen) |
| { |
| _driver.createListener(address.getHost(), port, null); |
| } |
| else |
| { |
| getLink(address.getHost(), port, new ReceiverFinder(address.getPath())); |
| } |
| } |
| catch (URISyntaxException e) |
| { |
| throw new MessengerException("Invalid source: " + source, e); |
| } |
| |
| } |
| |
| public int outgoing() |
| { |
| return queued(true); |
| } |
| |
| public int incoming() |
| { |
| return queued(false); |
| } |
| |
| |
| public int getIncomingWindow() |
| { |
| return _incoming.getWindow(); |
| } |
| public void setIncomingWindow(int window) |
| { |
| _incoming.setWindow(window); |
| } |
| |
| public int getOutgoingWindow() |
| { |
| return _outgoing.getWindow(); |
| } |
| public void setOutgoingWindow(int window) |
| { |
| _outgoing.setWindow(window); |
| } |
| |
| public Tracker incomingTracker() |
| { |
| return new TrackerImpl(false, _incoming.getHighWaterMark() - 1); |
| } |
| public Tracker outgoingTracker() |
| { |
| return new TrackerImpl(true, _outgoing.getHighWaterMark() - 1); |
| } |
| |
| private TrackerQueue getTrackerQueue(Tracker tracker) |
| { |
| return TrackerQueue.isOutgoing(tracker) ? _outgoing : _incoming; |
| } |
| public void reject(Tracker tracker, int flags) |
| { |
| getTrackerQueue(tracker).reject(tracker, flags); |
| } |
| public void accept(Tracker tracker, int flags) |
| { |
| getTrackerQueue(tracker).accept(tracker, flags); |
| } |
| public void settle(Tracker tracker, int flags) |
| { |
| getTrackerQueue(tracker).settle(tracker, flags); |
| } |
| |
| public Status getStatus(Tracker tracker) |
| { |
| return getTrackerQueue(tracker).getStatus(tracker); |
| } |
| |
| private int queued(boolean outgoing) |
| { |
| int count = 0; |
| for (Connector c : _driver.connectors()) |
| { |
| Connection connection = c.getConnection(); |
| for (Link link : new Links(connection, ACTIVE, ANY)) |
| { |
| if (outgoing) |
| { |
| if (link instanceof Sender) count += link.getQueued(); |
| } |
| else |
| { |
| if (link instanceof Receiver) count += link.getQueued(); |
| } |
| } |
| } |
| return count; |
| } |
| |
| private int read(Receiver receiver) |
| { |
| //TODO: add pending count to Delivery? |
| int total = 0; |
| int start = 0; |
| while (true) |
| { |
| int read = receiver.recv(_buffer, start, _buffer.length - start); |
| total += read; |
| if (read == (_buffer.length - start)) |
| { |
| //may need to expand the buffer (is there a better test?) |
| byte[] old = _buffer; |
| _buffer = new byte[_buffer.length*2]; |
| System.arraycopy(old, 0, _buffer, 0, old.length); |
| start += read; |
| } |
| else |
| { |
| break; |
| } |
| } |
| return total; |
| } |
| |
| private void process() |
| { |
| processAllConnectors(); |
| processActive(); |
| } |
| |
| private void processAllConnectors() |
| { |
| for (Connector c : _driver.connectors()) |
| { |
| try |
| { |
| c.process(); |
| } |
| catch (IOException e) |
| { |
| _logger.log(Level.SEVERE, "Error processing connection", e); |
| } |
| } |
| } |
| |
| private void processActive() |
| { |
| //process active listeners |
| for (Listener l = _driver.listener(); l != null; l = _driver.listener()) |
| { |
| Connector c = l.accept(); |
| Connection connection = _engineFactory.createConnection(); |
| connection.setContainer(_name); |
| c.setConnection(connection); |
| //TODO: SSL and full SASL |
| Sasl sasl = c.sasl(); |
| if (sasl != null) |
| { |
| sasl.server(); |
| sasl.setMechanisms(new String[]{"ANONYMOUS"}); |
| sasl.done(Sasl.SaslOutcome.PN_SASL_OK); |
| } |
| connection.open(); |
| } |
| //process active connectors, handling opened & closed connections as needed |
| for (Connector c = _driver.connector(); c != null; c = _driver.connector()) |
| { |
| _logger.log(Level.FINE, "Processing active connector " + c); |
| try |
| { |
| c.process(); |
| } catch (IOException e) { |
| _logger.log(Level.SEVERE, "Error processing connection", e); |
| } |
| Connection connection = c.getConnection(); |
| |
| if (connection.getLocalState() == EndpointState.UNINITIALIZED) |
| { |
| connection.open(); |
| } |
| |
| Delivery delivery = connection.getWorkHead(); |
| while (delivery != null) |
| { |
| if (delivery.getLink() instanceof Sender && delivery.isUpdated()) |
| { |
| delivery.disposition(delivery.getRemoteState()); |
| } |
| //TODO: delivery.clear(); What's the equivalent in java? |
| delivery = delivery.getWorkNext(); |
| } |
| _outgoing.slide(); |
| |
| for (Session session : new Sessions(connection, UNINIT, ANY)) |
| { |
| session.open(); |
| _logger.log(Level.FINE, "Opened session " + session); |
| } |
| for (Link link : new Links(connection, UNINIT, ANY)) |
| { |
| //TODO: the following is not correct; should only copy those properties that we understand |
| link.setSource(link.getRemoteSource()); |
| link.setTarget(link.getRemoteTarget()); |
| link.open(); |
| _logger.log(Level.FINE, "Opened link " + link); |
| } |
| |
| distributeCredit(); |
| |
| for (Link link : new Links(connection, ACTIVE, CLOSED)) |
| { |
| link.close(); |
| } |
| for (Session session : new Sessions(connection, ACTIVE, CLOSED)) |
| { |
| session.close(); |
| } |
| if (connection.getRemoteState() == EndpointState.CLOSED) |
| { |
| if (connection.getLocalState() == EndpointState.ACTIVE) |
| { |
| connection.close(); |
| } |
| else if (connection.getLocalState() == EndpointState.CLOSED) |
| { |
| c.close(); |
| } |
| } |
| |
| if (c.isClosed()) |
| { |
| reclaimCredit(connection); |
| c.destroy(); |
| } |
| else |
| { |
| try |
| { |
| c.process(); |
| } |
| catch (IOException e) |
| { |
| _logger.log(Level.SEVERE, "Error processing connection", e); |
| } |
| } |
| } |
| } |
| |
| private void waitUntil(Predicate condition) throws TimeoutException |
| { |
| waitUntil(condition, _timeout); |
| } |
| |
| private void waitUntil(Predicate condition, long timeout) throws TimeoutException |
| { |
| processAllConnectors(); |
| |
| //wait until timeout expires or until test is true |
| long deadline = timeout < 0 ? Long.MAX_VALUE : System.currentTimeMillis() + timeout; |
| |
| boolean wait = deadline > System.currentTimeMillis(); |
| boolean first = true; |
| boolean done = false; |
| |
| while (first || (!done && wait)) |
| { |
| if (wait && !done && !first) { |
| _driver.doWait(timeout < 0 ? 0 : deadline - System.currentTimeMillis()); |
| } |
| processActive(); |
| wait = deadline > System.currentTimeMillis(); |
| done = done || condition.test(); |
| first = false; |
| } |
| if (!done) |
| { |
| throw new TimeoutException(); |
| } |
| } |
| |
| private Connection lookup(String host, String service) |
| { |
| for (Connector c : _driver.connectors()) |
| { |
| Connection connection = c.getConnection(); |
| if (host.equals(connection.getRemoteContainer()) || service.equals(connection.getContext())) |
| { |
| return connection; |
| } |
| } |
| return null; |
| } |
| |
| private void reclaimCredit(Connection connection) |
| { |
| for (Link link : new Links(connection, ANY, ANY)) |
| { |
| if (link instanceof Receiver && link.getCredit() > 0) |
| { |
| reclaimCredit(link.getCredit()); |
| } |
| } |
| } |
| |
| private void reclaimCredit(int credit) |
| { |
| _credit += credit; |
| _distributed -= credit; |
| } |
| |
| private void distributeCredit() |
| { |
| int previous = 0; |
| while (_credit > 0 && _credit != previous) |
| { |
| previous = _credit; |
| for (Connector c : _driver.connectors()) |
| { |
| Connection connection = c.getConnection(); |
| for (Link link : new Links(connection, ACTIVE, ANY)) |
| { |
| if (link instanceof Receiver) |
| { |
| ((Receiver) link).flow(1); |
| _credit--; |
| _distributed++; |
| if (_credit == 0) return; |
| } |
| } |
| } |
| } |
| } |
| |
| private interface Predicate |
| { |
| boolean test(); |
| } |
| |
| private class SentSettled implements Predicate |
| { |
| public boolean test() |
| { |
| //are all sent messages settled? |
| for (Connector c : _driver.connectors()) |
| { |
| Connection connection = c.getConnection(); |
| for (Link link : new Links(connection, ACTIVE, ANY)) |
| { |
| if (link instanceof Sender) |
| { |
| if (link.getQueued() > 0) |
| { |
| return false; |
| } |
| //TODO: Sender.unsettled() not yet implemented, when it is change to the following |
| //if (checkSettled(link.unsettled()) |
| //{ |
| // return false; |
| //} |
| } |
| } |
| } |
| //TODO: Sender.unsettled() not yet implemented, when it is change to the following |
| //return true; |
| return checkSettled(_outgoing.deliveries()); |
| } |
| |
| boolean checkSettled(Iterator<Delivery> unsettled) |
| { |
| if (unsettled != null) |
| { |
| while (unsettled.hasNext()) |
| { |
| Delivery d = unsettled.next(); |
| if (d == null) |
| { |
| break; |
| } |
| if (d.getRemoteState() != null || d.remotelySettled()) |
| { |
| d.settle(); |
| } |
| else if (d.getLink().getSession().getConnection().getRemoteState() == EndpointState.CLOSED) |
| { |
| continue; |
| } |
| else |
| { |
| return false; |
| } |
| } |
| } |
| return true; |
| } |
| } |
| |
| private class MessageAvailable implements Predicate |
| { |
| public boolean test() |
| { |
| //do we have at least one message? |
| for (Connector c : _driver.connectors()) |
| { |
| Connection connection = c.getConnection(); |
| Delivery delivery = connection.getWorkHead(); |
| while (delivery != null) |
| { |
| if (delivery.isReadable() && !delivery.isPartial()) |
| { |
| return true; |
| } |
| else |
| { |
| delivery = delivery.getWorkNext(); |
| } |
| } |
| } |
| return false; |
| } |
| } |
| |
| private class AllClosed implements Predicate |
| { |
| public boolean test() |
| { |
| if (_driver.connectors().iterator().hasNext()) return false; |
| else return true; |
| } |
| |
| } |
| |
| private final SentSettled _sentSettled = new SentSettled(); |
| private final MessageAvailable _messageAvailable = new MessageAvailable(); |
| private final AllClosed _allClosed = new AllClosed(); |
| |
| private interface LinkFinder<C extends Link> |
| { |
| C test(Link link); |
| C create(Session session); |
| } |
| |
| private class SenderFinder implements LinkFinder<Sender> |
| { |
| private final String _path; |
| |
| SenderFinder(String path) |
| { |
| _path = path; |
| } |
| |
| public Sender test(Link link) |
| { |
| if (link instanceof Sender && matchTarget((Target) link.getTarget(), _path)) |
| { |
| return (Sender) link; |
| } |
| else |
| { |
| return null; |
| } |
| } |
| |
| public Sender create(Session session) |
| { |
| return session.sender(_path); |
| } |
| } |
| |
| private class ReceiverFinder implements LinkFinder<Receiver> |
| { |
| private final String _path; |
| |
| ReceiverFinder(String path) |
| { |
| _path = path; |
| } |
| |
| public Receiver test(Link link) |
| { |
| if (link instanceof Receiver && matchSource((Source) link.getSource(), _path)) |
| { |
| return (Receiver) link; |
| } |
| else |
| { |
| return null; |
| } |
| } |
| |
| public Receiver create(Session session) |
| { |
| return session.receiver(_path); |
| } |
| } |
| |
| private <C extends Link> C getLink(String host, int port, LinkFinder<C> finder) |
| { |
| String service = host + ":" + port; |
| Connection connection = lookup(host, service); |
| if (connection == null) |
| { |
| Connector connector = _driver.createConnector(host, port, null); |
| _logger.log(Level.FINE, "Connecting to " + host + ":" + port); |
| connection = _engineFactory.createConnection(); |
| connection.setContainer(_name); |
| connection.setHostname(host); |
| connection.setContext(service); |
| connector.setConnection(connection); |
| Sasl sasl = connector.sasl(); |
| if (sasl != null) |
| { |
| sasl.client(); |
| sasl.setMechanisms(new String[]{"ANONYMOUS"}); |
| } |
| connection.open(); |
| } |
| |
| for (Link link : new Links(connection, ACTIVE, ANY)) |
| { |
| C result = finder.test(link); |
| if (result != null) return result; |
| } |
| Session session = connection.session(); |
| session.open(); |
| C link = finder.create(session); |
| link.open(); |
| return link; |
| } |
| |
| private static class Links implements Iterable<Link> |
| { |
| private final Connection _connection; |
| private final EnumSet<EndpointState> _local; |
| private final EnumSet<EndpointState> _remote; |
| |
| Links(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote) |
| { |
| _connection = connection; |
| _local = local; |
| _remote = remote; |
| } |
| |
| public java.util.Iterator<Link> iterator() |
| { |
| return new LinkIterator(_connection, _local, _remote); |
| } |
| } |
| |
| private static class LinkIterator implements java.util.Iterator<Link> |
| { |
| private final EnumSet<EndpointState> _local; |
| private final EnumSet<EndpointState> _remote; |
| private Link _next; |
| |
| LinkIterator(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote) |
| { |
| _local = local; |
| _remote = remote; |
| _next = connection.linkHead(_local, _remote); |
| } |
| |
| public boolean hasNext() |
| { |
| return _next != null; |
| } |
| |
| public Link next() |
| { |
| try |
| { |
| return _next; |
| } |
| finally |
| { |
| _next = _next.next(_local, _remote); |
| } |
| } |
| |
| public void remove() |
| { |
| throw new UnsupportedOperationException(); |
| } |
| } |
| |
| private static class Sessions implements Iterable<Session> |
| { |
| private final Connection _connection; |
| private final EnumSet<EndpointState> _local; |
| private final EnumSet<EndpointState> _remote; |
| |
| Sessions(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote) |
| { |
| _connection = connection; |
| _local = local; |
| _remote = remote; |
| } |
| |
| public java.util.Iterator<Session> iterator() |
| { |
| return new SessionIterator(_connection, _local, _remote); |
| } |
| } |
| |
| private static class SessionIterator implements java.util.Iterator<Session> |
| { |
| private final EnumSet<EndpointState> _local; |
| private final EnumSet<EndpointState> _remote; |
| private Session _next; |
| |
| SessionIterator(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote) |
| { |
| _local = local; |
| _remote = remote; |
| _next = connection.sessionHead(_local, _remote); |
| } |
| |
| public boolean hasNext() |
| { |
| return _next != null; |
| } |
| |
| public Session next() |
| { |
| try |
| { |
| return _next; |
| } |
| finally |
| { |
| _next = _next.next(_local, _remote); |
| } |
| } |
| |
| public void remove() |
| { |
| throw new UnsupportedOperationException(); |
| } |
| } |
| |
| private void adjustReplyTo(Message m) |
| { |
| String original = m.getReplyTo(); |
| if (original == null || original.length() == 0) |
| { |
| m.setReplyTo("amqp://" + _name); |
| } |
| else if (original.startsWith("~/")) |
| { |
| m.setReplyTo("amqp://" + _name + "/" + original.substring(2)); |
| } |
| } |
| |
| private static boolean matchTarget(Target target, String path) |
| { |
| if (target == null) return path.isEmpty(); |
| else return path.equals(target.getAddress()); |
| } |
| |
| private static boolean matchSource(Source source, String path) |
| { |
| if (source == null) return path.isEmpty(); |
| else return path.equals(source.getAddress()); |
| } |
| |
| private static int defaultPort(String scheme) |
| { |
| if ("amqps".equals(scheme)) return 5671; |
| else return 5672; |
| } |
| |
| private static EngineFactory defaultEngineFactory() |
| { |
| return (EngineFactory) protonFactoryLoader.loadFactory(EngineFactory.class); |
| } |
| |
| private static DriverFactory defaultDriverFactory() |
| { |
| return (DriverFactory) protonFactoryLoader.loadFactory(DriverFactory.class); |
| } |
| |
| private static MessageFactory defaultMessageFactory() |
| { |
| return (MessageFactory) protonFactoryLoader.loadFactory(MessageFactory.class); |
| } |
| |
| } |