PROTON-2490 Fix some issues with engine in SASL server mode
Fixes issue where an engine used as a SASL server does not properly
update the frame decoder state and interest mask. Fixes issues in the
test peer when testing the engine as a SASL server and some API naming
in peer SASL expectation types.
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
index f667a70..5d57bc8 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
@@ -288,6 +288,12 @@
}
try {
+ // When the outcome of SASL is read the decoder should revert to initial state
+ // as the only valid next incoming value is an AMQP header.
+ if (sasl instanceof SaslOutcome) {
+ frameParser.resetToExpectingHeader();
+ }
+
sasl.invoke(scriptEntry, frameSize, this);
} catch (UnexpectedPerformativeError e) {
if (scriptEntry.isOptional()) {
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/SaslInitInjectAction.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/SaslInitInjectAction.java
index 24ef11e..65f0bd0 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/SaslInitInjectAction.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/SaslInitInjectAction.java
@@ -43,12 +43,12 @@
return this;
}
- public SaslInitInjectAction withMechanism(byte[] response) {
+ public SaslInitInjectAction withInitialResponse(byte[] response) {
saslInit.setInitialResponse(new Binary(response));
return this;
}
- public SaslInitInjectAction withMechanism(Binary response) {
+ public SaslInitInjectAction withInitialResponse(Binary response) {
saslInit.setInitialResponse(response);
return this;
}
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/SaslOutcomeExpectation.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/SaslOutcomeExpectation.java
index bf85b25..36c81e8 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/SaslOutcomeExpectation.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/SaslOutcomeExpectation.java
@@ -21,6 +21,7 @@
import org.apache.qpid.protonj2.test.driver.AMQPTestDriver;
import org.apache.qpid.protonj2.test.driver.codec.ListDescribedType;
import org.apache.qpid.protonj2.test.driver.codec.primitives.Binary;
+import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedByte;
import org.apache.qpid.protonj2.test.driver.codec.security.SaslCode;
import org.apache.qpid.protonj2.test.driver.codec.security.SaslOutcome;
import org.apache.qpid.protonj2.test.driver.matchers.security.SaslOutcomeMatcher;
@@ -39,8 +40,12 @@
//----- Type specific with methods that perform simple equals checks
+ public SaslOutcomeExpectation withCode(byte code) {
+ return withCode(equalTo(UnsignedByte.valueOf(code)));
+ }
+
public SaslOutcomeExpectation withCode(SaslCode code) {
- return withCode(equalTo(code));
+ return withCode(equalTo(code.getValue()));
}
public SaslOutcomeExpectation withAdditionalData(byte[] additionalData) {
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandler.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandler.java
index 0c01e4d..11cf173 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandler.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandler.java
@@ -112,14 +112,12 @@
@Override
public void handleRead(EngineHandlerContext context, SASLEnvelope envelope) {
envelope.getBody().invoke(this, context);
- ((ProtonEngineHandlerContext) context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
context.fireRead(envelope);
}
@Override
public void handleWrite(EngineHandlerContext context, SASLEnvelope envelope) {
envelope.invoke(this, context);
- ((ProtonEngineHandlerContext) context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
context.fireWrite(envelope);
}
@@ -210,7 +208,6 @@
// Transition to parsing the frames if any pipelined into this buffer.
transitionToFrameSizeParsingStage();
- // This probably isn't right as this fires to next not current.
if (header.isSaslHeader()) {
decoder = CodecFactory.getSaslDecoder();
decoderState = decoder.newDecoderState();
@@ -218,6 +215,10 @@
} else {
decoder = CodecFactory.getDecoder();
decoderState = decoder.newDecoderState();
+ // Once we've read an AMQP header we no longer care if any SASL work
+ // occurs as that would be erroneous behavior which this handler doesn't
+ // deal with.
+ ((ProtonEngineHandlerContext) context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
context.fireRead(HeaderEnvelope.AMQP_HEADER_ENVELOPE);
}
}
diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandlerTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandlerTest.java
index 6c6d720..ae37f96 100644
--- a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandlerTest.java
+++ b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandlerTest.java
@@ -70,7 +70,7 @@
@Test
public void testReadValidHeaderInSingleByteChunks() throws Exception {
ProtonFrameDecodingHandler handler = createFrameDecoder();
- EngineHandlerContext context = Mockito.mock(EngineHandlerContext.class);
+ ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
handler.handleRead(context, ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] { 'A' }));
handler.handleRead(context, ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] { 'M' }));
@@ -82,18 +82,32 @@
handler.handleRead(context, ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] { 0 }));
Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
+ Mockito.verify(context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
Mockito.verifyNoMoreInteractions(context);
}
@Test
public void testReadValidHeaderInSplitChunks() throws Exception {
ProtonFrameDecodingHandler handler = createFrameDecoder();
- EngineHandlerContext context = Mockito.mock(EngineHandlerContext.class);
+ ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
handler.handleRead(context, ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] { 'A', 'M', 'Q', 'P' }));
handler.handleRead(context, ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] { 0, 1, 0, 0 }));
Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
+ Mockito.verify(context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
+ Mockito.verifyNoMoreInteractions(context);
+ }
+
+ @Test
+ public void testReadOfSaslHeaderDoesNotDisableWritesMonitoring() throws Exception {
+ ProtonFrameDecodingHandler handler = createFrameDecoder();
+ ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
+
+ handler.handleRead(context, ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] { 'A', 'M', 'Q', 'P' }));
+ handler.handleRead(context, ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] { 3, 1, 0, 0 }));
+
+ Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
Mockito.verifyNoMoreInteractions(context);
}
@@ -145,12 +159,13 @@
ArgumentCaptor<IncomingAMQPEnvelope> argument = ArgumentCaptor.forClass(IncomingAMQPEnvelope.class);
ProtonFrameDecodingHandler handler = createFrameDecoder();
- EngineHandlerContext context = Mockito.mock(EngineHandlerContext.class);
+ ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
handler.handleRead(context, AMQPHeader.getAMQPHeader().getBuffer());
handler.handleRead(context, ProtonByteBufferAllocator.DEFAULT.wrap(emptyOpen));
Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
+ Mockito.verify(context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
Mockito.verify(context).fireRead(argument.capture());
Mockito.verifyNoMoreInteractions(context);
@@ -183,9 +198,10 @@
ArgumentCaptor<IncomingAMQPEnvelope> argument = ArgumentCaptor.forClass(IncomingAMQPEnvelope.class);
ProtonFrameDecodingHandler handler = createFrameDecoder();
- EngineHandlerContext context = Mockito.mock(EngineHandlerContext.class);
+ ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
handler.handleRead(context, AMQPHeader.getAMQPHeader().getBuffer());
+ Mockito.verify(context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
handler.handleRead(context, ProtonByteBufferAllocator.DEFAULT.wrap(basicOpen));
Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
@@ -226,11 +242,12 @@
ArgumentCaptor<IncomingAMQPEnvelope> argument = ArgumentCaptor.forClass(IncomingAMQPEnvelope.class);
ProtonFrameDecodingHandler handler = createFrameDecoder();
- EngineHandlerContext context = Mockito.mock(EngineHandlerContext.class);
+ ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
handler.handleRead(context, ProtonByteBufferAllocator.DEFAULT.wrap(basicOpen));
Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
+ Mockito.verify(context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
Mockito.verify(context).fireRead(argument.capture());
Mockito.verifyNoMoreInteractions(context);
@@ -265,11 +282,12 @@
byte[] emptyFrame = new byte[] { (byte) 0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00 };
ProtonFrameDecodingHandler handler = createFrameDecoder();
- EngineHandlerContext context = Mockito.mock(EngineHandlerContext.class);
+ ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
handler.handleRead(context, AMQPHeader.getAMQPHeader().getBuffer());
Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
+ Mockito.verify(context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
Mockito.verifyNoMoreInteractions(context);
handler.handleRead(context, ProtonByteBufferAllocator.DEFAULT.wrap(emptyFrame));
@@ -293,11 +311,12 @@
(byte) 0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00 };
ProtonFrameDecodingHandler handler = createFrameDecoder();
- EngineHandlerContext context = Mockito.mock(EngineHandlerContext.class);
+ ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
handler.handleRead(context, AMQPHeader.getAMQPHeader().getBuffer());
Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
+ Mockito.verify(context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
Mockito.verifyNoMoreInteractions(context);
handler.handleRead(context, ProtonByteBufferAllocator.DEFAULT.wrap(emptyFrames));
@@ -324,11 +343,12 @@
byte[] undersizedFrameHeader = new byte[] { (byte) 0x00, 0x00, 0x00, 0x07, 0x02, 0x00, 0x00, 0x00 };
ProtonFrameDecodingHandler handler = createFrameDecoder();
- EngineHandlerContext context = Mockito.mock(EngineHandlerContext.class);
+ ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
handler.handleRead(context, AMQPHeader.getAMQPHeader().getBuffer());
Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
+ Mockito.verify(context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
Mockito.verifyNoMoreInteractions(context);
try {
@@ -352,11 +372,12 @@
byte[] underMinDoffFrameHeader = new byte[] { (byte) 0x00, 0x00, 0x00, 0x08, 0x01, 0x00, 0x00, 0x00 };
ProtonFrameDecodingHandler handler = createFrameDecoder();
- EngineHandlerContext context = Mockito.mock(EngineHandlerContext.class);
+ ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
handler.handleRead(context, AMQPHeader.getAMQPHeader().getBuffer());
Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
+ Mockito.verify(context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
Mockito.verifyNoMoreInteractions(context);
try {
@@ -380,11 +401,12 @@
byte[] overFrameSizeDoffFrameHeader = new byte[] { (byte) 0x00, 0x00, 0x00, 0x08, 0x03, 0x00, 0x00, 0x00 };
ProtonFrameDecodingHandler handler = createFrameDecoder();
- EngineHandlerContext context = Mockito.mock(EngineHandlerContext.class);
+ ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
handler.handleRead(context, AMQPHeader.getAMQPHeader().getBuffer());
Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
+ Mockito.verify(context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
Mockito.verifyNoMoreInteractions(context);
try {
@@ -406,11 +428,12 @@
byte[] overFrameSizeLimitFrameHeader = new byte[] { (byte) 0xA0, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00 };
ProtonFrameDecodingHandler handler = createFrameDecoder();
- EngineHandlerContext context = Mockito.mock(EngineHandlerContext.class);
+ ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
handler.handleRead(context, AMQPHeader.getAMQPHeader().getBuffer());
Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
+ Mockito.verify(context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
Mockito.verifyNoMoreInteractions(context);
try {
@@ -431,7 +454,7 @@
ProtonEngine engine = Mockito.mock(ProtonEngine.class);
Mockito.when(engine.configuration()).thenReturn(configuration);
Mockito.when(engine.isWritable()).thenReturn(Boolean.TRUE);
- EngineHandlerContext context = Mockito.mock(EngineHandlerContext.class);
+ ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
Mockito.when(context.engine()).thenReturn(engine);
ProtonFrameDecodingHandler handler = new ProtonFrameDecodingHandler();
diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/sasl/ProtonSaslServerTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/sasl/ProtonSaslServerTest.java
new file mode 100644
index 0000000..1af0bde
--- /dev/null
+++ b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/sasl/ProtonSaslServerTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.engine.impl.sasl;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.qpid.protonj2.buffer.ProtonBuffer;
+import org.apache.qpid.protonj2.engine.Engine;
+import org.apache.qpid.protonj2.engine.EngineFactory;
+import org.apache.qpid.protonj2.engine.exceptions.ProtocolViolationException;
+import org.apache.qpid.protonj2.engine.impl.ProtonEngineTestSupport;
+import org.apache.qpid.protonj2.engine.sasl.SaslOutcome;
+import org.apache.qpid.protonj2.engine.sasl.SaslServerContext;
+import org.apache.qpid.protonj2.engine.sasl.SaslServerListener;
+import org.apache.qpid.protonj2.test.driver.ProtonTestConnector;
+import org.apache.qpid.protonj2.test.driver.codec.security.SaslCode;
+import org.apache.qpid.protonj2.types.Symbol;
+import org.apache.qpid.protonj2.types.transport.AMQPHeader;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/**
+ * Test proton engine from the perspective of a SASL client
+ */
+@Timeout(20)
+public class ProtonSaslServerTest extends ProtonEngineTestSupport {
+
+ @Test
+ public void testEngineFailsIfAMQPHeaderArrivesWhenSASLHeaderExpected() throws Exception {
+ Engine engine = EngineFactory.PROTON.createEngine();
+ engine.errorHandler(result -> failure = result.failureCause());
+ ProtonTestConnector peer = createTestPeer(engine);
+
+ // Setup basic SASL server which only allows ANONYMOUS
+ engine.saslDriver().server().setListener(createAnonymousSaslServerListener());
+ engine.connection().openHandler((conn) -> conn.open());
+ engine.connection().closeHandler((conn) -> conn.close());
+ engine.start();
+
+ peer.expectSASLHeader();
+
+ try {
+ peer.remoteHeader(AMQPHeader.getAMQPHeader().toArray()).now();
+ } catch (AssertionError pve) {
+ assertTrue(pve.getCause() instanceof ProtocolViolationException);
+ }
+
+ peer.waitForScriptToCompleteIgnoreErrors();
+
+ assertNotNull(failure);
+ }
+
+ @Test
+ public void testSaslAnonymousConnection() throws Exception {
+ Engine engine = EngineFactory.PROTON.createEngine();
+ engine.errorHandler(result -> failure = result.failureCause());
+ ProtonTestConnector peer = createTestPeer(engine);
+
+ // Setup basic SASL server which only allows ANONYMOUS
+ engine.saslDriver().server().setListener(createAnonymousSaslServerListener());
+ engine.connection().openHandler((conn) -> conn.open());
+ engine.connection().closeHandler((conn) -> conn.close());
+ engine.start();
+
+ peer.expectSASLHeader();
+ peer.expectSaslMechanisms().withSaslServerMechanisms("ANONYMOUS");
+ peer.remoteHeader(AMQPHeader.getSASLHeader().toArray()).now();
+ peer.waitForScriptToComplete();
+
+ peer.expectSaslOutcome().withCode(SaslCode.OK);
+ peer.remoteSaslInit().withMechanism("ANONYMOUS").now();
+ peer.waitForScriptToComplete();
+
+ peer.expectAMQPHeader();
+ peer.expectOpen();
+ peer.remoteHeader(AMQPHeader.getAMQPHeader().toArray()).now();
+ peer.remoteOpen().now();
+ peer.waitForScriptToComplete();
+
+ peer.expectClose();
+ peer.remoteClose().now();
+ peer.waitForScriptToComplete();
+
+ assertNull(failure);
+ }
+
+ @Test
+ public void testSaslPlainConnection() throws Exception {
+ Engine engine = EngineFactory.PROTON.createEngine();
+ engine.errorHandler(result -> failure = result.failureCause());
+ ProtonTestConnector peer = createTestPeer(engine);
+
+ // Setup basic SASL server which only allows ANONYMOUS
+ engine.saslDriver().server().setListener(createPlainSaslServerListener());
+ engine.connection().openHandler((conn) -> conn.open());
+ engine.connection().closeHandler((conn) -> conn.close());
+ engine.start();
+
+ peer.expectSASLHeader();
+ peer.expectSaslMechanisms().withSaslServerMechanisms("PLAIN");
+ peer.remoteHeader(AMQPHeader.getSASLHeader().toArray()).now();
+ peer.waitForScriptToComplete();
+
+ peer.expectSaslOutcome().withCode(SaslCode.OK);
+ peer.remoteSaslInit().withMechanism("PLAIN")
+ .withInitialResponse(saslPlainInitialResponse("user", "pass")).now();
+ peer.waitForScriptToComplete();
+
+ peer.expectAMQPHeader();
+ peer.expectOpen();
+ peer.remoteHeader(AMQPHeader.getAMQPHeader().toArray()).now();
+ peer.remoteOpen().now();
+ peer.waitForScriptToComplete();
+
+ peer.expectClose();
+ peer.remoteClose().now();
+ peer.waitForScriptToComplete();
+
+ assertNull(failure);
+ }
+
+ @Test
+ public void testSaslPlainConnectionFailedWhenAnonymousOffered() throws Exception {
+ Engine engine = EngineFactory.PROTON.createEngine();
+ engine.errorHandler(result -> failure = result.failureCause());
+ ProtonTestConnector peer = createTestPeer(engine);
+
+ // Setup basic SASL server which only allows ANONYMOUS
+ engine.saslDriver().server().setListener(createPlainSaslServerListener());
+ engine.connection().openHandler((conn) -> conn.open());
+ engine.connection().closeHandler((conn) -> conn.close());
+ engine.start();
+
+ peer.expectSASLHeader();
+ peer.expectSaslMechanisms().withSaslServerMechanisms("PLAIN");
+ peer.remoteHeader(AMQPHeader.getSASLHeader().toArray()).now();
+ peer.waitForScriptToComplete();
+
+ peer.expectSaslOutcome().withCode(SaslCode.SYS_PERM);
+ peer.remoteSaslInit().withMechanism("ANONYMOUS").now();
+ peer.waitForScriptToComplete();
+
+ assertNull(failure);
+ }
+
+ @Test
+ public void testEngineFailsForUnexpecetedNonSaslFrameDuringSaslExchange() throws Exception {
+ Engine engine = EngineFactory.PROTON.createEngine();
+ engine.errorHandler(result -> failure = result.failureCause());
+ ProtonTestConnector peer = createTestPeer(engine);
+
+ // Setup basic SASL server which only allows ANONYMOUS
+ engine.saslDriver().server().setListener(createPlainSaslServerListener());
+ engine.connection().openHandler((conn) -> conn.open());
+ engine.connection().closeHandler((conn) -> conn.close());
+ engine.start();
+
+ peer.expectSASLHeader();
+ peer.expectSaslMechanisms().withSaslServerMechanisms("PLAIN");
+ peer.remoteHeader(AMQPHeader.getSASLHeader().toArray()).now();
+ peer.waitForScriptToComplete();
+
+ try {
+ peer.remoteOpen().now();
+ } catch (AssertionError pve) {
+ assertTrue(pve.getCause() instanceof ProtocolViolationException);
+ }
+
+ peer.waitForScriptToCompleteIgnoreErrors();
+
+ assertNotNull(failure);
+ }
+
+ public byte[] saslPlainInitialResponse(String username, String password) {
+ byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8);
+ byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8);
+ byte[] initialResponse = new byte[usernameBytes.length + passwordBytes.length + 2];
+ System.arraycopy(usernameBytes, 0, initialResponse, 1, usernameBytes.length);
+ System.arraycopy(passwordBytes, 0, initialResponse, 2 + usernameBytes.length, passwordBytes.length);
+
+ return initialResponse;
+ }
+
+ private SaslServerListener createAnonymousSaslServerListener() {
+ return new SaslServerListener() {
+
+ @Override
+ public void handleSaslResponse(SaslServerContext context, ProtonBuffer response) {
+ throw new RuntimeException("Not expecting any SASL Response frames");
+ }
+
+ @Override
+ public void handleSaslInit(SaslServerContext context, Symbol mechanism, ProtonBuffer initResponse) {
+ if (mechanism.equals(Symbol.valueOf("ANONYMOUS"))) {
+ context.sendOutcome(SaslOutcome.SASL_OK, null);
+ } else {
+ context.sendOutcome(SaslOutcome.SASL_PERM, null);
+ }
+ }
+
+ @Override
+ public void handleSaslHeader(SaslServerContext context, AMQPHeader header) {
+ context.sendMechanisms(new Symbol[] { Symbol.valueOf("ANONYMOUS") });
+ }
+ };
+ }
+
+ private SaslServerListener createPlainSaslServerListener() {
+ return new SaslServerListener() {
+
+ @Override
+ public void handleSaslResponse(SaslServerContext context, ProtonBuffer response) {
+ throw new RuntimeException("Not expecting any SASL Response frames");
+ }
+
+ @Override
+ public void handleSaslInit(SaslServerContext context, Symbol mechanism, ProtonBuffer initResponse) {
+ if (mechanism.equals(Symbol.valueOf("PLAIN"))) {
+ context.sendOutcome(SaslOutcome.SASL_OK, null);
+ } else {
+ context.sendOutcome(SaslOutcome.SASL_PERM, null);
+ }
+ }
+
+ @Override
+ public void handleSaslHeader(SaslServerContext context, AMQPHeader header) {
+ context.sendMechanisms(new Symbol[] { Symbol.valueOf("PLAIN") });
+ }
+ };
+ }
+}