PROTON-1486: Expose SaslOutcome additional-data to users of the API

Based on original work by rgodfrey <rgodfrey@apache.org>
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Sasl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Sasl.java
index 08929e8..e5ebabd 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Sasl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Sasl.java
@@ -119,7 +119,7 @@
     int pending();
 
     /**
-     * Read challenge/response data sent from the peer.
+     * Read challenge/response/additional data sent from the peer.
      *
      * Use pending to determine the size of the data.
      *
@@ -131,7 +131,7 @@
     int recv(byte[] bytes, int offset, int size);
 
     /**
-     * Send challenge or response data to the peer.
+     * Send challenge/response/additional data to the peer.
      *
      * @param bytes The challenge/response data.
      * @param offset the point within the array at which the data starts at
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
index daeb141..ffa49ff 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
@@ -159,7 +159,12 @@
                 org.apache.qpid.proton.amqp.security.SaslOutcome outcome =
                         new org.apache.qpid.proton.amqp.security.SaslOutcome();
                 outcome.setCode(SaslCode.values()[_outcome.getCode()]);
+                if (_outcome == PN_SASL_OK)
+                {
+                    outcome.setAdditionalData(getChallengeResponse());
+                }
                 writeFrame(outcome);
+                setChallengeResponse(null);
             }
         }
         else if(_role == Role.CLIENT)
@@ -394,6 +399,7 @@
         checkRole(Role.CLIENT);
         for(SaslOutcome outcome : SaslOutcome.values())
         {
+            setPending(saslOutcome.getAdditionalData()  == null ? null : saslOutcome.getAdditionalData().asByteBuffer());
             if(outcome.getCode() == saslOutcome.getCode().ordinal())
             {
                 _outcome = outcome;
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/SaslTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/SaslTest.java
index 2980565..93718a0 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/SaslTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/SaslTest.java
@@ -24,15 +24,23 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
+import java.nio.charset.StandardCharsets;
 import java.util.logging.Logger;
 
+import org.junit.Test;
+
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.engine.Sasl;
-import org.junit.Test;
+import org.apache.qpid.proton.engine.Sasl.SaslOutcome;
 
 public class SaslTest extends EngineTestBase
 {
     private static final Logger LOGGER = Logger.getLogger(SaslTest.class.getName());
+    private static final String TESTMECH1 = "TESTMECH1";
+    private static final String TESTMECH2 = "TESTMECH2";
+    private static final byte[] CHALLENGE_BYTES = "challenge-bytes".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] RESPONSE_BYTES = "response-bytes".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] ADDITIONAL_DATA_BYTES = "additional-data-bytes".getBytes(StandardCharsets.UTF_8);
 
     @Test
     public void testSaslHostnamePropagationAndRetrieval() throws Exception
@@ -97,4 +105,183 @@
         assertEquals(hostname, serverSasl.getHostname());
     }
 
+    /** 5.3.2 SASL Negotiation. */
+    @Test
+    public void testSaslNegotiation() throws Exception
+    {
+        getClient().transport = Proton.transport();
+        getServer().transport = Proton.transport();
+
+        Sasl clientSasl = getClient().transport.sasl();
+        clientSasl.client();
+        assertEquals("Unexpected SASL outcome at client", SaslOutcome.PN_SASL_NONE, clientSasl.getOutcome());
+
+        Sasl serverSasl = getServer().transport.sasl();
+        serverSasl.server();
+        serverSasl.setMechanisms(TESTMECH1, TESTMECH2);
+        assertEquals("Server should not yet know the remote's chosen mechanism.",
+                     0,
+                     serverSasl.getRemoteMechanisms().length);
+
+        pumpClientToServer();
+        pumpServerToClient();
+
+        assertArrayEquals("Client should now know the server's mechanisms.",
+                          new String[]{TESTMECH1, TESTMECH2},
+                          clientSasl.getRemoteMechanisms());
+        assertEquals("Unexpected SASL outcome at client", SaslOutcome.PN_SASL_NONE, clientSasl.getOutcome());
+        clientSasl.setMechanisms(TESTMECH1);
+
+        pumpClientToServer();
+
+        assertArrayEquals("Server should now know the client's chosen mechanism.",
+                          new String[]{TESTMECH1},
+                          serverSasl.getRemoteMechanisms());
+
+        serverSasl.send(CHALLENGE_BYTES, 0, CHALLENGE_BYTES.length);
+
+        pumpServerToClient();
+
+        byte[] clientReceivedChallengeBytes = new byte[clientSasl.pending()];
+        clientSasl.recv(clientReceivedChallengeBytes, 0, clientReceivedChallengeBytes.length);
+
+        assertEquals("Unexpected SASL outcome at client", SaslOutcome.PN_SASL_NONE, clientSasl.getOutcome());
+        assertArrayEquals("Client should now know the server's challenge",
+                          CHALLENGE_BYTES,
+                          clientReceivedChallengeBytes);
+
+        clientSasl.send(RESPONSE_BYTES, 0, RESPONSE_BYTES.length);
+
+        pumpClientToServer();
+
+        byte[] serverReceivedResponseBytes = new byte[serverSasl.pending()];
+        serverSasl.recv(serverReceivedResponseBytes, 0, serverReceivedResponseBytes.length);
+
+        assertArrayEquals("Server should now know the client's response",
+                          RESPONSE_BYTES,
+                          serverReceivedResponseBytes);
+
+        serverSasl.done(SaslOutcome.PN_SASL_OK);
+        pumpServerToClient();
+
+        assertEquals("Unexpected SASL outcome at client", SaslOutcome.PN_SASL_OK, clientSasl.getOutcome());
+    }
+
+    /** 5.3.2 SASL Negotiation. ...challenge/response step can occur zero or more times*/
+    @Test
+    public void testOptionalChallengeResponseStepOmitted() throws Exception
+    {
+        getClient().transport = Proton.transport();
+        getServer().transport = Proton.transport();
+
+        Sasl clientSasl = getClient().transport.sasl();
+        clientSasl.client();
+        assertEquals("Unexpected SASL outcome at client", SaslOutcome.PN_SASL_NONE, clientSasl.getOutcome());
+
+        Sasl serverSasl = getServer().transport.sasl();
+        serverSasl.server();
+        serverSasl.setMechanisms(TESTMECH1);
+        assertEquals("Server should not yet know the remote's chosen mechanism.",
+                     0,
+                     serverSasl.getRemoteMechanisms().length);
+
+        pumpClientToServer();
+        pumpServerToClient();
+
+        assertEquals("Unexpected SASL outcome at client", SaslOutcome.PN_SASL_NONE, clientSasl.getOutcome());
+        clientSasl.setMechanisms(TESTMECH1);
+
+        pumpClientToServer();
+
+        serverSasl.done(SaslOutcome.PN_SASL_OK);
+        pumpServerToClient();
+
+        assertEquals("Unexpected SASL outcome at client", SaslOutcome.PN_SASL_OK, clientSasl.getOutcome());
+    }
+
+    /**
+     *  5.3.3.5 The additional-data field carries additional data on successful authentication outcome as specified
+     *  by the SASL specification [RFC4422].
+     */
+    @Test
+    public void testOutcomeAdditionalData() throws Exception
+    {
+        getClient().transport = Proton.transport();
+        getServer().transport = Proton.transport();
+
+        Sasl clientSasl = getClient().transport.sasl();
+        clientSasl.client();
+        assertEquals("Unexpected SASL outcome at client", SaslOutcome.PN_SASL_NONE, clientSasl.getOutcome());
+
+        Sasl serverSasl = getServer().transport.sasl();
+        serverSasl.server();
+        serverSasl.setMechanisms(TESTMECH1);
+
+        pumpClientToServer();
+        pumpServerToClient();
+
+        assertEquals("Unexpected SASL outcome at client", SaslOutcome.PN_SASL_NONE, clientSasl.getOutcome());
+        clientSasl.setMechanisms(TESTMECH1);
+
+        pumpClientToServer();
+
+        serverSasl.send(CHALLENGE_BYTES, 0, CHALLENGE_BYTES.length);
+
+        pumpServerToClient();
+
+        byte[] clientReceivedChallengeBytes = new byte[clientSasl.pending()];
+        clientSasl.recv(clientReceivedChallengeBytes, 0, clientReceivedChallengeBytes.length);
+
+        assertEquals("Unexpected SASL outcome at client", SaslOutcome.PN_SASL_NONE, clientSasl.getOutcome());
+        clientSasl.send(RESPONSE_BYTES, 0, RESPONSE_BYTES.length);
+
+        pumpClientToServer();
+
+        byte[] serverReceivedResponseBytes = new byte[serverSasl.pending()];
+        serverSasl.recv(serverReceivedResponseBytes, 0, serverReceivedResponseBytes.length);
+
+        serverSasl.send(ADDITIONAL_DATA_BYTES, 0, ADDITIONAL_DATA_BYTES.length);
+        serverSasl.done(SaslOutcome.PN_SASL_OK);
+        pumpServerToClient();
+
+        byte[] clientReceivedAdditionalDataBytes = new byte[clientSasl.pending()];
+        clientSasl.recv(clientReceivedAdditionalDataBytes, 0, clientReceivedAdditionalDataBytes.length);
+
+        assertEquals("Unexpected SASL outcome at client", SaslOutcome.PN_SASL_OK, clientSasl.getOutcome());
+        assertArrayEquals("Client should now know the serrver's additional-data",
+                          ADDITIONAL_DATA_BYTES,
+                          clientReceivedAdditionalDataBytes);
+    }
+
+    /**
+     *  5.3.3.6 Connection authentication failed due to an unspecified problem with the supplied credentials.
+     */
+    @Test
+    public void testAuthenticationFails() throws Exception
+    {
+        getClient().transport = Proton.transport();
+        getServer().transport = Proton.transport();
+
+        Sasl clientSasl = getClient().transport.sasl();
+        clientSasl.client();
+        assertEquals("Unexpected SASL outcome at client", SaslOutcome.PN_SASL_NONE, clientSasl.getOutcome());
+
+        Sasl serverSasl = getServer().transport.sasl();
+        serverSasl.server();
+        serverSasl.setMechanisms(TESTMECH1);
+
+        pumpClientToServer();
+        pumpServerToClient();
+
+        assertEquals("Unexpected SASL outcome at client", SaslOutcome.PN_SASL_NONE, clientSasl.getOutcome());
+        clientSasl.setMechanisms(TESTMECH1);
+
+        pumpClientToServer();
+
+        serverSasl.done(SaslOutcome.PN_SASL_AUTH);
+        pumpServerToClient();
+        assertEquals("Unexpected SASL outcome at client", SaslOutcome.PN_SASL_AUTH, clientSasl.getOutcome());
+
+    }
+
 }