blob: e6475b96a419806d2cb9a8cac1908851cdac7a23 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.proton.messenger.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.InterruptException;
import org.apache.qpid.proton.TimeoutException;
import org.apache.qpid.proton.driver.Connector;
import org.apache.qpid.proton.driver.Driver;
import org.apache.qpid.proton.driver.Listener;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.Ssl;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.messenger.Messenger;
import org.apache.qpid.proton.messenger.MessengerException;
import org.apache.qpid.proton.messenger.Status;
import org.apache.qpid.proton.messenger.Tracker;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.amqp.Binary;
public class MessengerImpl implements Messenger
{
private enum LinkCreditMode
{
// method for replenishing credit
LINK_CREDIT_EXPLICIT, // recv(N)
LINK_CREDIT_AUTO; // recv()
}
private static final EnumSet<EndpointState> UNINIT = EnumSet.of(EndpointState.UNINITIALIZED);
private static final EnumSet<EndpointState> ACTIVE = EnumSet.of(EndpointState.ACTIVE);
private static final EnumSet<EndpointState> CLOSED = EnumSet.of(EndpointState.CLOSED);
private static final EnumSet<EndpointState> ANY = EnumSet.allOf(EndpointState.class);
private final Logger _logger = Logger.getLogger("proton.messenger");
private final String _name;
private long _timeout = -1;
private boolean _blocking = true;
private long _nextTag = 1;
private Driver _driver;
private LinkCreditMode _credit_mode = LinkCreditMode.LINK_CREDIT_EXPLICIT;
private final int _credit_batch = 1024; // credit_mode == LINK_CREDIT_AUTO
private int _credit; // available
private int _distributed; // outstanding credit
private int _receivers; // total # receiver Links
private int _draining; // # Links in drain state
private List<Receiver> _credited = new ArrayList<Receiver>();
private List<Receiver> _blocked = new ArrayList<Receiver>();
private long _next_drain;
private TrackerImpl _incomingTracker;
private TrackerImpl _outgoingTracker;
private Store _incomingStore = new Store();
private Store _outgoingStore = new Store();
private List<Connector> _awaitingDestruction = new ArrayList<Connector>();
private int _sendThreshold;
private Transform _routes = new Transform();
private Transform _rewrites = new Transform();
private String _certificate;
private String _privateKey;
private String _password;
private String _trustedDb;
/**
* @deprecated This constructor's visibility will be reduced to the default scope in a future release.
* Client code outside this module should use a {@link MessengerFactory} instead
*/
@Deprecated public MessengerImpl()
{
this(java.util.UUID.randomUUID().toString());
}
/**
* @deprecated This constructor's visibility will be reduced to the default scope in a future release.
* Client code outside this module should use a {@link MessengerFactory} instead
*/
@Deprecated public MessengerImpl(String name)
{
_name = name;
}
public void setTimeout(long timeInMillis)
{
_timeout = timeInMillis;
}
public long getTimeout()
{
return _timeout;
}
public boolean isBlocking()
{
return _blocking;
}
public void setBlocking(boolean b)
{
_blocking = b;
}
public void setCertificate(String certificate)
{
_certificate = certificate;
}
public String getCertificate()
{
return _certificate;
}
public void setPrivateKey(String privateKey)
{
_privateKey = privateKey;
}
public String getPrivateKey()
{
return _privateKey;
}
public void setPassword(String password)
{
_password = password;
}
public String getPassword()
{
return _password;
}
public void setTrustedCertificates(String trusted)
{
_trustedDb = trusted;
}
public String getTrustedCertificates()
{
return _trustedDb;
}
public void start() throws IOException
{
_driver = Proton.driver();
}
public void stop()
{
if (_driver != null) {
if(_logger.isLoggable(Level.FINE))
{
_logger.fine(this + " about to stop");
}
//close all connections
for (Connector<?> c : _driver.connectors())
{
Connection connection = c.getConnection();
connection.close();
}
//stop listeners
for (Listener<?> l : _driver.listeners())
{
try
{
l.close();
}
catch (IOException e)
{
_logger.log(Level.WARNING, "Error while closing listener", e);
}
}
waitUntil(_allClosed);
}
}
public boolean stopped()
{
return _allClosed.test();
}
public boolean work(long timeout) throws TimeoutException
{
if (_driver == null) { return false; }
_worked = false;
return waitUntil(_workPred, timeout);
}
public void interrupt()
{
if (_driver != null) {
_driver.wakeup();
}
}
private String defaultRewrite(String address) {
if (address != null && address.contains("@")) {
Address addr = new Address(address);
String scheme = addr.getScheme();
String host = addr.getHost();
String port = addr.getPort();
String name = addr.getName();
StringBuilder sb = new StringBuilder();
if (scheme != null) {
sb.append(scheme).append("://");
}
if (host != null) {
sb.append(host);
}
if (port != null) {
sb.append(":").append(port);
}
if (name != null) {
sb.append("/").append(name);
}
return sb.toString();
} else {
return address;
}
}
private String _original;
private void rewriteMessage(Message m)
{
_original = m.getAddress();
if (_rewrites.apply(_original)) {
m.setAddress(_rewrites.result());
} else {
m.setAddress(defaultRewrite(_original));
}
}
private void restoreMessage(Message m)
{
m.setAddress(_original);
}
private String routeAddress(String addr)
{
if (_routes.apply(addr)) {
return _routes.result();
} else {
return addr;
}
}
public void put(Message m) throws MessengerException
{
if (_driver == null) {
throw new IllegalStateException("cannot put while messenger is stopped");
}
if(_logger.isLoggable(Level.FINE))
{
_logger.fine(this + " about to put message: " + m);
}
StoreEntry entry = _outgoingStore.put( m.getAddress() );
_outgoingTracker = new TrackerImpl(TrackerImpl.Type.OUTGOING,
_outgoingStore.trackEntry(entry));
String routedAddress = routeAddress(m.getAddress());
Address address = new Address(routedAddress);
if (address.getHost() == null)
{
throw new MessengerException("unable to send to address: " + routedAddress);
}
rewriteMessage(m);
try {
adjustReplyTo(m);
int encoded;
byte[] buffer = new byte[5*1024];
while (true)
{
try
{
encoded = m.encode(buffer, 0, buffer.length);
break;
} catch (java.nio.BufferOverflowException e) {
buffer = new byte[buffer.length*2];
}
}
entry.setEncodedMsg( buffer, encoded );
}
finally
{
restoreMessage(m);
}
Sender sender = getLink(address, new SenderFinder(address.getName()));
pumpOut(m.getAddress(), sender);
}
private void reclaimLink(Link link)
{
if (link instanceof Receiver)
{
int credit = link.getCredit();
if (credit > 0)
{
_credit += credit;
_distributed -= credit;
}
}
Delivery delivery = link.head();
while (delivery != null)
{
StoreEntry entry = (StoreEntry) delivery.getContext();
if (entry != null)
{
entry.setDelivery(null);
if (delivery.isBuffered()) {
entry.setStatus(Status.ABORTED);
}
}
delivery = delivery.next();
}
linkRemoved(link);
}
private int pumpOut( String address, Sender sender )
{
StoreEntry entry = _outgoingStore.get( address );
if (entry == null) {
sender.drained();
return 0;
}
byte[] tag = String.valueOf(_nextTag++).getBytes();
Delivery delivery = sender.delivery(tag);
entry.setDelivery( delivery );
_logger.log(Level.FINE, "Sending on delivery: " + delivery);
int n = sender.send( entry.getEncodedMsg(), 0, entry.getEncodedLength());
if (n < 0) {
_outgoingStore.freeEntry( entry );
_logger.log(Level.WARNING, "Send error: " + n);
return n;
} else {
sender.advance();
_outgoingStore.freeEntry( entry );
return 0;
}
}
public void send() throws TimeoutException
{
send(-1);
}
public void send(int n) throws TimeoutException
{
if (_driver == null) {
throw new IllegalStateException("cannot send while messenger is stopped");
}
if(_logger.isLoggable(Level.FINE))
{
_logger.fine(this + " about to send");
}
if (n == -1)
_sendThreshold = 0;
else
{
_sendThreshold = outgoing() - n;
if (_sendThreshold < 0)
_sendThreshold = 0;
}
waitUntil(_sentSettled);
}
public void recv(int n) throws TimeoutException
{
if (_driver == null) {
throw new IllegalStateException("cannot recv while messenger is stopped");
}
if (_logger.isLoggable(Level.FINE) && n != -1)
{
_logger.fine(this + " about to wait for up to " + n + " messages to be received");
}
if (n == -1)
{
_credit_mode = LinkCreditMode.LINK_CREDIT_AUTO;
}
else
{
_credit_mode = LinkCreditMode.LINK_CREDIT_EXPLICIT;
if (n > _distributed)
_credit = n - _distributed;
else // cancel unallocated
_credit = 0;
}
distributeCredit();
waitUntil(_messageAvailable);
}
public void recv() throws TimeoutException
{
recv(-1);
}
public int receiving()
{
return _credit + _distributed;
}
public Message get()
{
StoreEntry entry = _incomingStore.get( null );
if (entry != null)
{
Message message = Proton.message();
message.decode( entry.getEncodedMsg(), 0, entry.getEncodedLength() );
_incomingTracker = new TrackerImpl(TrackerImpl.Type.INCOMING,
_incomingStore.trackEntry(entry));
_incomingStore.freeEntry( entry );
return message;
}
return null;
}
private int pumpIn(String address, Receiver receiver)
{
Delivery delivery = receiver.current();
if (delivery.isReadable() && !delivery.isPartial())
{
StoreEntry entry = _incomingStore.put( address );
entry.setDelivery( delivery );
_logger.log(Level.FINE, "Readable delivery found: " + delivery);
int size = delivery.pending();
byte[] buffer = new byte[size];
int read = receiver.recv( buffer, 0, buffer.length );
if (read != size) {
throw new IllegalStateException();
}
entry.setEncodedMsg( buffer, size );
receiver.advance();
// account for the used credit, replenish if
// low (< 20% maximum per-link batch) and
// extra credit available
assert(_distributed > 0);
_distributed--;
if (!receiver.getDrain() && _blocked.isEmpty() && _credit > 0)
{
final int max = perLinkCredit();
final int lo_thresh = (int)(max * 0.2 + 0.5);
if (receiver.getRemoteCredit() < lo_thresh)
{
final int more = Math.min(_credit, max - receiver.getRemoteCredit());
_credit -= more;
_distributed += more;
receiver.flow(more);
}
}
// check if blocked
if (receiver.getRemoteCredit() == 0 && _credited.contains(receiver))
{
_credited.remove(receiver);
if (receiver.getDrain())
{
receiver.setDrain(false);
assert( _draining > 0 );
_draining--;
}
_blocked.add(receiver);
}
}
return 0;
}
public void subscribe(String source) throws MessengerException
{
if (_driver == null) {
throw new IllegalStateException("messenger is stopped");
}
String routed = routeAddress(source);
Address address = new Address(routed);
String hostName = address.getHost();
if (hostName == null) throw new MessengerException("Invalid address (hostname cannot be null): " + routed);
int port = Integer.valueOf(address.getImpliedPort());
if (address.isPassive())
{
if(_logger.isLoggable(Level.FINE))
{
_logger.fine(this + " about to subscribe to source " + source + " using address " + hostName + ":" + port);
}
ListenerContext ctx = new ListenerContext(address);
_driver.createListener(hostName, port, ctx);
}
else
{
if(_logger.isLoggable(Level.FINE))
{
_logger.fine(this + " about to subscribe to source " + source);
}
getLink(address, new ReceiverFinder(address.getName()));
}
}
public int outgoing()
{
return _outgoingStore.size() + queued(true);
}
public int incoming()
{
return _incomingStore.size() + queued(false);
}
public int getIncomingWindow()
{
return _incomingStore.getWindow();
}
public void setIncomingWindow(int window)
{
_incomingStore.setWindow(window);
}
public int getOutgoingWindow()
{
return _outgoingStore.getWindow();
}
public void setOutgoingWindow(int window)
{
_outgoingStore.setWindow(window);
}
public Tracker incomingTracker()
{
return _incomingTracker;
}
public Tracker outgoingTracker()
{
return _outgoingTracker;
}
private Store getTrackerStore(Tracker tracker)
{
return ((TrackerImpl)tracker).isOutgoing() ? _outgoingStore : _incomingStore;
}
@Override
public void reject(Tracker tracker, int flags)
{
int id = ((TrackerImpl)tracker).getSequence();
getTrackerStore(tracker).update(id, Status.REJECTED, flags, false, false);
}
@Override
public void accept(Tracker tracker, int flags)
{
int id = ((TrackerImpl)tracker).getSequence();
getTrackerStore(tracker).update(id, Status.ACCEPTED, flags, false, false);
}
@Override
public void settle(Tracker tracker, int flags)
{
int id = ((TrackerImpl)tracker).getSequence();
getTrackerStore(tracker).update(id, Status.UNKNOWN, flags, true, true);
}
public Status getStatus(Tracker tracker)
{
int id = ((TrackerImpl)tracker).getSequence();
StoreEntry e = getTrackerStore(tracker).getEntry(id);
if (e != null)
{
return e.getStatus();
}
return Status.UNKNOWN;
}
@Override
public void route(String pattern, String address)
{
_routes.rule(pattern, address);
}
@Override
public void rewrite(String pattern, String address)
{
_rewrites.rule(pattern, address);
}
private int queued(boolean outgoing)
{
int count = 0;
if (_driver != null) {
for (Connector<?> c : _driver.connectors())
{
Connection connection = c.getConnection();
for (Link link : new Links(connection, ACTIVE, ANY))
{
if (outgoing)
{
if (link instanceof Sender) count += link.getQueued();
}
else
{
if (link instanceof Receiver) count += link.getQueued();
}
}
}
}
return count;
}
private void bringDestruction()
{
for (Connector<?> c : _awaitingDestruction)
{
c.destroy();
}
_awaitingDestruction.clear();
}
private void processAllConnectors()
{
distributeCredit();
for (Connector<?> c : _driver.connectors())
{
processEndpoints(c);
try
{
if (c.process()) {
_worked = true;
}
}
catch (IOException e)
{
_logger.log(Level.SEVERE, "Error processing connection", e);
}
}
bringDestruction();
distributeCredit();
}
private void processActive()
{
//process active listeners
for (Listener<?> l = _driver.listener(); l != null; l = _driver.listener())
{
_worked = true;
Connector<?> c = l.accept();
Connection connection = Proton.connection();
connection.setContainer(_name);
ListenerContext ctx = (ListenerContext) l.getContext();
connection.setContext(new ConnectionContext(ctx.getAddress(), c));
c.setConnection(connection);
Transport transport = c.getTransport();
//TODO: full SASL
Sasl sasl = c.sasl();
if (sasl != null)
{
sasl.server();
sasl.setMechanisms(new String[]{"ANONYMOUS"});
sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
}
transport.ssl(ctx.getDomain());
connection.open();
}
// process connectors, reclaiming credit on closed connectors
for (Connector<?> c = _driver.connector(); c != null; c = _driver.connector())
{
_worked = true;
if (c.isClosed())
{
_awaitingDestruction.add(c);
reclaimCredit(c.getConnection());
}
else
{
_logger.log(Level.FINE, "Processing active connector " + c);
try
{
c.process();
processEndpoints(c);
c.process();
}
catch (IOException e)
{
_logger.log(Level.SEVERE, "Error processing connection", e);
}
}
}
bringDestruction();
distributeCredit();
}
private void processEndpoints(Connector c)
{
Connection connection = c.getConnection();
if (connection.getLocalState() == EndpointState.UNINITIALIZED)
{
connection.open();
}
Delivery delivery = connection.getWorkHead();
while (delivery != null)
{
Link link = delivery.getLink();
if (delivery.isUpdated())
{
if (link instanceof Sender)
{
delivery.disposition(delivery.getRemoteState());
}
StoreEntry e = (StoreEntry) delivery.getContext();
if (e != null) e.updated();
}
if (delivery.isReadable())
{
pumpIn( link.getSource().getAddress(), (Receiver)link );
}
Delivery next = delivery.getWorkNext();
delivery.clear();
delivery = next;
}
for (Session session : new Sessions(connection, UNINIT, ANY))
{
session.open();
_logger.log(Level.FINE, "Opened session " + session);
}
for (Link link : new Links(connection, UNINIT, ANY))
{
//TODO: the following is not correct; should only copy those properties that we understand
link.setSource(link.getRemoteSource());
link.setTarget(link.getRemoteTarget());
linkAdded(link);
link.open();
_logger.log(Level.FINE, "Opened link " + link);
}
distributeCredit();
for (Link link : new Links(connection, ACTIVE, ACTIVE))
{
if (link instanceof Sender)
{
pumpOut(link.getTarget().getAddress(), (Sender)link);
}
}
for (Session session : new Sessions(connection, ACTIVE, CLOSED))
{
session.close();
}
for (Link link : new Links(connection, ANY, CLOSED))
{
if (link.getLocalState() == EndpointState.ACTIVE)
{
link.close();
}
else
{
reclaimLink(link);
}
}
if (connection.getRemoteState() == EndpointState.CLOSED)
{
if (connection.getLocalState() == EndpointState.ACTIVE)
{
connection.close();
}
}
}
private boolean waitUntil(Predicate condition) throws TimeoutException
{
if (_blocking) {
boolean done = waitUntil(condition, _timeout);
if (!done) {
_logger.log(Level.SEVERE, String.format
("Timeout when waiting for condition %s after %s ms",
condition, _timeout));
throw new TimeoutException();
}
return done;
} else {
return waitUntil(condition, 0);
}
}
private boolean waitUntil(Predicate condition, long timeout)
{
if (_driver == null) {
throw new IllegalStateException("cannot wait while messenger is stopped");
}
processAllConnectors();
// wait until timeout expires or until test is true
long now = System.currentTimeMillis();
final long deadline = timeout < 0 ? Long.MAX_VALUE : now + timeout;
boolean done = false;
while (true)
{
done = condition.test();
if (done) break;
long remaining;
if (timeout < 0)
remaining = -1;
else {
remaining = deadline - now;
if (remaining < 0) break;
}
// Update the credit scheduler. If the scheduler detects
// credit imbalance on the links, wake up in time to
// service credit drain
distributeCredit();
if (_next_drain != 0)
{
long wakeup = (_next_drain > now) ? _next_drain - now : 0;
remaining = (remaining == -1) ? wakeup : Math.min(remaining, wakeup);
}
boolean woken;
woken = _driver.doWait(remaining);
processActive();
if (woken) {
throw new InterruptException();
}
now = System.currentTimeMillis();
}
return done;
}
private Connection lookup(Address address)
{
for (Connector<?> c : _driver.connectors())
{
Connection connection = c.getConnection();
ConnectionContext ctx = (ConnectionContext) connection.getContext();
if (ctx.matches(address))
{
return connection;
}
}
return null;
}
private void reclaimCredit(Connection connection)
{
for (Link link : new Links(connection, ANY, ANY))
{
reclaimLink(link);
}
}
private void distributeCredit()
{
if (_receivers == 0) return;
if (_credit_mode == LinkCreditMode.LINK_CREDIT_AUTO)
{
// replenish, but limit the max total messages buffered
final int max = _receivers * _credit_batch;
final int used = _distributed + incoming();
if (max > used)
_credit = max - used;
}
// reclaim any credit left over after draining links has completed
if (_draining > 0)
{
Iterator<Receiver> itr = _credited.iterator();
while (itr.hasNext())
{
Receiver link = (Receiver) itr.next();
if (link.getDrain())
{
if (!link.draining())
{
// drain completed for this link
int drained = link.drained();
assert(_distributed >= drained);
_distributed -= drained;
_credit += drained;
link.setDrain(false);
_draining--;
itr.remove();
_blocked.add(link);
}
}
}
}
// distribute available credit to blocked links
final int batch = perLinkCredit();
while (_credit > 0 && !_blocked.isEmpty())
{
Receiver link = _blocked.get(0);
_blocked.remove(0);
final int more = Math.min(_credit, batch);
_distributed += more;
_credit -= more;
link.flow(more);
_credited.add(link);
// flow changed, must process it
ConnectionContext ctx = (ConnectionContext) link.getSession().getConnection().getContext();
try
{
ctx.getConnector().process();
} catch (IOException e) {
_logger.log(Level.SEVERE, "Error processing connection", e);
}
}
if (_blocked.isEmpty())
{
_next_drain = 0;
}
else
{
// not enough credit for all links - start draining granted credit
if (_draining == 0)
{
// don't do it too often - pace ourselves (it's expensive)
if (_next_drain == 0)
{
_next_drain = System.currentTimeMillis() + 250;
}
else if (_next_drain <= System.currentTimeMillis())
{
// initiate drain, free up at most enough to satisfy blocked
_next_drain = 0;
int needed = _blocked.size() * batch;
for (Receiver link : _credited)
{
if (!link.getDrain()) {
link.setDrain(true);
needed -= link.getRemoteCredit();
_draining++;
// drain requested on link, must process it
ConnectionContext ctx = (ConnectionContext) link.getSession().getConnection().getContext();
try
{
ctx.getConnector().process();
} catch (IOException e) {
_logger.log(Level.SEVERE, "Error processing connection", e);
}
if (needed <= 0) break;
}
}
}
}
}
}
private interface Predicate
{
boolean test();
}
private class SentSettled implements Predicate
{
public boolean test()
{
//are all sent messages settled?
int total = _outgoingStore.size();
for (Connector<?> c : _driver.connectors())
{
// TBD
// check if transport is done generating output
// pn_transport_t *transport = pn_connector_transport(ctor);
// if (transport) {
// if (!pn_transport_quiesced(transport)) {
// pn_connector_process(ctor);
// return false;
// }
// }
Connection connection = c.getConnection();
for (Link link : new Links(connection, ACTIVE, ANY))
{
if (link instanceof Sender)
{
total += link.getQueued();
}
}
// TBD: there is no per-link unsettled
// deliveries iterator, so for now get the
// deliveries by walking the outgoing trackers
Iterator<StoreEntry> entries = _outgoingStore.trackedEntries();
while (entries.hasNext() && total <= _sendThreshold)
{
StoreEntry e = (StoreEntry) entries.next();
if (e != null )
{
Delivery d = e.getDelivery();
if (d != null)
{
if (d.getRemoteState() == null && !d.remotelySettled())
{
total++;
}
}
}
}
}
return total <= _sendThreshold;
}
}
private class MessageAvailable implements Predicate
{
public boolean test()
{
//do we have at least one pending message?
if (_incomingStore.size() > 0) return true;
for (Connector<?> c : _driver.connectors())
{
Connection connection = c.getConnection();
Delivery delivery = connection.getWorkHead();
while (delivery != null)
{
if (delivery.isReadable() && !delivery.isPartial())
{
return true;
}
else
{
delivery = delivery.getWorkNext();
}
}
}
// if no connections, or not listening, exit as there won't ever be a message
if (!_driver.listeners().iterator().hasNext() && !_driver.connectors().iterator().hasNext())
return true;
return false;
}
}
private class AllClosed implements Predicate
{
public boolean test()
{
if (_driver == null) {
return true;
}
for (Connector<?> c : _driver.connectors()) {
if (!c.isClosed()) {
return false;
}
}
_driver.destroy();
_driver = null;
return true;
}
}
private boolean _worked = false;
private class WorkPred implements Predicate
{
public boolean test()
{
return _worked;
}
}
private final SentSettled _sentSettled = new SentSettled();
private final MessageAvailable _messageAvailable = new MessageAvailable();
private final AllClosed _allClosed = new AllClosed();
private final WorkPred _workPred = new WorkPred();
private interface LinkFinder<C extends Link>
{
C test(Link link);
C create(Session session);
}
private class SenderFinder implements LinkFinder<Sender>
{
private final String _path;
SenderFinder(String path)
{
_path = path == null ? "" : path;
}
public Sender test(Link link)
{
if (link instanceof Sender && matchTarget((Target) link.getTarget(), _path))
{
return (Sender) link;
}
else
{
return null;
}
}
public Sender create(Session session)
{
Sender sender = session.sender(_path);
Target target = new Target();
target.setAddress(_path);
sender.setTarget(target);
// the C implemenation does this:
Source source = new Source();
source.setAddress(_path);
sender.setSource(source);
if (getOutgoingWindow() > 0)
{
// use explicit settlement via dispositions (not pre-settled)
sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
sender.setReceiverSettleMode(ReceiverSettleMode.SECOND); // desired
}
return sender;
}
}
private class ReceiverFinder implements LinkFinder<Receiver>
{
private final String _path;
ReceiverFinder(String path)
{
_path = path == null ? "" : path;
}
public Receiver test(Link link)
{
if (link instanceof Receiver && matchSource((Source) link.getSource(), _path))
{
return (Receiver) link;
}
else
{
return null;
}
}
public Receiver create(Session session)
{
Receiver receiver = session.receiver(_path);
Source source = new Source();
source.setAddress(_path);
receiver.setSource(source);
// the C implemenation does this:
Target target = new Target();
target.setAddress(_path);
receiver.setTarget(target);
if (getIncomingWindow() > 0)
{
// use explicit settlement via dispositions (not pre-settled)
receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); // desired
receiver.setReceiverSettleMode(ReceiverSettleMode.SECOND);
}
return receiver;
}
}
private <C extends Link> C getLink(Address address, LinkFinder<C> finder)
{
Connection connection = lookup(address);
if (connection == null)
{
String host = address.getHost();
int port = Integer.valueOf(address.getImpliedPort());
Connector<?> connector = _driver.createConnector(host, port, null);
_logger.log(Level.FINE, "Connecting to " + host + ":" + port);
connection = Proton.connection();
connection.setContainer(_name);
connection.setHostname(host);
connection.setContext(new ConnectionContext(address, connector));
connector.setConnection(connection);
Sasl sasl = connector.sasl();
if (sasl != null)
{
sasl.client();
sasl.setMechanisms(new String[]{"ANONYMOUS"});
}
if ("amqps".equalsIgnoreCase(address.getScheme())) {
Transport transport = connector.getTransport();
SslDomain domain = makeDomain(address, SslDomain.Mode.CLIENT);
if (_trustedDb != null) {
domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER);
//domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER_NAME);
} else {
domain.setPeerAuthentication(SslDomain.VerifyMode.ANONYMOUS_PEER);
}
Ssl ssl = transport.ssl(domain);
//ssl.setPeerHostname(host);
}
connection.open();
}
for (Link link : new Links(connection, ACTIVE, ANY))
{
C result = finder.test(link);
if (result != null) return result;
}
Session session = connection.session();
session.open();
C link = finder.create(session);
linkAdded(link);
link.open();
return link;
}
private static class Links implements Iterable<Link>
{
private final Connection _connection;
private final EnumSet<EndpointState> _local;
private final EnumSet<EndpointState> _remote;
Links(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
{
_connection = connection;
_local = local;
_remote = remote;
}
public java.util.Iterator<Link> iterator()
{
return new LinkIterator(_connection, _local, _remote);
}
}
private static class LinkIterator implements java.util.Iterator<Link>
{
private final EnumSet<EndpointState> _local;
private final EnumSet<EndpointState> _remote;
private Link _next;
LinkIterator(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
{
_local = local;
_remote = remote;
_next = connection.linkHead(_local, _remote);
}
public boolean hasNext()
{
return _next != null;
}
public Link next()
{
try
{
return _next;
}
finally
{
_next = _next.next(_local, _remote);
}
}
public void remove()
{
throw new UnsupportedOperationException();
}
}
private static class Sessions implements Iterable<Session>
{
private final Connection _connection;
private final EnumSet<EndpointState> _local;
private final EnumSet<EndpointState> _remote;
Sessions(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
{
_connection = connection;
_local = local;
_remote = remote;
}
public java.util.Iterator<Session> iterator()
{
return new SessionIterator(_connection, _local, _remote);
}
}
private static class SessionIterator implements java.util.Iterator<Session>
{
private final EnumSet<EndpointState> _local;
private final EnumSet<EndpointState> _remote;
private Session _next;
SessionIterator(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
{
_local = local;
_remote = remote;
_next = connection.sessionHead(_local, _remote);
}
public boolean hasNext()
{
return _next != null;
}
public Session next()
{
try
{
return _next;
}
finally
{
_next = _next.next(_local, _remote);
}
}
public void remove()
{
throw new UnsupportedOperationException();
}
}
private void adjustReplyTo(Message m)
{
String original = m.getReplyTo();
if (original != null) {
if (original.startsWith("~/"))
{
m.setReplyTo("amqp://" + _name + "/" + original.substring(2));
}
else if (original.equals("~"))
{
m.setReplyTo("amqp://" + _name);
}
}
}
private static boolean matchTarget(Target target, String path)
{
if (target == null) return path.isEmpty();
else return path.equals(target.getAddress());
}
private static boolean matchSource(Source source, String path)
{
if (source == null) return path.isEmpty();
else return path.equals(source.getAddress());
}
@Override
public String toString()
{
StringBuilder builder = new StringBuilder();
builder.append("MessengerImpl [_name=").append(_name).append("]");
return builder.toString();
}
// compute the maximum amount of credit each receiving link is
// entitled to. The actual credit given to the link depends on
// what amount of credit is actually available.
private int perLinkCredit()
{
if (_receivers == 0) return 0;
int total = _credit + _distributed;
return Math.max(total/_receivers, 1);
}
// a new link has been created, account for it.
private void linkAdded(Link link)
{
if (link instanceof Receiver)
{
_receivers++;
_blocked.add((Receiver)link);
link.setContext(Boolean.TRUE);
}
}
// a link is being removed, account for it.
private void linkRemoved(Link _link)
{
if (_link instanceof Receiver && (Boolean) _link.getContext())
{
_link.setContext(Boolean.FALSE);
Receiver link = (Receiver)_link;
assert _receivers > 0;
_receivers--;
if (link.getDrain())
{
link.setDrain(false);
assert _draining > 0;
_draining--;
}
if (_blocked.contains(link))
_blocked.remove(link);
else if (_credited.contains(link))
_credited.remove(link);
else
assert(false);
}
}
private class ConnectionContext
{
private Address _address;
private Connector _connector;
public ConnectionContext(Address address, Connector connector)
{
_address = address;
_connector = connector;
}
public Address getAddress()
{
return _address;
}
public boolean matches(Address address)
{
String host = address.getHost();
String port = address.getImpliedPort();
Connection conn = _connector.getConnection();
return host.equals(conn.getRemoteContainer()) ||
(_address.getHost().equals(host) && _address.getImpliedPort().equals(port));
}
public Connector getConnector()
{
return _connector;
}
}
private SslDomain makeDomain(Address address, SslDomain.Mode mode)
{
SslDomain domain = Proton.sslDomain();
domain.init(mode);
if (_certificate != null) {
domain.setCredentials(_certificate, _privateKey, _password);
}
if (_trustedDb != null) {
domain.setTrustedCaDb(_trustedDb);
}
if ("amqps".equalsIgnoreCase(address.getScheme())) {
domain.allowUnsecuredClient(false);
} else {
domain.allowUnsecuredClient(true);
}
return domain;
}
private class ListenerContext
{
private Address _address;
private SslDomain _domain;
public ListenerContext(Address address)
{
_address = address;
_domain = makeDomain(address, SslDomain.Mode.SERVER);
}
public SslDomain getDomain()
{
return _domain;
}
public Address getAddress()
{
return _address;
}
}
}