More workarounds to allow testing of both codecs side by side.
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/BenchmarkReceiver.java b/proton-j/src/main/java/org/apache/qpid/proton/BenchmarkReceiver.java
index e3aaf62..46a89a9 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/BenchmarkReceiver.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/BenchmarkReceiver.java
@@ -1,21 +1,16 @@
package org.apache.qpid.proton;
-import org.apache.qpid.proton.message2.Message;
-import org.apache.qpid.proton.messenger.Messenger;
-
public class BenchmarkReceiver
-{
- public static void main(String[] args) throws Exception
+{
+ static void messengerOld(int count) throws Exception
{
- int count = Integer.getInteger("count", 1);
-
- Messenger rec = Proton.messenger();
+ org.apache.qpid.proton.messenger.Messenger rec = new org.apache.qpid.proton.messenger.impl.MessengerImpl();
rec.start();
rec.subscribe("amqp://~localhost:5672");
int i = 0;
- Message m = null;
+ org.apache.qpid.proton.message.Message m = null;
long start = 0;
- while (i <count)
+ while (i < count)
{
rec.recv();
while ((m = rec.get()) != null)
@@ -23,10 +18,48 @@
i++;
start = m.getCreationTime();
}
- }
- System.out.println("Count " + count + " i " + i);
-
- double elapsed = (System.nanoTime() - start)/1000000;
- System.out.println(String.format("Time elapsed (end-to-end) for %s msgs : %s (milli secs)", count, elapsed));
+ }
+ System.out.println("Count " + count + " i " + i);
+
+ double elapsed = (System.nanoTime() - start) / 1000000;
+ System.out.println(String.format("Old codec Time elapsed (end-to-end) for %s msgs : %s (milli secs)", count, elapsed));
+ }
+
+
+ static void messengerNew(int count) throws Exception
+ {
+ org.apache.qpid.proton.messenger.Messenger2 rec = new org.apache.qpid.proton.messenger.impl.MessengerImpl2();
+ rec.start();
+ rec.subscribe("amqp://~localhost:5672");
+ int i = 0;
+ org.apache.qpid.proton.message2.Message m = null;
+ long start = 0;
+ while (i < count)
+ {
+ rec.recv();
+ while ((m = rec.get()) != null)
+ {
+ i++;
+ start = m.getCreationTime();
+ }
+ }
+
+ double elapsed = (System.nanoTime() - start) / 1000000;
+ System.out.println(String.format("New codec Time elapsed (end-to-end) for %s msgs : %s (milli secs)", count, elapsed));
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ int count = Integer.getInteger("count", 1);
+ boolean isNewCodec = Boolean.getBoolean("new-codec");
+
+ if (isNewCodec)
+ {
+ messengerNew(count);
+ }
+ else
+ {
+ messengerOld(count);
+ }
}
}
\ No newline at end of file
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/BenchmarkSender.java b/proton-j/src/main/java/org/apache/qpid/proton/BenchmarkSender.java
index a118c2d..a01296d 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/BenchmarkSender.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/BenchmarkSender.java
@@ -6,103 +6,137 @@
import java.util.List;
import java.util.Map;
-import org.apache.qpid.proton.message2.Message;
-import org.apache.qpid.proton.messenger.Messenger;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
public class BenchmarkSender
{
static String largeString = "Knowing that millions of people around the world would be watching in person and on television and expecting great";
+
static String smallString = "Hello world";
+
static long n = Long.MAX_VALUE;
- public static Message getMessage(int op)
+ enum OP {STR_SMALL, STR_BIG, LONG, LIST_SMALL, LIST_BIG, MAP_SMALL, MAP_BIG, NESTED_SMALL, NESTED_BIG};
+
+ public static Object getPayload(int op)
{
- Message m = Proton.message2();
- m.setAddress("amqp://localhost:5672");
switch (op)
{
- case 0 :
- m.setBody(smallString);
- break;
- case 1 :
- m.setBody(largeString);
- break;
- case 2 :
- m.setBody(n);
- break;
- case 3 :
- List<String> listSm = new ArrayList<String>();
- for (int i=0; i<5; i++)
- {
- listSm.add(smallString);
- }
- m.setBody(listSm);
- break;
- case 4 :
+ case 0:
+ return smallString;
+ case 1:
+ return largeString;
+ case 2:
+ return n;
+ case 3:
+ List<String> listSm = new ArrayList<String>();
+ for (int i = 0; i < 5; i++)
+ {
+ listSm.add(smallString);
+ }
+ return listSm;
+ case 4:
List<String> listBig = new ArrayList<String>();
- for (int i=0; i<100; i++)
+ for (int i = 0; i < 100; i++)
{
listBig.add(smallString);
- }
- m.setBody(listBig);
- break;
+ }
+ return listBig;
case 5:
- Map<String, Integer> mapSm = new HashMap<String, Integer>();
- for (int i=0; i<5; i++)
- {
- mapSm.put(String.valueOf(i), i);
- }
- m.setBody(mapSm);
- break;
+ Map<String, Integer> mapSm = new HashMap<String, Integer>();
+ for (int i = 0; i < 5; i++)
+ {
+ mapSm.put(String.valueOf(i), i);
+ }
+ return mapSm;
case 6:
- Map<String, Integer> mapBig = new HashMap<String, Integer>();
- for (int i=0; i<100; i++)
+ Map<String, Integer> mapBig = new HashMap<String, Integer>();
+ for (int i = 0; i < 100; i++)
{
mapBig.put(String.valueOf(i), i);
- }
- m.setBody(mapBig);
+ }
+ return mapBig;
case 7:
- List<Integer> l1 = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
- Map<String, List<Integer>> nestedMapSm = new HashMap<String, List<Integer>>();
- for (int i=0; i<5; i++)
+ List<Integer> l1 = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+ Map<String, List<Integer>> nestedMapSm = new HashMap<String, List<Integer>>();
+ for (int i = 0; i < 5; i++)
{
nestedMapSm.put(String.valueOf(i), l1);
}
- m.setBody(nestedMapSm);
- break;
+ return nestedMapSm;
case 8:
- List<Integer> l2 = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
- Map<String, List<Integer>> nestedMapBig = new HashMap<String, List<Integer>>();
- for (int i=0; i<100; i++)
+ List<Integer> l2 = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+ Map<String, List<Integer>> nestedMapBig = new HashMap<String, List<Integer>>();
+ for (int i = 0; i < 100; i++)
{
nestedMapBig.put(String.valueOf(i), l2);
}
- m.setBody(nestedMapBig);
- break;
+ return nestedMapBig;
+ default:
+ return smallString;
}
- return m;
}
-
- public static void main(String[] args) throws Exception
+
+ static void messengerOld(int count, int op) throws Exception
{
- int count = Integer.getInteger("count", 1);
- int op = Integer.getInteger("op", 4);
-
- Message msg = getMessage(op);
- Messenger sender = Proton.messenger();
+ Object obj = getPayload(op);
+ org.apache.qpid.proton.message.Message msg = Proton.message();
+ msg.setAddress("amqp://localhost:5672");
+ msg.setBody(new AmqpValue(obj));
+ org.apache.qpid.proton.messenger.Messenger sender = new org.apache.qpid.proton.messenger.impl.MessengerImpl();
sender.start();
-
+
long start = System.nanoTime();
- for (int i=0; i<count; i++)
+ for (int i = 0; i < count; i++)
{
msg.setCreationTime(start);
sender.put(msg);
}
sender.send();
- double elapsed = (System.nanoTime() - start)/1000000;
- System.out.println(String.format("Time elapsed for sending %s msgs for op [%s] : %s (milli secs)", count, op, elapsed));
-
+ double elapsed = (System.nanoTime() - start) / 1000000;
+ System.out.println(String.format("Old codec Time elapsed for sending %s msgs for op [%s] : %s (milli secs)", count, OP.values()[op],
+ elapsed));
+ }
+
+ static void messengerNew(int count, int op) throws Exception
+ {
+ Object obj = getPayload(op);
+ org.apache.qpid.proton.message2.Message msg = Proton.message2();
+ msg.setAddress("amqp://localhost:5672");
+ msg.setBody(obj);
+ org.apache.qpid.proton.messenger.Messenger2 sender = new org.apache.qpid.proton.messenger.impl.MessengerImpl2();
+ sender.start();
+
+ long start = System.nanoTime();
+ for (int i = 0; i < count; i++)
+ {
+ msg.setCreationTime(start);
+ sender.put(msg);
+ }
+ sender.send();
+
+ double elapsed = (System.nanoTime() - start) / 1000000;
+ System.out.println(String.format("New Codec Time elapsed for sending %s msgs for op [%s] : %s (milli secs)", count, op,
+ elapsed));
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ int count = Integer.getInteger("count", 1);
+ int op = Integer.getInteger("op", 1);
+
+ boolean isNewCodec = Boolean.getBoolean("new-codec");
+
+ if (isNewCodec)
+ {
+ messengerNew(count, op);
+ }
+ else
+ {
+ messengerOld(count, op);
+ }
+
final Object ob = new Object();
synchronized (ob)
{
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/TestReceiver.java b/proton-j/src/main/java/org/apache/qpid/proton/TestReceiver.java
deleted file mode 100644
index 4143caf..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/TestReceiver.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package org.apache.qpid.proton;
-import java.io.IOException;
-
-import org.apache.qpid.proton.message2.Message;
-import org.apache.qpid.proton.messenger.Messenger;
-
-
-public class TestReceiver
-{
- public static void main(String[] args) throws IOException
- {
- Messenger messenger = Proton.messenger();
- messenger.start();
- messenger.subscribe("amqp://~localhost:5672");
- messenger.recv();
- Message m = messenger.get();
- System.out.println("Received " + m);
- }
-
-}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/TestSender.java b/proton-j/src/main/java/org/apache/qpid/proton/TestSender.java
deleted file mode 100644
index f522aa8..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/TestSender.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.qpid.proton;
-import java.io.IOException;
-
-import org.apache.qpid.proton.message2.AmqpValue;
-import org.apache.qpid.proton.message2.Message;
-import org.apache.qpid.proton.messenger.Messenger;
-
-public class TestSender
-{
- public static void main(String[] args) throws IOException, InterruptedException
- {
- Messenger messenger = Proton.messenger();
- messenger.start();
- for (int i=0; i < 1; i++)
- {
- Message m = Proton.message2();
- m.setAddress("amqp://localhost:5672");
- m.setBody(new AmqpValue("Msg" + i));
- messenger.put(m);
- }
- messenger.send();
- }
-}
\ No newline at end of file
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
index cd6d93d..6d3f362 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
@@ -23,7 +23,7 @@
import java.io.IOException;
import org.apache.qpid.proton.TimeoutException;
-import org.apache.qpid.proton.message2.Message;
+import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.messenger.impl.MessengerImpl;
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger2.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger2.java
new file mode 100644
index 0000000..47ab412
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger2.java
@@ -0,0 +1,278 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.qpid.proton.messenger;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.TimeoutException;
+import org.apache.qpid.proton.message2.Message;
+import org.apache.qpid.proton.messenger.impl.MessengerImpl2;
+
+/**
+ *
+ * Messenger defines a high level interface for sending and receiving
+ * messages. Every Messenger contains a single logical queue of
+ * incoming messages and a single logical queue of outgoing
+ * messages. These messages in these queues may be destined for, or
+ * originate from, a variety of addresses.
+ *
+ * <h3>Address Syntax</h3>
+ *
+ * An address has the following form:
+ *
+ * [ amqp[s]:// ] [user[:password]@] domain [/[name]]
+ *
+ * Where domain can be one of:
+ *
+ * host | host:port | ip | ip:port | name
+ *
+ * The following are valid examples of addresses:
+ *
+ * - example.org
+ * - example.org:1234
+ * - amqp://example.org
+ * - amqps://example.org
+ * - example.org/incoming
+ * - amqps://example.org/outgoing
+ * - amqps://fred:trustno1@example.org
+ * - 127.0.0.1:1234
+ * - amqps://127.0.0.1:1234
+ *
+ * <h3>Sending & Receiving Messages</h3>
+ *
+ * The Messenger interface works in conjuction with the Message
+ * class. The Message class is a mutable holder of message content.
+ * The put method will encode the content in a given Message object
+ * into the outgoing message queue leaving that Message object free
+ * to be modified or discarded without having any impact on the
+ * content in the outgoing queue.
+ *
+ * Similarly, the get method will decode the content in the incoming
+ * message queue into the supplied Message object.
+*/
+public interface Messenger2
+{
+
+ public static final class Factory
+ {
+ public static Messenger2 create() {
+ return new MessengerImpl2();
+ }
+
+ public static Messenger2 create(String name) {
+ return new MessengerImpl2(name);
+ }
+ }
+
+ /**
+ * Flag for use with reject(), accept() and settle() methods.
+ */
+ static final int CUMULATIVE = 0x01;
+
+ /**
+ * Places the content contained in the message onto the outgoing
+ * queue of the Messenger. This method will never block. The
+ * send call may be used to block until the messages are
+ * sent. Either a send() or a recv() call is neceesary at present
+ * to cause the messages to actually be sent out.
+ */
+ void put(Message message) throws MessengerException;
+
+ /**
+ * Blocks until the outgoing queue is empty and, in the event that
+ * an outgoing window has been set, until the messages in that
+ * window have been received by the target to which they were
+ * sent, or the operation times out. The timeout property
+ * controls how long a Messenger will block before timing out.
+ */
+ void send() throws TimeoutException;
+
+ void send(int n) throws TimeoutException;
+
+ /**
+ * Subscribes the Messenger to messages originating from the
+ * specified source. The source is an address as specified in the
+ * Messenger introduction with the following addition. If the
+ * domain portion of the address begins with the '~' character,
+ * the Messenger will interpret the domain as host/port, bind
+ * to it, and listen for incoming messages. For example
+ * "~0.0.0.0", "amqp://~0.0.0.0" will bind to any local interface
+ * and listen for incoming messages.
+ */
+ void subscribe(String source) throws MessengerException;
+ /**
+ * Receives an arbitrary number of messages into the
+ * incoming queue of the Messenger. This method will block until
+ * at least one message is available or the operation times out.
+ */
+ void recv() throws TimeoutException;
+ /**
+ * Receives up to the specified number of messages into the
+ * incoming queue of the Messenger. This method will block until
+ * at least one message is available or the operation times out.
+ */
+ void recv(int count) throws TimeoutException;
+ /**
+ * Returns the capacity of the incoming message queue of
+ * messenger. Note this count does not include those messages
+ * already available on the incoming queue (see
+ * incoming()). Rather it returns the number of incoming queue
+ * entries available for receiving messages
+ */
+ int receiving();
+ /**
+ * Returns the message from the head of the incoming message
+ * queue.
+ */
+ Message get();
+
+ /**
+ * Transitions the Messenger to an active state. A Messenger is
+ * initially created in an inactive state. When inactive, a
+ * Messenger will not send or receive messages from its internal
+ * queues. A Messenger must be started before calling send() or
+ * recv().
+ */
+ void start() throws IOException;
+ /**
+ * Transitions the Messenger to an inactive state. An inactive
+ * Messenger will not send or receive messages from its internal
+ * queues. A Messenger should be stopped before being discarded to
+ * ensure a clean shutdown handshake occurs on any internally managed
+ * connections.
+ */
+ void stop();
+
+ boolean stopped();
+
+ /** Sends or receives any outstanding messages queued for a
+ * messenger. If timeout is zero, no blocking is done. A timeout
+ * of -1 blocks forever, otherwise timeout is the maximum time (in
+ * millisecs) to block. Returns True if work was performed.
+ */
+ boolean work(long timeout) throws TimeoutException;
+
+ void interrupt();
+
+ void setTimeout(long timeInMillis);
+ long getTimeout();
+
+ boolean isBlocking();
+ void setBlocking(boolean b);
+
+ /**
+ * Returns a count of the messages currently on the outgoing queue
+ * (i.e. those that have been put() but not yet actually sent
+ * out).
+ */
+ int outgoing();
+ /**
+ * Returns a count of the messages available on the incoming
+ * queue.
+ */
+ int incoming();
+
+ int getIncomingWindow();
+ void setIncomingWindow(int window);
+
+ int getOutgoingWindow();
+ void setOutgoingWindow(int window);
+
+ /**
+ * Returns a token which can be used to accept or reject the
+ * message returned in the previous get() call.
+ */
+ Tracker incomingTracker();
+ /**
+ * Returns a token which can be used to track the status of the
+ * message of the previous put() call.
+ */
+ Tracker outgoingTracker();
+
+ /**
+ * Rejects messages retrieved from the incoming message queue. The
+ * tracker object for a message is obtained through a call to
+ * incomingTracker() following a get(). If the flags argument
+ * contains CUMULATIVE, then all message up to the one identified
+ * by the tracker will be rejected.
+ */
+ void reject(Tracker tracker, int flags);
+ /**
+ * Accepts messages retrieved from the incoming message queue. The
+ * tracker object for a message is obtained through a call to
+ * incomingTracker() following a get(). If the flags argument
+ * contains CUMULATIVE, then all message up to the one identified
+ * by the tracker will be accepted.
+ */
+ void accept(Tracker tracker, int flags);
+ void settle(Tracker tracker, int flags);
+
+ /**
+ * Gets the last known remote state of the delivery associated
+ * with the given tracker.
+ */
+ Status getStatus(Tracker tracker);
+
+ void route(String pattern, String address);
+
+ void rewrite(String pattern, String address);
+
+ /**
+ * Set the path to the certificate file.
+ */
+ void setCertificate(String certificate);
+
+ /**
+ * Get the path to the certificate file.
+ */
+ String getCertificate();
+
+ /**
+ * Set the path to private key file.
+ */
+ void setPrivateKey(String privateKey);
+
+ /**
+ * Get the path to the private key file.
+ */
+ String getPrivateKey();
+
+ /**
+ * Set the password for private key file.
+ */
+ void setPassword(String password);
+
+ /**
+ * Get the password for the priate key file.
+ */
+ String getPassword();
+
+ /**
+ * Set the path to the trusted certificate database.
+ */
+ void setTrustedCertificates(String trusted);
+
+ /**
+ * Get the path to the trusted certificate database.
+ */
+ String getTrustedCertificates();
+
+}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
index f0d7119..e6475b9 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
@@ -45,7 +45,7 @@
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.Ssl;
import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.message2.Message;
+import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.messenger.Messenger;
import org.apache.qpid.proton.messenger.MessengerException;
import org.apache.qpid.proton.messenger.Status;
@@ -461,7 +461,7 @@
StoreEntry entry = _incomingStore.get( null );
if (entry != null)
{
- Message message = Proton.message2();
+ Message message = Proton.message();
message.decode( entry.getEncodedMsg(), 0, entry.getEncodedLength() );
_incomingTracker = new TrackerImpl(TrackerImpl.Type.INCOMING,
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl2.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl2.java
new file mode 100644
index 0000000..ef861bf
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl2.java
@@ -0,0 +1,1547 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.qpid.proton.messenger.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.InterruptException;
+import org.apache.qpid.proton.TimeoutException;
+import org.apache.qpid.proton.driver.Connector;
+import org.apache.qpid.proton.driver.Driver;
+import org.apache.qpid.proton.driver.Listener;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.SslDomain;
+import org.apache.qpid.proton.engine.Ssl;
+import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.message2.Message;
+import org.apache.qpid.proton.messenger.Messenger2;
+import org.apache.qpid.proton.messenger.MessengerException;
+import org.apache.qpid.proton.messenger.Status;
+import org.apache.qpid.proton.messenger.Tracker;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+
+public class MessengerImpl2 implements Messenger2
+{
+ private enum LinkCreditMode
+ {
+ // method for replenishing credit
+ LINK_CREDIT_EXPLICIT, // recv(N)
+ LINK_CREDIT_AUTO; // recv()
+ }
+
+ private static final EnumSet<EndpointState> UNINIT = EnumSet.of(EndpointState.UNINITIALIZED);
+ private static final EnumSet<EndpointState> ACTIVE = EnumSet.of(EndpointState.ACTIVE);
+ private static final EnumSet<EndpointState> CLOSED = EnumSet.of(EndpointState.CLOSED);
+ private static final EnumSet<EndpointState> ANY = EnumSet.allOf(EndpointState.class);
+
+ private final Logger _logger = Logger.getLogger("proton.messenger");
+ private final String _name;
+ private long _timeout = -1;
+ private boolean _blocking = true;
+ private long _nextTag = 1;
+ private Driver _driver;
+ private LinkCreditMode _credit_mode = LinkCreditMode.LINK_CREDIT_EXPLICIT;
+ private final int _credit_batch = 1024; // credit_mode == LINK_CREDIT_AUTO
+ private int _credit; // available
+ private int _distributed; // outstanding credit
+ private int _receivers; // total # receiver Links
+ private int _draining; // # Links in drain state
+ private List<Receiver> _credited = new ArrayList<Receiver>();
+ private List<Receiver> _blocked = new ArrayList<Receiver>();
+ private long _next_drain;
+ private TrackerImpl _incomingTracker;
+ private TrackerImpl _outgoingTracker;
+ private Store _incomingStore = new Store();
+ private Store _outgoingStore = new Store();
+ private List<Connector> _awaitingDestruction = new ArrayList<Connector>();
+ private int _sendThreshold;
+
+ private Transform _routes = new Transform();
+ private Transform _rewrites = new Transform();
+
+ private String _certificate;
+ private String _privateKey;
+ private String _password;
+ private String _trustedDb;
+
+
+ /**
+ * @deprecated This constructor's visibility will be reduced to the default scope in a future release.
+ * Client code outside this module should use a {@link MessengerFactory} instead
+ */
+ @Deprecated public MessengerImpl2()
+ {
+ this(java.util.UUID.randomUUID().toString());
+ }
+
+ /**
+ * @deprecated This constructor's visibility will be reduced to the default scope in a future release.
+ * Client code outside this module should use a {@link MessengerFactory} instead
+ */
+ @Deprecated public MessengerImpl2(String name)
+ {
+ _name = name;
+ }
+
+ public void setTimeout(long timeInMillis)
+ {
+ _timeout = timeInMillis;
+ }
+
+ public long getTimeout()
+ {
+ return _timeout;
+ }
+
+ public boolean isBlocking()
+ {
+ return _blocking;
+ }
+
+ public void setBlocking(boolean b)
+ {
+ _blocking = b;
+ }
+
+ public void setCertificate(String certificate)
+ {
+ _certificate = certificate;
+ }
+
+ public String getCertificate()
+ {
+ return _certificate;
+ }
+
+ public void setPrivateKey(String privateKey)
+ {
+ _privateKey = privateKey;
+ }
+
+ public String getPrivateKey()
+ {
+ return _privateKey;
+ }
+
+ public void setPassword(String password)
+ {
+ _password = password;
+ }
+
+ public String getPassword()
+ {
+ return _password;
+ }
+
+ public void setTrustedCertificates(String trusted)
+ {
+ _trustedDb = trusted;
+ }
+
+ public String getTrustedCertificates()
+ {
+ return _trustedDb;
+ }
+
+ public void start() throws IOException
+ {
+ _driver = Proton.driver();
+ }
+
+ public void stop()
+ {
+ if (_driver != null) {
+ if(_logger.isLoggable(Level.FINE))
+ {
+ _logger.fine(this + " about to stop");
+ }
+ //close all connections
+ for (Connector<?> c : _driver.connectors())
+ {
+ Connection connection = c.getConnection();
+ connection.close();
+ }
+ //stop listeners
+ for (Listener<?> l : _driver.listeners())
+ {
+ try
+ {
+ l.close();
+ }
+ catch (IOException e)
+ {
+ _logger.log(Level.WARNING, "Error while closing listener", e);
+ }
+ }
+ waitUntil(_allClosed);
+ }
+ }
+
+ public boolean stopped()
+ {
+ return _allClosed.test();
+ }
+
+ public boolean work(long timeout) throws TimeoutException
+ {
+ if (_driver == null) { return false; }
+ _worked = false;
+ return waitUntil(_workPred, timeout);
+ }
+
+ public void interrupt()
+ {
+ if (_driver != null) {
+ _driver.wakeup();
+ }
+ }
+
+ private String defaultRewrite(String address) {
+ if (address != null && address.contains("@")) {
+ Address addr = new Address(address);
+ String scheme = addr.getScheme();
+ String host = addr.getHost();
+ String port = addr.getPort();
+ String name = addr.getName();
+
+ StringBuilder sb = new StringBuilder();
+ if (scheme != null) {
+ sb.append(scheme).append("://");
+ }
+ if (host != null) {
+ sb.append(host);
+ }
+ if (port != null) {
+ sb.append(":").append(port);
+ }
+ if (name != null) {
+ sb.append("/").append(name);
+ }
+ return sb.toString();
+ } else {
+ return address;
+ }
+ }
+
+
+ private String _original;
+
+ private void rewriteMessage(Message m)
+ {
+ _original = m.getAddress();
+ if (_rewrites.apply(_original)) {
+ m.setAddress(_rewrites.result());
+ } else {
+ m.setAddress(defaultRewrite(_original));
+ }
+ }
+
+ private void restoreMessage(Message m)
+ {
+ m.setAddress(_original);
+ }
+
+ private String routeAddress(String addr)
+ {
+ if (_routes.apply(addr)) {
+ return _routes.result();
+ } else {
+ return addr;
+ }
+ }
+
+ public void put(Message m) throws MessengerException
+ {
+ if (_driver == null) {
+ throw new IllegalStateException("cannot put while messenger is stopped");
+ }
+
+ if(_logger.isLoggable(Level.FINE))
+ {
+ _logger.fine(this + " about to put message: " + m);
+ }
+
+ StoreEntry entry = _outgoingStore.put( m.getAddress() );
+ _outgoingTracker = new TrackerImpl(TrackerImpl.Type.OUTGOING,
+ _outgoingStore.trackEntry(entry));
+
+ String routedAddress = routeAddress(m.getAddress());
+ Address address = new Address(routedAddress);
+ if (address.getHost() == null)
+ {
+ throw new MessengerException("unable to send to address: " + routedAddress);
+ }
+
+ rewriteMessage(m);
+
+ try {
+ adjustReplyTo(m);
+
+ int encoded;
+ byte[] buffer = new byte[5*1024];
+ while (true)
+ {
+ try
+ {
+ encoded = m.encode(buffer, 0, buffer.length);
+ break;
+ } catch (java.nio.BufferOverflowException e) {
+ buffer = new byte[buffer.length*2];
+ }
+ }
+ entry.setEncodedMsg( buffer, encoded );
+ }
+ finally
+ {
+ restoreMessage(m);
+ }
+
+ Sender sender = getLink(address, new SenderFinder(address.getName()));
+ pumpOut(m.getAddress(), sender);
+ }
+
+ private void reclaimLink(Link link)
+ {
+ if (link instanceof Receiver)
+ {
+ int credit = link.getCredit();
+ if (credit > 0)
+ {
+ _credit += credit;
+ _distributed -= credit;
+ }
+ }
+
+ Delivery delivery = link.head();
+ while (delivery != null)
+ {
+ StoreEntry entry = (StoreEntry) delivery.getContext();
+ if (entry != null)
+ {
+ entry.setDelivery(null);
+ if (delivery.isBuffered()) {
+ entry.setStatus(Status.ABORTED);
+ }
+ }
+ delivery = delivery.next();
+ }
+ linkRemoved(link);
+ }
+
+ private int pumpOut( String address, Sender sender )
+ {
+ StoreEntry entry = _outgoingStore.get( address );
+ if (entry == null) {
+ sender.drained();
+ return 0;
+ }
+
+ byte[] tag = String.valueOf(_nextTag++).getBytes();
+ Delivery delivery = sender.delivery(tag);
+ entry.setDelivery( delivery );
+ _logger.log(Level.FINE, "Sending on delivery: " + delivery);
+ int n = sender.send( entry.getEncodedMsg(), 0, entry.getEncodedLength());
+ if (n < 0) {
+ _outgoingStore.freeEntry( entry );
+ _logger.log(Level.WARNING, "Send error: " + n);
+ return n;
+ } else {
+ sender.advance();
+ _outgoingStore.freeEntry( entry );
+ return 0;
+ }
+ }
+
+ public void send() throws TimeoutException
+ {
+ send(-1);
+ }
+
+ public void send(int n) throws TimeoutException
+ {
+ if (_driver == null) {
+ throw new IllegalStateException("cannot send while messenger is stopped");
+ }
+
+ if(_logger.isLoggable(Level.FINE))
+ {
+ _logger.fine(this + " about to send");
+ }
+
+ if (n == -1)
+ _sendThreshold = 0;
+ else
+ {
+ _sendThreshold = outgoing() - n;
+ if (_sendThreshold < 0)
+ _sendThreshold = 0;
+ }
+
+ waitUntil(_sentSettled);
+ }
+
+ public void recv(int n) throws TimeoutException
+ {
+ if (_driver == null) {
+ throw new IllegalStateException("cannot recv while messenger is stopped");
+ }
+
+ if (_logger.isLoggable(Level.FINE) && n != -1)
+ {
+ _logger.fine(this + " about to wait for up to " + n + " messages to be received");
+ }
+
+ if (n == -1)
+ {
+ _credit_mode = LinkCreditMode.LINK_CREDIT_AUTO;
+ }
+ else
+ {
+ _credit_mode = LinkCreditMode.LINK_CREDIT_EXPLICIT;
+ if (n > _distributed)
+ _credit = n - _distributed;
+ else // cancel unallocated
+ _credit = 0;
+ }
+
+ distributeCredit();
+
+ waitUntil(_messageAvailable);
+ }
+
+ public void recv() throws TimeoutException
+ {
+ recv(-1);
+ }
+
+ public int receiving()
+ {
+ return _credit + _distributed;
+ }
+
+ public Message get()
+ {
+ StoreEntry entry = _incomingStore.get( null );
+ if (entry != null)
+ {
+ Message message = Proton.message2();
+ message.decode( entry.getEncodedMsg(), 0, entry.getEncodedLength() );
+
+ _incomingTracker = new TrackerImpl(TrackerImpl.Type.INCOMING,
+ _incomingStore.trackEntry(entry));
+
+ _incomingStore.freeEntry( entry );
+ return message;
+ }
+ return null;
+ }
+
+ private int pumpIn(String address, Receiver receiver)
+ {
+ Delivery delivery = receiver.current();
+ if (delivery.isReadable() && !delivery.isPartial())
+ {
+ StoreEntry entry = _incomingStore.put( address );
+ entry.setDelivery( delivery );
+
+ _logger.log(Level.FINE, "Readable delivery found: " + delivery);
+
+ int size = delivery.pending();
+ byte[] buffer = new byte[size];
+ int read = receiver.recv( buffer, 0, buffer.length );
+ if (read != size) {
+ throw new IllegalStateException();
+ }
+ entry.setEncodedMsg( buffer, size );
+ receiver.advance();
+
+ // account for the used credit, replenish if
+ // low (< 20% maximum per-link batch) and
+ // extra credit available
+ assert(_distributed > 0);
+ _distributed--;
+ if (!receiver.getDrain() && _blocked.isEmpty() && _credit > 0)
+ {
+ final int max = perLinkCredit();
+ final int lo_thresh = (int)(max * 0.2 + 0.5);
+ if (receiver.getRemoteCredit() < lo_thresh)
+ {
+ final int more = Math.min(_credit, max - receiver.getRemoteCredit());
+ _credit -= more;
+ _distributed += more;
+ receiver.flow(more);
+ }
+ }
+ // check if blocked
+ if (receiver.getRemoteCredit() == 0 && _credited.contains(receiver))
+ {
+ _credited.remove(receiver);
+ if (receiver.getDrain())
+ {
+ receiver.setDrain(false);
+ assert( _draining > 0 );
+ _draining--;
+ }
+ _blocked.add(receiver);
+ }
+ }
+ return 0;
+ }
+
+ public void subscribe(String source) throws MessengerException
+ {
+ if (_driver == null) {
+ throw new IllegalStateException("messenger is stopped");
+ }
+
+ String routed = routeAddress(source);
+ Address address = new Address(routed);
+
+ String hostName = address.getHost();
+ if (hostName == null) throw new MessengerException("Invalid address (hostname cannot be null): " + routed);
+ int port = Integer.valueOf(address.getImpliedPort());
+ if (address.isPassive())
+ {
+ if(_logger.isLoggable(Level.FINE))
+ {
+ _logger.fine(this + " about to subscribe to source " + source + " using address " + hostName + ":" + port);
+ }
+ ListenerContext ctx = new ListenerContext(address);
+ _driver.createListener(hostName, port, ctx);
+ }
+ else
+ {
+ if(_logger.isLoggable(Level.FINE))
+ {
+ _logger.fine(this + " about to subscribe to source " + source);
+ }
+ getLink(address, new ReceiverFinder(address.getName()));
+ }
+ }
+
+ public int outgoing()
+ {
+ return _outgoingStore.size() + queued(true);
+ }
+
+ public int incoming()
+ {
+ return _incomingStore.size() + queued(false);
+ }
+
+ public int getIncomingWindow()
+ {
+ return _incomingStore.getWindow();
+ }
+
+ public void setIncomingWindow(int window)
+ {
+ _incomingStore.setWindow(window);
+ }
+
+ public int getOutgoingWindow()
+ {
+ return _outgoingStore.getWindow();
+ }
+
+ public void setOutgoingWindow(int window)
+ {
+ _outgoingStore.setWindow(window);
+ }
+
+ public Tracker incomingTracker()
+ {
+ return _incomingTracker;
+ }
+ public Tracker outgoingTracker()
+ {
+ return _outgoingTracker;
+ }
+
+ private Store getTrackerStore(Tracker tracker)
+ {
+ return ((TrackerImpl)tracker).isOutgoing() ? _outgoingStore : _incomingStore;
+ }
+
+ @Override
+ public void reject(Tracker tracker, int flags)
+ {
+ int id = ((TrackerImpl)tracker).getSequence();
+ getTrackerStore(tracker).update(id, Status.REJECTED, flags, false, false);
+ }
+
+ @Override
+ public void accept(Tracker tracker, int flags)
+ {
+ int id = ((TrackerImpl)tracker).getSequence();
+ getTrackerStore(tracker).update(id, Status.ACCEPTED, flags, false, false);
+ }
+
+ @Override
+ public void settle(Tracker tracker, int flags)
+ {
+ int id = ((TrackerImpl)tracker).getSequence();
+ getTrackerStore(tracker).update(id, Status.UNKNOWN, flags, true, true);
+ }
+
+ public Status getStatus(Tracker tracker)
+ {
+ int id = ((TrackerImpl)tracker).getSequence();
+ StoreEntry e = getTrackerStore(tracker).getEntry(id);
+ if (e != null)
+ {
+ return e.getStatus();
+ }
+ return Status.UNKNOWN;
+ }
+
+ @Override
+ public void route(String pattern, String address)
+ {
+ _routes.rule(pattern, address);
+ }
+
+ @Override
+ public void rewrite(String pattern, String address)
+ {
+ _rewrites.rule(pattern, address);
+ }
+
+ private int queued(boolean outgoing)
+ {
+ int count = 0;
+ if (_driver != null) {
+ for (Connector<?> c : _driver.connectors())
+ {
+ Connection connection = c.getConnection();
+ for (Link link : new Links(connection, ACTIVE, ANY))
+ {
+ if (outgoing)
+ {
+ if (link instanceof Sender) count += link.getQueued();
+ }
+ else
+ {
+ if (link instanceof Receiver) count += link.getQueued();
+ }
+ }
+ }
+ }
+ return count;
+ }
+
+ private void bringDestruction()
+ {
+ for (Connector<?> c : _awaitingDestruction)
+ {
+ c.destroy();
+ }
+ _awaitingDestruction.clear();
+ }
+
+ private void processAllConnectors()
+ {
+ distributeCredit();
+ for (Connector<?> c : _driver.connectors())
+ {
+ processEndpoints(c);
+ try
+ {
+ if (c.process()) {
+ _worked = true;
+ }
+ }
+ catch (IOException e)
+ {
+ _logger.log(Level.SEVERE, "Error processing connection", e);
+ }
+ }
+ bringDestruction();
+ distributeCredit();
+ }
+
+ private void processActive()
+ {
+ //process active listeners
+ for (Listener<?> l = _driver.listener(); l != null; l = _driver.listener())
+ {
+ _worked = true;
+ Connector<?> c = l.accept();
+ Connection connection = Proton.connection();
+ connection.setContainer(_name);
+ ListenerContext ctx = (ListenerContext) l.getContext();
+ connection.setContext(new ConnectionContext(ctx.getAddress(), c));
+ c.setConnection(connection);
+ Transport transport = c.getTransport();
+ //TODO: full SASL
+ Sasl sasl = c.sasl();
+ if (sasl != null)
+ {
+ sasl.server();
+ sasl.setMechanisms(new String[]{"ANONYMOUS"});
+ sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
+ }
+ transport.ssl(ctx.getDomain());
+ connection.open();
+ }
+ // process connectors, reclaiming credit on closed connectors
+ for (Connector<?> c = _driver.connector(); c != null; c = _driver.connector())
+ {
+ _worked = true;
+ if (c.isClosed())
+ {
+ _awaitingDestruction.add(c);
+ reclaimCredit(c.getConnection());
+ }
+ else
+ {
+ _logger.log(Level.FINE, "Processing active connector " + c);
+ try
+ {
+ c.process();
+ processEndpoints(c);
+ c.process();
+ }
+ catch (IOException e)
+ {
+ _logger.log(Level.SEVERE, "Error processing connection", e);
+ }
+ }
+ }
+ bringDestruction();
+ distributeCredit();
+ }
+
+ private void processEndpoints(Connector c)
+ {
+ Connection connection = c.getConnection();
+
+ if (connection.getLocalState() == EndpointState.UNINITIALIZED)
+ {
+ connection.open();
+ }
+
+ Delivery delivery = connection.getWorkHead();
+ while (delivery != null)
+ {
+ Link link = delivery.getLink();
+ if (delivery.isUpdated())
+ {
+ if (link instanceof Sender)
+ {
+ delivery.disposition(delivery.getRemoteState());
+ }
+ StoreEntry e = (StoreEntry) delivery.getContext();
+ if (e != null) e.updated();
+ }
+
+ if (delivery.isReadable())
+ {
+ pumpIn( link.getSource().getAddress(), (Receiver)link );
+ }
+
+ Delivery next = delivery.getWorkNext();
+ delivery.clear();
+ delivery = next;
+ }
+
+ for (Session session : new Sessions(connection, UNINIT, ANY))
+ {
+ session.open();
+ _logger.log(Level.FINE, "Opened session " + session);
+ }
+ for (Link link : new Links(connection, UNINIT, ANY))
+ {
+ //TODO: the following is not correct; should only copy those properties that we understand
+ link.setSource(link.getRemoteSource());
+ link.setTarget(link.getRemoteTarget());
+ linkAdded(link);
+ link.open();
+ _logger.log(Level.FINE, "Opened link " + link);
+ }
+
+ distributeCredit();
+
+ for (Link link : new Links(connection, ACTIVE, ACTIVE))
+ {
+ if (link instanceof Sender)
+ {
+ pumpOut(link.getTarget().getAddress(), (Sender)link);
+ }
+ }
+
+ for (Session session : new Sessions(connection, ACTIVE, CLOSED))
+ {
+ session.close();
+ }
+
+ for (Link link : new Links(connection, ANY, CLOSED))
+ {
+ if (link.getLocalState() == EndpointState.ACTIVE)
+ {
+ link.close();
+ }
+ else
+ {
+ reclaimLink(link);
+ }
+ }
+
+ if (connection.getRemoteState() == EndpointState.CLOSED)
+ {
+ if (connection.getLocalState() == EndpointState.ACTIVE)
+ {
+ connection.close();
+ }
+ }
+ }
+
+ private boolean waitUntil(Predicate condition) throws TimeoutException
+ {
+ if (_blocking) {
+ boolean done = waitUntil(condition, _timeout);
+ if (!done) {
+ _logger.log(Level.SEVERE, String.format
+ ("Timeout when waiting for condition %s after %s ms",
+ condition, _timeout));
+ throw new TimeoutException();
+ }
+ return done;
+ } else {
+ return waitUntil(condition, 0);
+ }
+ }
+
+ private boolean waitUntil(Predicate condition, long timeout)
+ {
+ if (_driver == null) {
+ throw new IllegalStateException("cannot wait while messenger is stopped");
+ }
+
+ processAllConnectors();
+
+ // wait until timeout expires or until test is true
+ long now = System.currentTimeMillis();
+ final long deadline = timeout < 0 ? Long.MAX_VALUE : now + timeout;
+ boolean done = false;
+
+ while (true)
+ {
+ done = condition.test();
+ if (done) break;
+
+ long remaining;
+ if (timeout < 0)
+ remaining = -1;
+ else {
+ remaining = deadline - now;
+ if (remaining < 0) break;
+ }
+
+ // Update the credit scheduler. If the scheduler detects
+ // credit imbalance on the links, wake up in time to
+ // service credit drain
+ distributeCredit();
+ if (_next_drain != 0)
+ {
+ long wakeup = (_next_drain > now) ? _next_drain - now : 0;
+ remaining = (remaining == -1) ? wakeup : Math.min(remaining, wakeup);
+ }
+
+ boolean woken;
+ woken = _driver.doWait(remaining);
+ processActive();
+ if (woken) {
+ throw new InterruptException();
+ }
+ now = System.currentTimeMillis();
+ }
+
+ return done;
+ }
+
+ private Connection lookup(Address address)
+ {
+ for (Connector<?> c : _driver.connectors())
+ {
+ Connection connection = c.getConnection();
+ ConnectionContext ctx = (ConnectionContext) connection.getContext();
+ if (ctx.matches(address))
+ {
+ return connection;
+ }
+ }
+ return null;
+ }
+
+ private void reclaimCredit(Connection connection)
+ {
+ for (Link link : new Links(connection, ANY, ANY))
+ {
+ reclaimLink(link);
+ }
+ }
+
+ private void distributeCredit()
+ {
+ if (_receivers == 0) return;
+
+ if (_credit_mode == LinkCreditMode.LINK_CREDIT_AUTO)
+ {
+ // replenish, but limit the max total messages buffered
+ final int max = _receivers * _credit_batch;
+ final int used = _distributed + incoming();
+ if (max > used)
+ _credit = max - used;
+ }
+
+ // reclaim any credit left over after draining links has completed
+ if (_draining > 0)
+ {
+ Iterator<Receiver> itr = _credited.iterator();
+ while (itr.hasNext())
+ {
+ Receiver link = (Receiver) itr.next();
+ if (link.getDrain())
+ {
+ if (!link.draining())
+ {
+ // drain completed for this link
+ int drained = link.drained();
+ assert(_distributed >= drained);
+ _distributed -= drained;
+ _credit += drained;
+ link.setDrain(false);
+ _draining--;
+ itr.remove();
+ _blocked.add(link);
+ }
+ }
+ }
+ }
+
+ // distribute available credit to blocked links
+ final int batch = perLinkCredit();
+ while (_credit > 0 && !_blocked.isEmpty())
+ {
+ Receiver link = _blocked.get(0);
+ _blocked.remove(0);
+
+ final int more = Math.min(_credit, batch);
+ _distributed += more;
+ _credit -= more;
+
+ link.flow(more);
+ _credited.add(link);
+
+ // flow changed, must process it
+ ConnectionContext ctx = (ConnectionContext) link.getSession().getConnection().getContext();
+ try
+ {
+ ctx.getConnector().process();
+ } catch (IOException e) {
+ _logger.log(Level.SEVERE, "Error processing connection", e);
+ }
+ }
+
+ if (_blocked.isEmpty())
+ {
+ _next_drain = 0;
+ }
+ else
+ {
+ // not enough credit for all links - start draining granted credit
+ if (_draining == 0)
+ {
+ // don't do it too often - pace ourselves (it's expensive)
+ if (_next_drain == 0)
+ {
+ _next_drain = System.currentTimeMillis() + 250;
+ }
+ else if (_next_drain <= System.currentTimeMillis())
+ {
+ // initiate drain, free up at most enough to satisfy blocked
+ _next_drain = 0;
+ int needed = _blocked.size() * batch;
+
+ for (Receiver link : _credited)
+ {
+ if (!link.getDrain()) {
+ link.setDrain(true);
+ needed -= link.getRemoteCredit();
+ _draining++;
+ // drain requested on link, must process it
+ ConnectionContext ctx = (ConnectionContext) link.getSession().getConnection().getContext();
+ try
+ {
+ ctx.getConnector().process();
+ } catch (IOException e) {
+ _logger.log(Level.SEVERE, "Error processing connection", e);
+ }
+ if (needed <= 0) break;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private interface Predicate
+ {
+ boolean test();
+ }
+
+ private class SentSettled implements Predicate
+ {
+ public boolean test()
+ {
+ //are all sent messages settled?
+ int total = _outgoingStore.size();
+
+ for (Connector<?> c : _driver.connectors())
+ {
+ // TBD
+ // check if transport is done generating output
+ // pn_transport_t *transport = pn_connector_transport(ctor);
+ // if (transport) {
+ // if (!pn_transport_quiesced(transport)) {
+ // pn_connector_process(ctor);
+ // return false;
+ // }
+ // }
+
+ Connection connection = c.getConnection();
+ for (Link link : new Links(connection, ACTIVE, ANY))
+ {
+ if (link instanceof Sender)
+ {
+ total += link.getQueued();
+ }
+ }
+
+ // TBD: there is no per-link unsettled
+ // deliveries iterator, so for now get the
+ // deliveries by walking the outgoing trackers
+ Iterator<StoreEntry> entries = _outgoingStore.trackedEntries();
+ while (entries.hasNext() && total <= _sendThreshold)
+ {
+ StoreEntry e = (StoreEntry) entries.next();
+ if (e != null )
+ {
+ Delivery d = e.getDelivery();
+ if (d != null)
+ {
+ if (d.getRemoteState() == null && !d.remotelySettled())
+ {
+ total++;
+ }
+ }
+ }
+ }
+ }
+ return total <= _sendThreshold;
+ }
+ }
+
+ private class MessageAvailable implements Predicate
+ {
+ public boolean test()
+ {
+ //do we have at least one pending message?
+ if (_incomingStore.size() > 0) return true;
+ for (Connector<?> c : _driver.connectors())
+ {
+ Connection connection = c.getConnection();
+ Delivery delivery = connection.getWorkHead();
+ while (delivery != null)
+ {
+ if (delivery.isReadable() && !delivery.isPartial())
+ {
+ return true;
+ }
+ else
+ {
+ delivery = delivery.getWorkNext();
+ }
+ }
+ }
+ // if no connections, or not listening, exit as there won't ever be a message
+ if (!_driver.listeners().iterator().hasNext() && !_driver.connectors().iterator().hasNext())
+ return true;
+
+ return false;
+ }
+ }
+
+ private class AllClosed implements Predicate
+ {
+ public boolean test()
+ {
+ if (_driver == null) {
+ return true;
+ }
+
+ for (Connector<?> c : _driver.connectors()) {
+ if (!c.isClosed()) {
+ return false;
+ }
+ }
+
+ _driver.destroy();
+ _driver = null;
+
+ return true;
+ }
+ }
+
+ private boolean _worked = false;
+
+ private class WorkPred implements Predicate
+ {
+ public boolean test()
+ {
+ return _worked;
+ }
+ }
+
+ private final SentSettled _sentSettled = new SentSettled();
+ private final MessageAvailable _messageAvailable = new MessageAvailable();
+ private final AllClosed _allClosed = new AllClosed();
+ private final WorkPred _workPred = new WorkPred();
+
+ private interface LinkFinder<C extends Link>
+ {
+ C test(Link link);
+ C create(Session session);
+ }
+
+ private class SenderFinder implements LinkFinder<Sender>
+ {
+ private final String _path;
+
+ SenderFinder(String path)
+ {
+ _path = path == null ? "" : path;
+ }
+
+ public Sender test(Link link)
+ {
+ if (link instanceof Sender && matchTarget((Target) link.getTarget(), _path))
+ {
+ return (Sender) link;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public Sender create(Session session)
+ {
+ Sender sender = session.sender(_path);
+ Target target = new Target();
+ target.setAddress(_path);
+ sender.setTarget(target);
+ // the C implemenation does this:
+ Source source = new Source();
+ source.setAddress(_path);
+ sender.setSource(source);
+ if (getOutgoingWindow() > 0)
+ {
+ // use explicit settlement via dispositions (not pre-settled)
+ sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ sender.setReceiverSettleMode(ReceiverSettleMode.SECOND); // desired
+ }
+ return sender;
+ }
+ }
+
+ private class ReceiverFinder implements LinkFinder<Receiver>
+ {
+ private final String _path;
+
+ ReceiverFinder(String path)
+ {
+ _path = path == null ? "" : path;
+ }
+
+ public Receiver test(Link link)
+ {
+ if (link instanceof Receiver && matchSource((Source) link.getSource(), _path))
+ {
+ return (Receiver) link;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public Receiver create(Session session)
+ {
+ Receiver receiver = session.receiver(_path);
+ Source source = new Source();
+ source.setAddress(_path);
+ receiver.setSource(source);
+ // the C implemenation does this:
+ Target target = new Target();
+ target.setAddress(_path);
+ receiver.setTarget(target);
+ if (getIncomingWindow() > 0)
+ {
+ // use explicit settlement via dispositions (not pre-settled)
+ receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); // desired
+ receiver.setReceiverSettleMode(ReceiverSettleMode.SECOND);
+ }
+ return receiver;
+ }
+ }
+
+ private <C extends Link> C getLink(Address address, LinkFinder<C> finder)
+ {
+ Connection connection = lookup(address);
+ if (connection == null)
+ {
+ String host = address.getHost();
+ int port = Integer.valueOf(address.getImpliedPort());
+ Connector<?> connector = _driver.createConnector(host, port, null);
+ _logger.log(Level.FINE, "Connecting to " + host + ":" + port);
+ connection = Proton.connection();
+ connection.setContainer(_name);
+ connection.setHostname(host);
+ connection.setContext(new ConnectionContext(address, connector));
+ connector.setConnection(connection);
+ Sasl sasl = connector.sasl();
+ if (sasl != null)
+ {
+ sasl.client();
+ sasl.setMechanisms(new String[]{"ANONYMOUS"});
+ }
+ if ("amqps".equalsIgnoreCase(address.getScheme())) {
+ Transport transport = connector.getTransport();
+ SslDomain domain = makeDomain(address, SslDomain.Mode.CLIENT);
+ if (_trustedDb != null) {
+ domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER);
+ //domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER_NAME);
+ } else {
+ domain.setPeerAuthentication(SslDomain.VerifyMode.ANONYMOUS_PEER);
+ }
+ Ssl ssl = transport.ssl(domain);
+ //ssl.setPeerHostname(host);
+ }
+ connection.open();
+ }
+
+ for (Link link : new Links(connection, ACTIVE, ANY))
+ {
+ C result = finder.test(link);
+ if (result != null) return result;
+ }
+ Session session = connection.session();
+ session.open();
+ C link = finder.create(session);
+ linkAdded(link);
+ link.open();
+ return link;
+ }
+
+ private static class Links implements Iterable<Link>
+ {
+ private final Connection _connection;
+ private final EnumSet<EndpointState> _local;
+ private final EnumSet<EndpointState> _remote;
+
+ Links(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
+ {
+ _connection = connection;
+ _local = local;
+ _remote = remote;
+ }
+
+ public java.util.Iterator<Link> iterator()
+ {
+ return new LinkIterator(_connection, _local, _remote);
+ }
+ }
+
+ private static class LinkIterator implements java.util.Iterator<Link>
+ {
+ private final EnumSet<EndpointState> _local;
+ private final EnumSet<EndpointState> _remote;
+ private Link _next;
+
+ LinkIterator(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
+ {
+ _local = local;
+ _remote = remote;
+ _next = connection.linkHead(_local, _remote);
+ }
+
+ public boolean hasNext()
+ {
+ return _next != null;
+ }
+
+ public Link next()
+ {
+ try
+ {
+ return _next;
+ }
+ finally
+ {
+ _next = _next.next(_local, _remote);
+ }
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static class Sessions implements Iterable<Session>
+ {
+ private final Connection _connection;
+ private final EnumSet<EndpointState> _local;
+ private final EnumSet<EndpointState> _remote;
+
+ Sessions(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
+ {
+ _connection = connection;
+ _local = local;
+ _remote = remote;
+ }
+
+ public java.util.Iterator<Session> iterator()
+ {
+ return new SessionIterator(_connection, _local, _remote);
+ }
+ }
+
+ private static class SessionIterator implements java.util.Iterator<Session>
+ {
+ private final EnumSet<EndpointState> _local;
+ private final EnumSet<EndpointState> _remote;
+ private Session _next;
+
+ SessionIterator(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
+ {
+ _local = local;
+ _remote = remote;
+ _next = connection.sessionHead(_local, _remote);
+ }
+
+ public boolean hasNext()
+ {
+ return _next != null;
+ }
+
+ public Session next()
+ {
+ try
+ {
+ return _next;
+ }
+ finally
+ {
+ _next = _next.next(_local, _remote);
+ }
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private void adjustReplyTo(Message m)
+ {
+ String original = m.getReplyTo();
+ if (original != null) {
+ if (original.startsWith("~/"))
+ {
+ m.setReplyTo("amqp://" + _name + "/" + original.substring(2));
+ }
+ else if (original.equals("~"))
+ {
+ m.setReplyTo("amqp://" + _name);
+ }
+ }
+ }
+
+ private static boolean matchTarget(Target target, String path)
+ {
+ if (target == null) return path.isEmpty();
+ else return path.equals(target.getAddress());
+ }
+
+ private static boolean matchSource(Source source, String path)
+ {
+ if (source == null) return path.isEmpty();
+ else return path.equals(source.getAddress());
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder builder = new StringBuilder();
+ builder.append("MessengerImpl [_name=").append(_name).append("]");
+ return builder.toString();
+ }
+
+ // compute the maximum amount of credit each receiving link is
+ // entitled to. The actual credit given to the link depends on
+ // what amount of credit is actually available.
+ private int perLinkCredit()
+ {
+ if (_receivers == 0) return 0;
+ int total = _credit + _distributed;
+ return Math.max(total/_receivers, 1);
+ }
+
+ // a new link has been created, account for it.
+ private void linkAdded(Link link)
+ {
+ if (link instanceof Receiver)
+ {
+ _receivers++;
+ _blocked.add((Receiver)link);
+ link.setContext(Boolean.TRUE);
+ }
+ }
+
+ // a link is being removed, account for it.
+ private void linkRemoved(Link _link)
+ {
+ if (_link instanceof Receiver && (Boolean) _link.getContext())
+ {
+ _link.setContext(Boolean.FALSE);
+ Receiver link = (Receiver)_link;
+ assert _receivers > 0;
+ _receivers--;
+ if (link.getDrain())
+ {
+ link.setDrain(false);
+ assert _draining > 0;
+ _draining--;
+ }
+ if (_blocked.contains(link))
+ _blocked.remove(link);
+ else if (_credited.contains(link))
+ _credited.remove(link);
+ else
+ assert(false);
+ }
+ }
+
+ private class ConnectionContext
+ {
+ private Address _address;
+ private Connector _connector;
+
+ public ConnectionContext(Address address, Connector connector)
+ {
+ _address = address;
+ _connector = connector;
+ }
+
+ public Address getAddress()
+ {
+ return _address;
+ }
+
+ public boolean matches(Address address)
+ {
+ String host = address.getHost();
+ String port = address.getImpliedPort();
+ Connection conn = _connector.getConnection();
+ return host.equals(conn.getRemoteContainer()) ||
+ (_address.getHost().equals(host) && _address.getImpliedPort().equals(port));
+ }
+
+ public Connector getConnector()
+ {
+ return _connector;
+ }
+ }
+
+ private SslDomain makeDomain(Address address, SslDomain.Mode mode)
+ {
+ SslDomain domain = Proton.sslDomain();
+ domain.init(mode);
+ if (_certificate != null) {
+ domain.setCredentials(_certificate, _privateKey, _password);
+ }
+ if (_trustedDb != null) {
+ domain.setTrustedCaDb(_trustedDb);
+ }
+
+ if ("amqps".equalsIgnoreCase(address.getScheme())) {
+ domain.allowUnsecuredClient(false);
+ } else {
+ domain.allowUnsecuredClient(true);
+ }
+
+ return domain;
+ }
+
+
+ private class ListenerContext
+ {
+ private Address _address;
+ private SslDomain _domain;
+
+ public ListenerContext(Address address)
+ {
+ _address = address;
+ _domain = makeDomain(address, SslDomain.Mode.SERVER);
+ }
+
+ public SslDomain getDomain()
+ {
+ return _domain;
+ }
+
+ public Address getAddress()
+ {
+ return _address;
+ }
+
+ }
+}