NO-JIRA: Merged latest from trunk into the jni-binding branch using the following commands:
$ svn merge https://svn.apache.org/repos/asf/qpid/proton/trunk .
--- Merging r1441000 through r1441381 into '.':
U proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
U proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
U proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
C proton-j/proton/src/main/scripts
U proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java
--- Recording mergeinfo for merge of r1421251 through r1441381 into '.':
U .
Summary of conflicts:
Tree conflicts: 1
$ svn merge -c 1441176 https://svn.apache.org/repos/asf/qpid/proton/trunk/proton-j/proton/src/main/scripts/proton.py ./proton-j/proton-api/src/main/resources/proton.py
--- Merging r1441176 into 'proton-j/proton-api/src/main/resources/proton.py':
U proton-j/proton-api/src/main/resources/proton.py
--- Recording mergeinfo for merge of r1441176 into 'proton-j/proton-api/src/main/resources/proton.py':
U proton-j/proton-api/src/main/resources/proton.py
To resolve the conflicts, we had to add JNIDelivery.isPartial(), use MessengerFactory in the Java proton.py, and catch ProtonUnsupportedOperationException in the Messenger test.
git-svn-id: https://svn.apache.org/repos/asf/qpid/proton/branches/jni-binding@1441385 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java b/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java
index 87e3082..cb4d899 100644
--- a/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java
+++ b/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java
@@ -21,6 +21,7 @@
package org.apache.qpid.proton.engine.jni;
import org.apache.qpid.proton.ProtonCEquivalent;
+import org.apache.qpid.proton.ProtonUnsupportedOperationException;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.jni.Proton;
@@ -222,6 +223,12 @@
}
@Override
+ public boolean isPartial()
+ {
+ throw new ProtonUnsupportedOperationException();
+ }
+
+ @Override
@ProtonCEquivalent("pn_delivery_settled")
public boolean isSettled()
{
diff --git a/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessengerFactory.java b/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessengerFactory.java
index 2b43d6f..09dad43 100644
--- a/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessengerFactory.java
+++ b/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessengerFactory.java
@@ -19,6 +19,7 @@
*/
package org.apache.qpid.proton.messenger.jni;
+import org.apache.qpid.proton.ProtonUnsupportedOperationException;
import org.apache.qpid.proton.messenger.Messenger;
import org.apache.qpid.proton.messenger.MessengerFactory;
@@ -28,13 +29,13 @@
@Override
public Messenger createMessenger()
{
- throw new UnsupportedOperationException();
+ throw new ProtonUnsupportedOperationException();
}
@Override
public Messenger createMessenger(String name)
{
- throw new UnsupportedOperationException();
+ throw new ProtonUnsupportedOperationException();
}
}
diff --git a/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java b/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java
index deb5b14..8334198 100644
--- a/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java
+++ b/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java
@@ -68,4 +68,6 @@
public Object getContext();
public boolean isUpdated();
+
+ public boolean isPartial();
}
diff --git a/proton-j/proton-api/src/main/resources/proton.py b/proton-j/proton-api/src/main/resources/proton.py
index 5e3c6bb..71987fa 100644
--- a/proton-j/proton-api/src/main/resources/proton.py
+++ b/proton-j/proton-api/src/main/resources/proton.py
@@ -25,11 +25,12 @@
EndpointState, TransportException
from org.apache.qpid.proton.message import \
MessageFormat, MessageFactory, Message as JMessage
-from org.apache.qpid.proton.messenger import MessengerException, Status
+from org.apache.qpid.proton.messenger import MessengerFactory, MessengerException, Status
from org.apache.qpid.proton.amqp.messaging import Source, Target, Accepted, AmqpValue
from org.apache.qpid.proton.amqp import UnsignedInteger
from jarray import zeros
from java.util import EnumSet, UUID as JUUID
+from java.util.concurrent import TimeoutException as Timeout
LANGUAGE = "Java"
@@ -55,6 +56,7 @@
protonFactoryLoader = ProtonFactoryLoader()
engineFactory = protonFactoryLoader.loadFactory(EngineFactory)
messageFactory = protonFactoryLoader.loadFactory(MessageFactory)
+messengerFactory = protonFactoryLoader.loadFactory(MessengerFactory)
class Endpoint(object):
@@ -511,15 +513,14 @@
def __init__(self, *args, **kwargs):
raise Skipped()
-class Timeout(Exception):
- pass
-
class Messenger(object):
def __init__(self, *args, **kwargs):
- #comment out or remove line below to enable messenger tests
- raise Skipped()
- self.impl = MessengerImpl()
+ try:
+ self.impl = messengerFactory.createMessenger()
+ except ProtonUnsupportedOperationException:
+ raise Skipped()
+
def start(self):
self.impl.start()
@@ -541,10 +542,9 @@
self.impl.recv(n)
def get(self, message=None):
- if message is None:
- self.impl.get()
- else:
- message.impl = self.impl.get()
+ result = self.impl.get()
+ if message and result:
+ message.impl = result
return self.impl.incomingTracker()
@property
diff --git a/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java b/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
index 0bac31a..972c4dd 100644
--- a/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
+++ b/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
@@ -55,7 +55,7 @@
private ByteBuffer _readBuffer = ByteBuffer.allocate(readBufferSize);
private ByteBuffer _writeBuffer = ByteBuffer.allocate(writeBufferSize);
- private boolean _readPending;
+ private boolean _readPending = true;
ConnectorImpl(DriverImpl driver, Listener<C> listener, SocketChannel c, C context, SelectionKey key)
{
@@ -81,33 +81,56 @@
_readPending = false;
if (isClosed()) return;
}
+ else
+ {
+ processInput();
+ }
write();
}
}
- void read() throws IOException
+ private void read() throws IOException
{
int bytesRead = 0;
while ((bytesRead = _channel.read(_readBuffer)) > 0)
{
- _readBuffer.flip();
- int consumed = _transport.input(_readBuffer.array(), _readBuffer.position(), _readBuffer.limit());
- _readBuffer.position(consumed == Transport.END_OF_STREAM ? _readBuffer.limit() : consumed);
- if (_logger.isLoggable(Level.FINE))
- {
- _logger.log(Level.FINE, "consumed " + consumed + " bytes, " + _readBuffer.remaining() + " available");
- }
- _readBuffer.compact();
+ processInput();
}
if (bytesRead == -1) {
close();
}
}
- void write() throws IOException
+ private int processInput() throws IOException
+ {
+ _readBuffer.flip();
+ int total = 0;
+ while (_readBuffer.hasRemaining())
+ {
+ int consumed = _transport.input(_readBuffer.array(), _readBuffer.position(), _readBuffer.remaining());
+ if (consumed == Transport.END_OF_STREAM)
+ {
+ continue;
+ }
+ else if (consumed == 0)
+ {
+ break;
+ }
+ _readBuffer.position(_readBuffer.position() + consumed);
+ if (_logger.isLoggable(Level.FINE))
+ {
+ _logger.log(Level.FINE, "consumed " + consumed + " bytes, " + _readBuffer.remaining() + " available");
+ }
+ total += consumed;
+ }
+ _readBuffer.compact();
+ return total;
+ }
+
+ private void write() throws IOException
{
int interest = _key.interestOps();
- int start = _writeBuffer.position();
+ boolean empty = _writeBuffer.position() == 0;
boolean done = false;
while (!done)
{
@@ -119,19 +142,20 @@
{
_logger.log(Level.FINE, "wrote " + wrote + " bytes, " + _writeBuffer.remaining() + " remaining");
}
- _writeBuffer.compact();
- if (_writeBuffer.position() > 0)
+ if (_writeBuffer.hasRemaining())
{
//weren't able to write all available data, ask to be notfied when we can write again
+ _writeBuffer.compact();
interest |= SelectionKey.OP_WRITE;
done = true;
}
else
{
- //we are done if buffer was empty to begin with and we did not produce enough to fill it
+ //we are done if buffer was empty to begin with and we did not produce anything
+ _writeBuffer.clear();
interest &= ~SelectionKey.OP_WRITE;
- done = start == 0 && produced < _writeBuffer.capacity();
- start = 0;
+ done = empty && produced == 0;
+ empty = true;
}
}
_key.interestOps(interest);
diff --git a/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java b/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
index 60ec3ab..76c8736 100644
--- a/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
+++ b/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
@@ -408,6 +408,11 @@
_complete = true;
}
+ public boolean isPartial()
+ {
+ return !_complete;
+ }
+
void setRemoteDeliveryState(DeliveryState remoteDeliveryState)
{
_remoteDeliveryState = remoteDeliveryState;
diff --git a/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java b/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
index 8da4779..6815471 100644
--- a/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
+++ b/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
@@ -134,7 +134,6 @@
try
{
c.process();
- c.close();
}
catch (IOException e)
{
@@ -153,6 +152,14 @@
_logger.log(Level.WARNING, "Error while closing listener", e);
}
}
+ try
+ {
+ waitUntil(_allClosed);
+ }
+ catch(TimeoutException e)
+ {
+ _logger.log(Level.WARNING, "Timed out while waiting for close", e);
+ }
_driver.destroy();
}
@@ -215,7 +222,7 @@
Delivery delivery = connection.getWorkHead();
while (delivery != null)
{
- if (delivery.isReadable())
+ if (delivery.isReadable() && !delivery.isPartial())
{
_logger.log(Level.FINE, "Readable delivery found: " + delivery);
int size = read((Receiver) delivery.getLink());
@@ -463,9 +470,16 @@
{
session.close();
}
- if (connection.getLocalState() == EndpointState.ACTIVE && connection.getRemoteState() == EndpointState.CLOSED)
+ if (connection.getRemoteState() == EndpointState.CLOSED)
{
- connection.close();
+ if (connection.getLocalState() == EndpointState.ACTIVE)
+ {
+ connection.close();
+ }
+ else if (connection.getLocalState() == EndpointState.CLOSED)
+ {
+ c.close();
+ }
}
if (c.isClosed())
@@ -501,7 +515,7 @@
boolean wait = deadline > System.currentTimeMillis();
boolean first = true;
- boolean done = condition.test();
+ boolean done = false;
while (first || (!done && wait))
{
@@ -513,6 +527,10 @@
done = done || condition.test();
first = false;
}
+ if (!done)
+ {
+ throw new TimeoutException();
+ }
}
private Connection lookup(String host, String service)
@@ -581,7 +599,7 @@
for (Connector c : _driver.connectors())
{
Connection connection = c.getConnection();
- for (Link link : new Links(connection, ACTIVE, ACTIVE))
+ for (Link link : new Links(connection, ACTIVE, ANY))
{
if (link instanceof Sender)
{
@@ -642,9 +660,8 @@
Delivery delivery = connection.getWorkHead();
while (delivery != null)
{
- if (delivery.isReadable())
+ if (delivery.isReadable() && !delivery.isPartial())
{
- //TODO: check for partial delivery?
return true;
}
else
@@ -657,8 +674,19 @@
}
}
+ private class AllClosed implements Predicate
+ {
+ public boolean test()
+ {
+ if (_driver.connectors().iterator().hasNext()) return false;
+ else return true;
+ }
+
+ }
+
private final SentSettled _sentSettled = new SentSettled();
private final MessageAvailable _messageAvailable = new MessageAvailable();
+ private final AllClosed _allClosed = new AllClosed();
private interface LinkFinder<C extends Link>
{