QPID-8482:[Broker-J] Introduce derived attribute for a session peer name in AMQP 0-10 sessions
This closes #71
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index 227c48d..ef5f8ce 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -22,14 +22,19 @@
import static org.apache.qpid.server.protocol.v0_10.ServerConnection.State.CLOSE_RCVD;
+import java.nio.charset.StandardCharsets;
import java.security.AccessControlException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.security.Principal;
import java.util.ArrayList;
+import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +57,8 @@
public class ServerConnectionDelegate extends MethodDelegate<ServerConnection> implements ProtocolDelegate<ServerConnection>
{
private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnectionDelegate.class);
+ static final String MESSAGE_DIGEST_SHA1 = "SHA-1";
+ static final int BASE64_LIMIT = 64;
private final AmqpPort<?> _port;
private List<Object> _locales;
@@ -410,7 +417,7 @@
final ServerSession serverSession =
new ServerSession(serverConnection, serverSessionDelegate, new Binary(atc.getName()), 0);
final Session_0_10 session = new Session_0_10(serverConnection.getAmqpConnection(), atc.getChannel(),
- serverSession);
+ serverSession, getPeerSessionName(atc.getName()));
session.create();
serverSession.setModelObject(session);
@@ -427,6 +434,49 @@
}
}
+ private String getPeerSessionName(final byte[] attachName)
+ {
+ try
+ {
+ return UUID.fromString(new String(attachName, StandardCharsets.UTF_8)).toString();
+ }
+ catch (RuntimeException e)
+ {
+ return createBase64OrSha1(attachName);
+ }
+ }
+
+ private String createBase64OrSha1(final byte[] attachName)
+ {
+ if (attachName.length <= BASE64_LIMIT)
+ {
+ return Base64.getEncoder().encodeToString(attachName);
+ }
+ else
+ {
+ return createSha1(attachName);
+ }
+ }
+
+ private String createSha1(final byte[] attachName)
+ {
+ try
+ {
+ final MessageDigest digest = MessageDigest.getInstance(MESSAGE_DIGEST_SHA1);
+ return Base64.getEncoder().encodeToString(digest.digest(attachName));
+ }
+ catch (NoSuchAlgorithmException e)
+ {
+ return Base64.getEncoder().encodeToString(attachName);
+ }
+ }
+
+ // for test purposes only
+ void setState(ConnectionState state)
+ {
+ _state = state;
+ }
+
private boolean isSessionNameUnique(final byte[] name, final ServerConnection conn)
{
final Principal authorizedPrincipal = conn.getAuthorizedPrincipal();
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session.java
new file mode 100644
index 0000000..4347f6b
--- /dev/null
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v0_10;
+
+import org.apache.qpid.server.model.DerivedAttribute;
+import org.apache.qpid.server.model.ManagedObject;
+
+@ManagedObject(category = false, creatable = false, type="Session_0_10")
+public interface Session<C extends Session<C>> extends org.apache.qpid.server.session.AMQPSession<C, ConsumerTarget_0_10>
+{
+ @DerivedAttribute
+ String getPeerSessionName();
+}
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
index dd6ca80..d8197be 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
@@ -33,14 +33,18 @@
import org.apache.qpid.server.util.Action;
public class Session_0_10 extends AbstractAMQPSession<Session_0_10, ConsumerTarget_0_10>
- implements LogSubject, org.apache.qpid.server.util.Deletable<Session_0_10>
+ implements LogSubject, org.apache.qpid.server.util.Deletable<Session_0_10> ,Session<Session_0_10>
{
private final AMQPConnection_0_10 _connection;
private final ServerSession _serverSession;
+ private String _peerSessionName;
- protected Session_0_10(final Connection<?> parent, final int sessionId, final ServerSession serverSession)
+ protected Session_0_10(final Connection<?> parent,
+ final int sessionId,
+ final ServerSession serverSession, final String peerSessionName)
{
super(parent, sessionId);
+ _peerSessionName = peerSessionName;
_connection = (AMQPConnection_0_10) parent;
_serverSession = serverSession;
}
@@ -159,4 +163,10 @@
{
return _serverSession;
}
+
+ @Override
+ public String getPeerSessionName()
+ {
+ return _peerSessionName;
+ }
}
diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegateTest.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegateTest.java
new file mode 100644
index 0000000..fbcfe3c
--- /dev/null
+++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegateTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v0_10;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import static org.apache.qpid.server.protocol.v0_10.ServerConnectionDelegate.BASE64_LIMIT;
+import static org.apache.qpid.server.protocol.v0_10.ServerConnectionDelegate.MESSAGE_DIGEST_SHA1;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.security.MessageDigest;
+import java.security.PrivilegedAction;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import javax.security.auth.Subject;
+import javax.security.auth.SubjectDomainCombiner;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.stubbing.Answer;
+
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionAttach;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+public class ServerConnectionDelegateTest extends UnitTestBase
+{
+
+ private ServerConnectionDelegate _delegate;
+ private ServerConnection _serverConnection;
+ private TaskExecutor _taskExecutor;
+ private AccessControlContext _accessControlContext;
+
+ @Before
+ public void setUp()
+ {
+ _taskExecutor = CurrentThreadTaskExecutor.newStartedInstance();
+ final Broker broker = mock(Broker.class);
+ when(broker.getNetworkBufferSize()).thenReturn(0xffff);
+ when(broker.getContextValue(Long.class, Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT)).thenReturn(Long.MAX_VALUE);
+ when(broker.getTaskExecutor()).thenReturn(_taskExecutor);
+ when(broker.getModel()).thenReturn(BrokerModel.getInstance());
+ final AuthenticationProvider authenticationProvider = mock(AuthenticationProvider.class);
+ when(authenticationProvider.getAvailableMechanisms(anyBoolean())).thenReturn(Collections.singletonList("PLAIN"));
+ final AmqpPort<?> port = mock(AmqpPort.class);
+ when(port.getAuthenticationProvider()).thenReturn(authenticationProvider);
+ when(port.getParent()).thenReturn(broker);
+
+ _delegate = new ServerConnectionDelegate(port, true, "test");
+ _delegate.setState(ServerConnectionDelegate.ConnectionState.OPEN);
+ final NamedAddressSpace addressSpace = mock(NamedAddressSpace.class);
+ when(addressSpace.getConnections()).thenReturn(Collections.emptyList());
+
+ final Subject subject = new Subject();
+ subject.setReadOnly();
+ _accessControlContext = AccessController.getContext();
+ final AMQPConnection_0_10 amqpConnection = mock(AMQPConnection_0_10.class);
+ when(amqpConnection.getParent()).thenReturn(broker);
+ when(amqpConnection.getBroker()).thenReturn(broker);
+ when(amqpConnection.getChildExecutor()).thenReturn(_taskExecutor);
+ when(amqpConnection.getModel()).thenReturn(BrokerModel.getInstance());
+ when(amqpConnection.getSubject()).thenReturn(subject);
+ when(amqpConnection.getContextValue(Long.class, org.apache.qpid.server.model.Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Long.MAX_VALUE);
+ when(amqpConnection.getContextValue(Integer.class, org.apache.qpid.server.model.Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Integer.MAX_VALUE);
+ doAnswer((Answer<AccessControlContext>) invocationOnMock -> {
+ Subject subject1 = (Subject) invocationOnMock.getArgument(0);
+ return AccessController.doPrivileged(
+ (PrivilegedAction<AccessControlContext>) () ->
+ new AccessControlContext(_accessControlContext, new SubjectDomainCombiner(subject1)));
+ }).when(amqpConnection).getAccessControlContextFromSubject(any());
+ when(amqpConnection.getEventLogger()).thenReturn(mock(EventLogger.class));
+
+ _serverConnection = mock(ServerConnection.class);
+ when(_serverConnection.getAddressSpace()).thenReturn(addressSpace);
+ when(_serverConnection.getBroker()).thenReturn(broker);
+ when(_serverConnection.getAmqpConnection()).thenReturn(amqpConnection);
+ }
+
+ @After
+ public void tearDown()
+ {
+ _taskExecutor.stop();
+ }
+
+ @Test
+ public void sessionAttachWhenNameIsUUID()
+ {
+ final String name = UUID.randomUUID().toString();
+ final SessionAttach attach = createSessionAttach(name);
+
+ _delegate.sessionAttach(_serverConnection, attach);
+
+ final ArgumentCaptor<ServerSession> sessionCaptor = ArgumentCaptor.forClass(ServerSession.class);
+ verify(_serverConnection).registerSession(sessionCaptor.capture());
+
+ final ServerSession serverSession = sessionCaptor.getValue();
+ final Session session = serverSession.getModelObject();
+ assertThat(session.getPeerSessionName(), CoreMatchers.is(equalTo(name)));
+ }
+
+ @Test
+ public void sessionAttachWhenNameIsNotUUID()
+ {
+ final String name = "ABC";
+ final SessionAttach attach = createSessionAttach(name);
+
+ _delegate.sessionAttach(_serverConnection, attach);
+
+ final ArgumentCaptor<ServerSession> sessionCaptor = ArgumentCaptor.forClass(ServerSession.class);
+ verify(_serverConnection).registerSession(sessionCaptor.capture());
+
+ final ServerSession serverSession = sessionCaptor.getValue();
+ final Session session = serverSession.getModelObject();
+ assertThat(session.getPeerSessionName(), CoreMatchers.is(equalTo(Base64.getEncoder().encodeToString(name.getBytes(UTF_8)))));
+ }
+
+ @Test
+ public void sessionAttachWhenNameExceedsSizeLimit() throws Exception
+ {
+ final String name = Stream.generate(() -> String.valueOf('a')).limit(BASE64_LIMIT + 1).collect(Collectors.joining());;
+ final SessionAttach attach = createSessionAttach(name);
+
+ _delegate.sessionAttach(_serverConnection, attach);
+
+ final ArgumentCaptor<ServerSession> sessionCaptor = ArgumentCaptor.forClass(ServerSession.class);
+ verify(_serverConnection).registerSession(sessionCaptor.capture());
+
+ final ServerSession serverSession = sessionCaptor.getValue();
+ final Session session = serverSession.getModelObject();
+ final String digest = Base64.getEncoder().encodeToString(MessageDigest.getInstance(MESSAGE_DIGEST_SHA1).digest(name.getBytes(UTF_8)));
+ assertThat(session.getPeerSessionName(), CoreMatchers.is(equalTo(digest)));
+ }
+
+ private SessionAttach createSessionAttach(final String name)
+ {
+ final SessionAttach attach = new SessionAttach();
+ attach.setName(name.getBytes(UTF_8));
+ return attach;
+ }
+}
\ No newline at end of file
diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
index f7e0259..c32bf4e 100644
--- a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
+++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
@@ -145,7 +145,7 @@
invokedMethods.add(m);
}
};
- Session_0_10 modelSession = new Session_0_10(modelConnection, 1, session);
+ Session_0_10 modelSession = new Session_0_10(modelConnection, 1, session, getTestName());
session.setModelObject(modelSession);
ServerSessionDelegate delegate = new ServerSessionDelegate();