NO-JIRA: Merged latest from trunk to this branch jni-binding with commands:

$ svn merge https://svn.apache.org/repos/asf/qpid/proton/trunk .                                                                                            
--- Merging r1441000 through r1441381 into '.':
U    proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
U    proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
U    proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
   C proton-j/proton/src/main/scripts
U    proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java
--- Recording mergeinfo for merge of r1421251 through r1441381 into '.':
 U   .
Summary of conflicts:
  Tree conflicts: 1

$ svn merge -c 1441176 https://svn.apache.org/repos/asf/qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py ./proton-j/proton-api/src/main/resources/proton.py
--- Merging r1441176 into 'proton-j/proton-api/src/main/resources/proton.py':
U    proton-j/proton-api/src/main/resources/proton.py
--- Recording mergeinfo for merge of r1441176 into 'proton-j/proton-api/src/main/resources/proton.py':
 U   proton-j/proton-api/src/main/resources/proton.py


To resolve conflicts, we had to add JNIDelivery.isPartial(), use MessengerFactory in Java proton.py, and catch ProtonUnsupportedOperation in Messenger test.



git-svn-id: https://svn.apache.org/repos/asf/qpid/proton/branches/jni-binding@1441385 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java b/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java
index 87e3082..cb4d899 100644
--- a/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java
+++ b/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java
@@ -21,6 +21,7 @@
 package org.apache.qpid.proton.engine.jni;
 
 import org.apache.qpid.proton.ProtonCEquivalent;
+import org.apache.qpid.proton.ProtonUnsupportedOperationException;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.jni.Proton;
@@ -222,6 +223,12 @@
     }
 
     @Override
+    public boolean isPartial()
+    {
+        throw new ProtonUnsupportedOperationException();
+    }
+
+    @Override
     @ProtonCEquivalent("pn_delivery_settled")
     public boolean isSettled()
     {
diff --git a/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessengerFactory.java b/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessengerFactory.java
index 2b43d6f..09dad43 100644
--- a/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessengerFactory.java
+++ b/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessengerFactory.java
@@ -19,6 +19,7 @@
  */
 package org.apache.qpid.proton.messenger.jni;
 
+import org.apache.qpid.proton.ProtonUnsupportedOperationException;
 import org.apache.qpid.proton.messenger.Messenger;
 import org.apache.qpid.proton.messenger.MessengerFactory;
 
@@ -28,13 +29,13 @@
     @Override
     public Messenger createMessenger()
     {
-        throw new UnsupportedOperationException();
+        throw new ProtonUnsupportedOperationException();
     }
 
     @Override
     public Messenger createMessenger(String name)
     {
-        throw new UnsupportedOperationException();
+        throw new ProtonUnsupportedOperationException();
     }
 
 }
diff --git a/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java b/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java
index deb5b14..8334198 100644
--- a/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java
+++ b/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java
@@ -68,4 +68,6 @@
     public Object getContext();
 
     public boolean isUpdated();
+
+    public boolean isPartial();
 }
diff --git a/proton-j/proton-api/src/main/resources/proton.py b/proton-j/proton-api/src/main/resources/proton.py
index 5e3c6bb..71987fa 100644
--- a/proton-j/proton-api/src/main/resources/proton.py
+++ b/proton-j/proton-api/src/main/resources/proton.py
@@ -25,11 +25,12 @@
     EndpointState, TransportException
 from org.apache.qpid.proton.message import \
     MessageFormat, MessageFactory, Message as JMessage
-from org.apache.qpid.proton.messenger import MessengerException, Status
+from org.apache.qpid.proton.messenger import MessengerFactory, MessengerException, Status
 from org.apache.qpid.proton.amqp.messaging import Source, Target, Accepted, AmqpValue
 from org.apache.qpid.proton.amqp import UnsignedInteger
 from jarray import zeros
 from java.util import EnumSet, UUID as JUUID
+from java.util.concurrent import TimeoutException as Timeout
 
 LANGUAGE = "Java"
 
@@ -55,6 +56,7 @@
 protonFactoryLoader = ProtonFactoryLoader()
 engineFactory = protonFactoryLoader.loadFactory(EngineFactory)
 messageFactory = protonFactoryLoader.loadFactory(MessageFactory)
+messengerFactory = protonFactoryLoader.loadFactory(MessengerFactory)
 
 
 class Endpoint(object):
@@ -511,15 +513,14 @@
   def __init__(self, *args, **kwargs):
     raise Skipped()
 
-class Timeout(Exception):
-  pass
-
 class Messenger(object):
 
   def __init__(self, *args, **kwargs):
-    #comment out or remove line below to enable messenger tests
-    raise Skipped()
-    self.impl = MessengerImpl()
+    try:
+      self.impl = messengerFactory.createMessenger()
+    except ProtonUnsupportedOperationException:
+      raise Skipped()
+
 
   def start(self):
     self.impl.start()
@@ -541,10 +542,9 @@
     self.impl.recv(n)
 
   def get(self, message=None):
-    if message is None:
-      self.impl.get()
-    else:
-      message.impl = self.impl.get()
+    result = self.impl.get()
+    if message and result:
+      message.impl = result
     return self.impl.incomingTracker()
 
   @property
diff --git a/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java b/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
index 0bac31a..972c4dd 100644
--- a/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
+++ b/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
@@ -55,7 +55,7 @@
 
     private ByteBuffer _readBuffer = ByteBuffer.allocate(readBufferSize);
     private ByteBuffer _writeBuffer = ByteBuffer.allocate(writeBufferSize);
-    private boolean _readPending;
+    private boolean _readPending = true;
 
     ConnectorImpl(DriverImpl driver, Listener<C> listener, SocketChannel c, C context, SelectionKey key)
     {
@@ -81,33 +81,56 @@
                 _readPending = false;
                 if (isClosed()) return;
             }
+            else
+            {
+                processInput();
+            }
             write();
         }
     }
 
-    void read() throws IOException
+    private void read() throws IOException
     {
         int bytesRead = 0;
         while ((bytesRead = _channel.read(_readBuffer)) > 0)
         {
-            _readBuffer.flip();
-            int consumed = _transport.input(_readBuffer.array(), _readBuffer.position(), _readBuffer.limit());
-            _readBuffer.position(consumed == Transport.END_OF_STREAM ? _readBuffer.limit() : consumed);
-            if (_logger.isLoggable(Level.FINE))
-            {
-                _logger.log(Level.FINE, "consumed " + consumed + " bytes, " + _readBuffer.remaining() + " available");
-            }
-            _readBuffer.compact();
+            processInput();
         }
         if (bytesRead == -1) {
             close();
         }
     }
 
-    void write() throws IOException
+    private int processInput() throws IOException
+    {
+        _readBuffer.flip();
+        int total = 0;
+        while (_readBuffer.hasRemaining())
+        {
+            int consumed = _transport.input(_readBuffer.array(), _readBuffer.position(), _readBuffer.remaining());
+            if (consumed == Transport.END_OF_STREAM)
+            {
+                continue;
+            }
+            else if (consumed == 0)
+            {
+                break;
+            }
+            _readBuffer.position(_readBuffer.position() + consumed);
+            if (_logger.isLoggable(Level.FINE))
+            {
+                _logger.log(Level.FINE, "consumed " + consumed + " bytes, " + _readBuffer.remaining() + " available");
+            }
+            total += consumed;
+        }
+        _readBuffer.compact();
+        return total;
+    }
+
+    private void write() throws IOException
     {
         int interest = _key.interestOps();
-        int start = _writeBuffer.position();
+        boolean empty = _writeBuffer.position() == 0;
         boolean done = false;
         while (!done)
         {
@@ -119,19 +142,20 @@
             {
                 _logger.log(Level.FINE, "wrote " + wrote + " bytes, " + _writeBuffer.remaining() + " remaining");
             }
-            _writeBuffer.compact();
-            if (_writeBuffer.position() > 0)
+            if (_writeBuffer.hasRemaining())
             {
                 //weren't able to write all available data, ask to be notfied when we can write again
+                _writeBuffer.compact();
                 interest |= SelectionKey.OP_WRITE;
                 done = true;
             }
             else
             {
-                //we are done if buffer was empty to begin with and we did not produce enough to fill it
+                //we are done if buffer was empty to begin with and we did not produce anything
+                _writeBuffer.clear();
                 interest &= ~SelectionKey.OP_WRITE;
-                done = start == 0 && produced < _writeBuffer.capacity();
-                start = 0;
+                done = empty && produced == 0;
+                empty = true;
             }
         }
         _key.interestOps(interest);
diff --git a/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java b/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
index 60ec3ab..76c8736 100644
--- a/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
+++ b/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
@@ -408,6 +408,11 @@
         _complete = true;
     }
 
+    public boolean isPartial()
+    {
+        return !_complete;
+    }
+
     void setRemoteDeliveryState(DeliveryState remoteDeliveryState)
     {
         _remoteDeliveryState = remoteDeliveryState;
diff --git a/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java b/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
index 8da4779..6815471 100644
--- a/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
+++ b/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
@@ -134,7 +134,6 @@
             try
             {
                 c.process();
-                c.close();
             }
             catch (IOException e)
             {
@@ -153,6 +152,14 @@
                 _logger.log(Level.WARNING, "Error while closing listener", e);
             }
         }
+        try
+        {
+            waitUntil(_allClosed);
+        }
+        catch(TimeoutException e)
+        {
+            _logger.log(Level.WARNING, "Timed out while waiting for close", e);
+        }
         _driver.destroy();
     }
 
@@ -215,7 +222,7 @@
             Delivery delivery = connection.getWorkHead();
             while (delivery != null)
             {
-                if (delivery.isReadable())
+                if (delivery.isReadable() && !delivery.isPartial())
                 {
                     _logger.log(Level.FINE, "Readable delivery found: " + delivery);
                     int size = read((Receiver) delivery.getLink());
@@ -463,9 +470,16 @@
             {
                 session.close();
             }
-            if (connection.getLocalState() == EndpointState.ACTIVE && connection.getRemoteState() == EndpointState.CLOSED)
+            if (connection.getRemoteState() == EndpointState.CLOSED)
             {
-                connection.close();
+                if (connection.getLocalState() == EndpointState.ACTIVE)
+                {
+                    connection.close();
+                }
+                else if (connection.getLocalState() == EndpointState.CLOSED)
+                {
+                    c.close();
+                }
             }
 
             if (c.isClosed())
@@ -501,7 +515,7 @@
 
         boolean wait = deadline > System.currentTimeMillis();
         boolean first = true;
-        boolean done = condition.test();
+        boolean done = false;
 
         while (first || (!done && wait))
         {
@@ -513,6 +527,10 @@
             done = done || condition.test();
             first = false;
         }
+        if (!done)
+        {
+            throw new TimeoutException();
+        }
     }
 
     private Connection lookup(String host, String service)
@@ -581,7 +599,7 @@
             for (Connector c : _driver.connectors())
             {
                 Connection connection = c.getConnection();
-                for (Link link : new Links(connection, ACTIVE, ACTIVE))
+                for (Link link : new Links(connection, ACTIVE, ANY))
                 {
                     if (link instanceof Sender)
                     {
@@ -642,9 +660,8 @@
                 Delivery delivery = connection.getWorkHead();
                 while (delivery != null)
                 {
-                    if (delivery.isReadable())
+                    if (delivery.isReadable() && !delivery.isPartial())
                     {
-                        //TODO: check for partial delivery?
                         return true;
                     }
                     else
@@ -657,8 +674,19 @@
         }
     }
 
+    private class AllClosed implements Predicate
+    {
+        public boolean test()
+        {
+            if (_driver.connectors().iterator().hasNext()) return false;
+            else return true;
+        }
+
+    }
+
     private final SentSettled _sentSettled = new SentSettled();
     private final MessageAvailable _messageAvailable = new MessageAvailable();
+    private final AllClosed _allClosed = new AllClosed();
 
     private interface LinkFinder<C extends Link>
     {