QPID-8482:[Broker-J] Introduce derived attribute for a session peer name in AMQP 0-10 sessions

This closes #71
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index 227c48d..ef5f8ce 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -22,14 +22,19 @@
 
 import static org.apache.qpid.server.protocol.v0_10.ServerConnection.State.CLOSE_RCVD;
 
+import java.nio.charset.StandardCharsets;
 import java.security.AccessControlException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.security.Principal;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,6 +57,8 @@
 public class ServerConnectionDelegate extends MethodDelegate<ServerConnection> implements ProtocolDelegate<ServerConnection>
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnectionDelegate.class);
+    static final String MESSAGE_DIGEST_SHA1 = "SHA-1";
+    static final int BASE64_LIMIT = 64;
     private final AmqpPort<?> _port;
 
     private List<Object> _locales;
@@ -410,7 +417,7 @@
             final ServerSession serverSession =
                     new ServerSession(serverConnection, serverSessionDelegate, new Binary(atc.getName()), 0);
             final Session_0_10 session = new Session_0_10(serverConnection.getAmqpConnection(), atc.getChannel(),
-                                                          serverSession);
+                                                          serverSession, getPeerSessionName(atc.getName()));
             session.create();
             serverSession.setModelObject(session);
 
@@ -427,6 +434,49 @@
         }
     }
 
+    private String getPeerSessionName(final byte[] attachName)
+    {
+        try
+        {
+            return UUID.fromString(new String(attachName, StandardCharsets.UTF_8)).toString();
+        }
+        catch (RuntimeException e)
+        {
+            return createBase64OrSha1(attachName);
+        }
+    }
+
+    private String createBase64OrSha1(final byte[] attachName)
+    {
+        if (attachName.length <= BASE64_LIMIT)
+        {
+            return Base64.getEncoder().encodeToString(attachName);
+        }
+        else
+        {
+            return createSha1(attachName);
+        }
+    }
+
+    private String createSha1(final byte[] attachName)
+    {
+        try
+        {
+            final MessageDigest digest = MessageDigest.getInstance(MESSAGE_DIGEST_SHA1);
+            return Base64.getEncoder().encodeToString(digest.digest(attachName));
+        }
+        catch (NoSuchAlgorithmException e)
+        {
+            return Base64.getEncoder().encodeToString(attachName);
+        }
+    }
+
+    // for test purposes only
+    void setState(ConnectionState state)
+    {
+        _state = state;
+    }
+
     private boolean isSessionNameUnique(final byte[] name, final ServerConnection conn)
     {
         final Principal authorizedPrincipal = conn.getAuthorizedPrincipal();
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session.java
new file mode 100644
index 0000000..4347f6b
--- /dev/null
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v0_10;
+
+import org.apache.qpid.server.model.DerivedAttribute;
+import org.apache.qpid.server.model.ManagedObject;
+
+@ManagedObject(category = false, creatable = false, type="Session_0_10")
+public interface Session<C extends Session<C>> extends org.apache.qpid.server.session.AMQPSession<C, ConsumerTarget_0_10>
+{
+    @DerivedAttribute
+    String getPeerSessionName();
+}
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
index dd6ca80..d8197be 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
@@ -33,14 +33,18 @@
 import org.apache.qpid.server.util.Action;
 
 public class Session_0_10 extends AbstractAMQPSession<Session_0_10, ConsumerTarget_0_10>
-        implements LogSubject, org.apache.qpid.server.util.Deletable<Session_0_10>
+        implements LogSubject, org.apache.qpid.server.util.Deletable<Session_0_10> ,Session<Session_0_10>
 {
     private final AMQPConnection_0_10 _connection;
     private final ServerSession _serverSession;
+    private String _peerSessionName;
 
-    protected Session_0_10(final Connection<?> parent, final int sessionId, final ServerSession serverSession)
+    protected Session_0_10(final Connection<?> parent,
+                           final int sessionId,
+                           final ServerSession serverSession, final String peerSessionName)
     {
         super(parent, sessionId);
+        _peerSessionName = peerSessionName;
         _connection = (AMQPConnection_0_10) parent;
         _serverSession = serverSession;
     }
@@ -159,4 +163,10 @@
     {
         return _serverSession;
     }
+
+    @Override
+    public String getPeerSessionName()
+    {
+        return _peerSessionName;
+    }
 }
diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegateTest.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegateTest.java
new file mode 100644
index 0000000..fbcfe3c
--- /dev/null
+++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegateTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v0_10;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import static org.apache.qpid.server.protocol.v0_10.ServerConnectionDelegate.BASE64_LIMIT;
+import static org.apache.qpid.server.protocol.v0_10.ServerConnectionDelegate.MESSAGE_DIGEST_SHA1;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.security.MessageDigest;
+import java.security.PrivilegedAction;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import javax.security.auth.Subject;
+import javax.security.auth.SubjectDomainCombiner;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.stubbing.Answer;
+
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionAttach;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+public class ServerConnectionDelegateTest extends UnitTestBase
+{
+
+    private ServerConnectionDelegate _delegate;
+    private ServerConnection _serverConnection;
+    private TaskExecutor _taskExecutor;
+    private AccessControlContext _accessControlContext;
+
+    @Before
+    public void setUp()
+    {
+        _taskExecutor = CurrentThreadTaskExecutor.newStartedInstance();
+        final Broker broker = mock(Broker.class);
+        when(broker.getNetworkBufferSize()).thenReturn(0xffff);
+        when(broker.getContextValue(Long.class, Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT)).thenReturn(Long.MAX_VALUE);
+        when(broker.getTaskExecutor()).thenReturn(_taskExecutor);
+        when(broker.getModel()).thenReturn(BrokerModel.getInstance());
+        final AuthenticationProvider authenticationProvider = mock(AuthenticationProvider.class);
+        when(authenticationProvider.getAvailableMechanisms(anyBoolean())).thenReturn(Collections.singletonList("PLAIN"));
+        final AmqpPort<?> port = mock(AmqpPort.class);
+        when(port.getAuthenticationProvider()).thenReturn(authenticationProvider);
+        when(port.getParent()).thenReturn(broker);
+
+        _delegate = new ServerConnectionDelegate(port, true, "test");
+        _delegate.setState(ServerConnectionDelegate.ConnectionState.OPEN);
+        final NamedAddressSpace addressSpace = mock(NamedAddressSpace.class);
+        when(addressSpace.getConnections()).thenReturn(Collections.emptyList());
+
+        final Subject subject = new Subject();
+        subject.setReadOnly();
+         _accessControlContext = AccessController.getContext();
+        final AMQPConnection_0_10 amqpConnection = mock(AMQPConnection_0_10.class);
+        when(amqpConnection.getParent()).thenReturn(broker);
+        when(amqpConnection.getBroker()).thenReturn(broker);
+        when(amqpConnection.getChildExecutor()).thenReturn(_taskExecutor);
+        when(amqpConnection.getModel()).thenReturn(BrokerModel.getInstance());
+        when(amqpConnection.getSubject()).thenReturn(subject);
+        when(amqpConnection.getContextValue(Long.class, org.apache.qpid.server.model.Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Long.MAX_VALUE);
+        when(amqpConnection.getContextValue(Integer.class, org.apache.qpid.server.model.Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Integer.MAX_VALUE);
+        doAnswer((Answer<AccessControlContext>) invocationOnMock -> {
+            Subject subject1 = (Subject) invocationOnMock.getArgument(0);
+            return AccessController.doPrivileged(
+                    (PrivilegedAction<AccessControlContext>) () ->
+                            new AccessControlContext(_accessControlContext, new SubjectDomainCombiner(subject1)));
+        }).when(amqpConnection).getAccessControlContextFromSubject(any());
+        when(amqpConnection.getEventLogger()).thenReturn(mock(EventLogger.class));
+
+        _serverConnection = mock(ServerConnection.class);
+        when(_serverConnection.getAddressSpace()).thenReturn(addressSpace);
+        when(_serverConnection.getBroker()).thenReturn(broker);
+        when(_serverConnection.getAmqpConnection()).thenReturn(amqpConnection);
+    }
+
+    @After
+    public void tearDown()
+    {
+        _taskExecutor.stop();
+    }
+
+    @Test
+    public void sessionAttachWhenNameIsUUID()
+    {
+        final String name = UUID.randomUUID().toString();
+        final SessionAttach attach = createSessionAttach(name);
+
+        _delegate.sessionAttach(_serverConnection, attach);
+
+        final ArgumentCaptor<ServerSession> sessionCaptor = ArgumentCaptor.forClass(ServerSession.class);
+        verify(_serverConnection).registerSession(sessionCaptor.capture());
+
+        final ServerSession serverSession = sessionCaptor.getValue();
+        final Session session = serverSession.getModelObject();
+        assertThat(session.getPeerSessionName(), CoreMatchers.is(equalTo(name)));
+    }
+
+    @Test
+    public void sessionAttachWhenNameIsNotUUID()
+    {
+        final String name = "ABC";
+        final SessionAttach attach = createSessionAttach(name);
+
+        _delegate.sessionAttach(_serverConnection, attach);
+
+        final ArgumentCaptor<ServerSession> sessionCaptor = ArgumentCaptor.forClass(ServerSession.class);
+        verify(_serverConnection).registerSession(sessionCaptor.capture());
+
+        final ServerSession serverSession = sessionCaptor.getValue();
+        final Session session = serverSession.getModelObject();
+        assertThat(session.getPeerSessionName(), CoreMatchers.is(equalTo(Base64.getEncoder().encodeToString(name.getBytes(UTF_8)))));
+    }
+
+    @Test
+    public void sessionAttachWhenNameExceedsSizeLimit() throws Exception
+    {
+        final String name = Stream.generate(() -> String.valueOf('a')).limit(BASE64_LIMIT + 1).collect(Collectors.joining());;
+        final SessionAttach attach = createSessionAttach(name);
+
+        _delegate.sessionAttach(_serverConnection, attach);
+
+        final ArgumentCaptor<ServerSession> sessionCaptor = ArgumentCaptor.forClass(ServerSession.class);
+        verify(_serverConnection).registerSession(sessionCaptor.capture());
+
+        final ServerSession serverSession = sessionCaptor.getValue();
+        final Session session = serverSession.getModelObject();
+        final String digest = Base64.getEncoder().encodeToString(MessageDigest.getInstance(MESSAGE_DIGEST_SHA1).digest(name.getBytes(UTF_8)));
+        assertThat(session.getPeerSessionName(), CoreMatchers.is(equalTo(digest)));
+    }
+
+    private SessionAttach createSessionAttach(final String name)
+    {
+        final SessionAttach attach = new SessionAttach();
+        attach.setName(name.getBytes(UTF_8));
+        return attach;
+    }
+}
\ No newline at end of file
diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
index f7e0259..c32bf4e 100644
--- a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
+++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
@@ -145,7 +145,7 @@
                 invokedMethods.add(m);
             }
         };
-        Session_0_10 modelSession = new Session_0_10(modelConnection, 1, session);
+        Session_0_10 modelSession = new Session_0_10(modelConnection, 1, session, getTestName());
         session.setModelObject(modelSession);
         ServerSessionDelegate delegate = new ServerSessionDelegate();