KNOX-2777 - Add configurations for concurrent session verifier feature (#608)

Co-authored-by:  MrtnBalazs <mbalazs@cloudera.com>
diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java b/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java
index cf0d42b..3e45bac 100644
--- a/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java
+++ b/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java
@@ -293,6 +293,16 @@
   private static final String GATEWAY_DATABASE_VERIFY_SERVER_CERT =  GATEWAY_CONFIG_FILE_PREFIX + ".database.ssl.verify.server.cert";
   private static final String GATEWAY_DATABASE_TRUSTSTORE_FILE =  GATEWAY_CONFIG_FILE_PREFIX + ".database.ssl.truststore.file";
 
+  // Concurrent session properties
+  private static final String PRIVILEGED_USERS = "privileged.users";
+  private static final String NON_PRIVILEGED_USERS = "non." + PRIVILEGED_USERS;
+  private static final String GATEWAY_PRIVILEGED_USERS_CONCURRENT_SESSION_LIMIT = GATEWAY_CONFIG_FILE_PREFIX + "." + PRIVILEGED_USERS + ".concurrent.session.limit";
+  private static final String GATEWAY_NON_PRIVILEGED_USERS_CONCURRENT_SESSION_LIMIT = GATEWAY_CONFIG_FILE_PREFIX + "." + NON_PRIVILEGED_USERS + ".concurrent.session.limit";
+  private static final int GATEWAY_PRIVILEGED_USERS_CONCURRENT_SESSION_LIMIT_DEFAULT = 3;
+  private static final int GATEWAY_NON_PRIVILEGED_USERS_CONCURRENT_SESSION_LIMIT_DEFAULT = 2;
+  private static final String GATEWAY_PRIVILEGED_USERS = GATEWAY_CONFIG_FILE_PREFIX + "." + PRIVILEGED_USERS;
+  private static final String GATEWAY_NON_PRIVILEGED_USERS = GATEWAY_CONFIG_FILE_PREFIX + "." + NON_PRIVILEGED_USERS;
+
   public GatewayConfigImpl() {
     init();
   }
@@ -1335,4 +1345,25 @@
     return getInt(JETTY_MAX_FORM_KEYS, ContextHandler.DEFAULT_MAX_FORM_KEYS);
   }
 
+  @Override
+  public int getPrivilegedUsersConcurrentSessionLimit() {
+    return getInt(GATEWAY_PRIVILEGED_USERS_CONCURRENT_SESSION_LIMIT, GATEWAY_PRIVILEGED_USERS_CONCURRENT_SESSION_LIMIT_DEFAULT);
+  }
+
+  @Override
+  public int getNonPrivilegedUsersConcurrentSessionLimit() {
+    return getInt(GATEWAY_NON_PRIVILEGED_USERS_CONCURRENT_SESSION_LIMIT, GATEWAY_NON_PRIVILEGED_USERS_CONCURRENT_SESSION_LIMIT_DEFAULT);
+  }
+
+  @Override
+  public Set<String> getPrivilegedUsers() {
+    final Collection<String> privilegedUsers = getTrimmedStringCollection(GATEWAY_PRIVILEGED_USERS);
+    return privilegedUsers == null ? Collections.emptySet() : new HashSet<>(privilegedUsers);
+  }
+
+  @Override
+  public Set<String> getNonPrivilegedUsers() {
+    final Collection<String> nonPrivilegedUsers = getTrimmedStringCollection(GATEWAY_NON_PRIVILEGED_USERS);
+    return nonPrivilegedUsers == null ? Collections.emptySet() : new HashSet<>(nonPrivilegedUsers);
+  }
 }
diff --git a/gateway-server/src/test/java/org/apache/knox/gateway/config/impl/GatewayConfigImplTest.java b/gateway-server/src/test/java/org/apache/knox/gateway/config/impl/GatewayConfigImplTest.java
index a9c3be6..5ec699b 100644
--- a/gateway-server/src/test/java/org/apache/knox/gateway/config/impl/GatewayConfigImplTest.java
+++ b/gateway-server/src/test/java/org/apache/knox/gateway/config/impl/GatewayConfigImplTest.java
@@ -24,6 +24,8 @@
 
 import java.nio.file.Paths;
 import java.security.KeyStore;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -424,4 +426,49 @@
     assertEquals("myTokenStateService", gatewayConfig.getServiceParameter("tokenstate", "impl"));
   }
 
+  @Test
+  public void testDefaultConcurrentSessionLimitParameters() {
+    GatewayConfigImpl config = new GatewayConfigImpl();
+
+    assertThat(config.getPrivilegedUsersConcurrentSessionLimit(), is(3));
+    assertThat(config.getNonPrivilegedUsersConcurrentSessionLimit(), is(2));
+    assertThat(config.getPrivilegedUsers(), is(new HashSet<>()));
+    assertThat(config.getNonPrivilegedUsers(), is(new HashSet<>()));
+  }
+
+  @Test
+  public void testNormalConcurrentSessionLimitParameters() {
+    GatewayConfigImpl config = new GatewayConfigImpl();
+
+    config.set("gateway.privileged.users.concurrent.session.limit", "5");
+    assertThat(config.getPrivilegedUsersConcurrentSessionLimit(), is(5));
+    config.set("gateway.non.privileged.users.concurrent.session.limit", "6");
+    assertThat(config.getNonPrivilegedUsersConcurrentSessionLimit(), is(6));
+    config.set("gateway.privileged.users", "admin,jeff");
+    assertThat(config.getPrivilegedUsers(), is(new HashSet<>(Arrays.asList("admin", "jeff"))));
+    config.set("gateway.non.privileged.users", "tom,sam");
+    assertThat(config.getNonPrivilegedUsers(), is(new HashSet<>(Arrays.asList("tom", "sam"))));
+  }
+
+  @Test
+  public void testAbnormalConcurrentSessionLimitParameters() {
+    GatewayConfigImpl config = new GatewayConfigImpl();
+
+    config.set("gateway.privileged.users", "");
+    assertThat(config.getPrivilegedUsers(), is(new HashSet<>()));
+    config.set("gateway.non.privileged.users", "");
+    config.set("gateway.privileged.users", "   ");
+    assertThat(config.getPrivilegedUsers(), is(new HashSet<>()));
+    config.set("gateway.non.privileged.users", "   ");
+    assertThat(config.getNonPrivilegedUsers(), is(new HashSet<>()));
+
+    config.set("gateway.privileged.users", " admin , jeff ");
+    assertThat(config.getPrivilegedUsers(), is(new HashSet<>(Arrays.asList("admin", "jeff"))));
+    config.set("gateway.non.privileged.users", " tom , sam ");
+    assertThat(config.getNonPrivilegedUsers(), is(new HashSet<>(Arrays.asList("tom", "sam"))));
+    config.set("gateway.privileged.users", "  guest  ");
+    assertThat(config.getPrivilegedUsers(), is(new HashSet<>(Arrays.asList("guest"))));
+    config.set("gateway.non.privileged.users", "  guest  ");
+    assertThat(config.getNonPrivilegedUsers(), is(new HashSet<>(Arrays.asList("guest"))));
+  }
 }
diff --git a/gateway-spi-common/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java b/gateway-spi-common/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java
index ec7367f..24d07b4 100644
--- a/gateway-spi-common/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java
+++ b/gateway-spi-common/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java
@@ -940,4 +940,24 @@
   public int getJettyMaxFormKeys() {
     return 0;
   }
+
+  @Override
+  public int getPrivilegedUsersConcurrentSessionLimit() {
+    return 0;
+  }
+
+  @Override
+  public int getNonPrivilegedUsersConcurrentSessionLimit() {
+    return 0;
+  }
+
+  @Override
+  public Set<String> getPrivilegedUsers() {
+    return null;
+  }
+
+  @Override
+  public Set<String> getNonPrivilegedUsers() {
+    return null;
+  }
 }
diff --git a/gateway-spi-common/src/main/java/org/apache/knox/gateway/session/control/ConcurrentSessionVerifier.java b/gateway-spi-common/src/main/java/org/apache/knox/gateway/session/control/ConcurrentSessionVerifier.java
new file mode 100644
index 0000000..e06633f
--- /dev/null
+++ b/gateway-spi-common/src/main/java/org/apache/knox/gateway/session/control/ConcurrentSessionVerifier.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.session.control;
+
+
+import org.apache.knox.gateway.config.GatewayConfig;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class ConcurrentSessionVerifier {
+  public static final ConcurrentSessionVerifier INSTANCE = new ConcurrentSessionVerifier();
+  private Set<String> privilegedUsers;
+  private Set<String> nonPrivilegedUsers;
+  private int privilegedUserConcurrentSessionLimit;
+  private int nonPrivilegedUserConcurrentSessionLimit;
+  private Map<String, Integer> concurrentSessionCounter;
+  private final Lock sessionCountModifyLock = new ReentrantLock();
+
+  private ConcurrentSessionVerifier() {
+  }
+
+  public static ConcurrentSessionVerifier getInstance() {
+    return INSTANCE;
+  }
+
+  public void init(GatewayConfig config) {
+    this.privilegedUsers = config.getPrivilegedUsers();
+    this.nonPrivilegedUsers = config.getNonPrivilegedUsers();
+    this.privilegedUserConcurrentSessionLimit = config.getPrivilegedUsersConcurrentSessionLimit();
+    this.nonPrivilegedUserConcurrentSessionLimit = config.getNonPrivilegedUsersConcurrentSessionLimit();
+    this.concurrentSessionCounter = new ConcurrentHashMap<>();
+  }
+
+  public boolean verifySessionForUser(String username) {
+    if (!privilegedUsers.contains(username) && !nonPrivilegedUsers.contains(username)) {
+      return true;
+    }
+
+    sessionCountModifyLock.lock();
+    try {
+      concurrentSessionCounter.putIfAbsent(username, 0);
+      if (privilegedUserCheckLimitReached(username) || nonPrivilegedUserCheckLimitReached(username)) {
+        return false;
+      }
+      concurrentSessionCounter.compute(username, (key, value) -> value + 1);
+    } finally {
+      sessionCountModifyLock.unlock();
+    }
+    return true;
+  }
+
+  private boolean privilegedUserCheckLimitReached(String username) {
+    if (privilegedUserConcurrentSessionLimit < 0) {
+      return false;
+    }
+    return privilegedUsers.contains(username) && (concurrentSessionCounter.get(username) >= privilegedUserConcurrentSessionLimit);
+  }
+
+  private boolean nonPrivilegedUserCheckLimitReached(String username) {
+    if (nonPrivilegedUserConcurrentSessionLimit < 0) {
+      return false;
+    }
+    return nonPrivilegedUsers.contains(username) && (concurrentSessionCounter.get(username) >= nonPrivilegedUserConcurrentSessionLimit);
+  }
+
+  public void sessionEndedForUser(String username) {
+    sessionCountModifyLock.lock();
+    try {
+      concurrentSessionCounter.computeIfPresent(username, (key, counter) -> decreaseCounter(counter));
+    } finally {
+      sessionCountModifyLock.unlock();
+    }
+  }
+
+  private Integer decreaseCounter(Integer counter) {
+    counter--;
+    if (counter < 1) {
+      return null;
+    } else {
+      return counter;
+    }
+  }
+
+  Integer getUserConcurrentSessionCount(String username) {
+    return concurrentSessionCounter.get(username);
+  }
+}
diff --git a/gateway-spi-common/src/test/java/org/apache/knox/gateway/session/control/ConcurrentSessionVerifierTest.java b/gateway-spi-common/src/test/java/org/apache/knox/gateway/session/control/ConcurrentSessionVerifierTest.java
new file mode 100644
index 0000000..d70eca3
--- /dev/null
+++ b/gateway-spi-common/src/test/java/org/apache/knox/gateway/session/control/ConcurrentSessionVerifierTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.session.control;
+
+import org.apache.knox.gateway.config.GatewayConfig;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+public class ConcurrentSessionVerifierTest {
+
+  private ConcurrentSessionVerifier verifier;
+
+  @Before
+  public void setUp() {
+    verifier = ConcurrentSessionVerifier.getInstance();
+  }
+
+  private GatewayConfig mockConfig(Set<String> privilegedUsers, Set<String> nonPrivilegedUsers, int privilegedUsersLimit, int nonPrivilegedUsersLimit) {
+    GatewayConfig config = EasyMock.createNiceMock(GatewayConfig.class);
+    EasyMock.expect(config.getPrivilegedUsers()).andReturn(privilegedUsers);
+    EasyMock.expect(config.getNonPrivilegedUsers()).andReturn(nonPrivilegedUsers);
+    EasyMock.expect(config.getPrivilegedUsersConcurrentSessionLimit()).andReturn(privilegedUsersLimit);
+    EasyMock.expect(config.getNonPrivilegedUsersConcurrentSessionLimit()).andReturn(nonPrivilegedUsersLimit);
+    EasyMock.replay(config);
+    return config;
+  }
+
+
+  @Test
+  public void userIsInNeitherOfTheGroups() {
+    GatewayConfig config = mockConfig(new HashSet<>(Arrays.asList("admin")), new HashSet<>(Arrays.asList("tom", "guest")), 3, 2);
+    verifier.init(config);
+    for (int i = 0; i < 4; i++) {
+      Assert.assertTrue(verifier.verifySessionForUser("sam"));
+    }
+  }
+
+  @Test
+  public void userIsInBothOfTheGroups() {
+    GatewayConfig config = mockConfig(new HashSet<>(Arrays.asList("admin", "tom")), new HashSet<>(Arrays.asList("tom", "guest")), 3, 2);
+    verifier.init(config);
+
+    Assert.assertTrue(verifier.verifySessionForUser("tom"));
+    Assert.assertTrue(verifier.verifySessionForUser("tom"));
+    Assert.assertFalse(verifier.verifySessionForUser("tom"));
+
+    config = mockConfig(new HashSet<>(Arrays.asList("admin", "tom")), new HashSet<>(Arrays.asList("tom", "guest")), 3, 4);
+    verifier.init(config);
+
+    Assert.assertTrue(verifier.verifySessionForUser("tom"));
+    Assert.assertTrue(verifier.verifySessionForUser("tom"));
+    Assert.assertTrue(verifier.verifySessionForUser("tom"));
+    Assert.assertFalse(verifier.verifySessionForUser("tom"));
+  }
+
+  @Test
+  public void userIsPrivileged() {
+    GatewayConfig config = mockConfig(new HashSet<>(Arrays.asList("admin")), new HashSet<>(Arrays.asList("tom", "guest")), 3, 2);
+    verifier.init(config);
+
+    Assert.assertTrue(verifier.verifySessionForUser("admin"));
+    Assert.assertTrue(verifier.verifySessionForUser("admin"));
+    Assert.assertTrue(verifier.verifySessionForUser("admin"));
+    Assert.assertFalse(verifier.verifySessionForUser("admin"));
+    Assert.assertFalse(verifier.verifySessionForUser("admin"));
+    verifier.sessionEndedForUser("admin");
+    Assert.assertTrue(verifier.verifySessionForUser("admin"));
+    Assert.assertFalse(verifier.verifySessionForUser("admin"));
+    Assert.assertFalse(verifier.verifySessionForUser("admin"));
+  }
+
+  @Test
+  public void userIsNotPrivileged() {
+    GatewayConfig config = mockConfig(new HashSet<>(Arrays.asList("admin")), new HashSet<>(Arrays.asList("tom", "guest")), 3, 2);
+    verifier.init(config);
+
+    Assert.assertTrue(verifier.verifySessionForUser("tom"));
+    Assert.assertTrue(verifier.verifySessionForUser("tom"));
+    Assert.assertFalse(verifier.verifySessionForUser("tom"));
+    Assert.assertFalse(verifier.verifySessionForUser("tom"));
+    verifier.sessionEndedForUser("tom");
+    Assert.assertTrue(verifier.verifySessionForUser("tom"));
+    Assert.assertFalse(verifier.verifySessionForUser("tom"));
+    Assert.assertFalse(verifier.verifySessionForUser("tom"));
+  }
+
+  @Test
+  public void privilegedLimitIsZero() {
+    GatewayConfig config = mockConfig(new HashSet<>(Arrays.asList("admin")), new HashSet<>(Arrays.asList("tom", "guest")), 0, 2);
+    verifier.init(config);
+
+    Assert.assertFalse(verifier.verifySessionForUser("admin"));
+  }
+
+  @Test
+  public void nonPrivilegedLimitIsZero() {
+    GatewayConfig config = mockConfig(new HashSet<>(Arrays.asList("admin")), new HashSet<>(Arrays.asList("tom", "guest")), 3, 0);
+    verifier.init(config);
+
+    Assert.assertFalse(verifier.verifySessionForUser("tom"));
+  }
+
+  @Test
+  public void sessionsDoNotGoToNegative() {
+    GatewayConfig config = mockConfig(new HashSet<>(Arrays.asList("admin")), new HashSet<>(Arrays.asList("tom", "guest")), 2, 2);
+    verifier.init(config);
+
+    Assert.assertNull(verifier.getUserConcurrentSessionCount("admin"));
+    verifier.verifySessionForUser("admin");
+    Assert.assertEquals(1, verifier.getUserConcurrentSessionCount("admin").intValue());
+    verifier.sessionEndedForUser("admin");
+    Assert.assertNull(verifier.getUserConcurrentSessionCount("admin"));
+    verifier.sessionEndedForUser("admin");
+    Assert.assertNull(verifier.getUserConcurrentSessionCount("admin"));
+    verifier.verifySessionForUser("admin");
+    Assert.assertEquals(1, verifier.getUserConcurrentSessionCount("admin").intValue());
+
+    Assert.assertNull(verifier.getUserConcurrentSessionCount("tom"));
+    verifier.verifySessionForUser("tom");
+    Assert.assertEquals(1, verifier.getUserConcurrentSessionCount("tom").intValue());
+    verifier.sessionEndedForUser("tom");
+    Assert.assertNull(verifier.getUserConcurrentSessionCount("tom"));
+    verifier.sessionEndedForUser("tom");
+    Assert.assertNull(verifier.getUserConcurrentSessionCount("tom"));
+    verifier.verifySessionForUser("tom");
+    Assert.assertEquals(1, verifier.getUserConcurrentSessionCount("tom").intValue());
+  }
+
+  @Test
+  public void negativeLimitMeansUnlimited() {
+    GatewayConfig config = mockConfig(new HashSet<>(Arrays.asList("admin")), new HashSet<>(Arrays.asList("tom", "guest")), -2, -2);
+    verifier.init(config);
+
+    for (int i = 0; i < 10; i++) {
+      Assert.assertTrue(verifier.verifySessionForUser("admin"));
+      Assert.assertTrue(verifier.verifySessionForUser("tom"));
+    }
+  }
+}
+
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java b/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java
index faa5ea3..bf6eee3 100644
--- a/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java
@@ -813,4 +813,12 @@
   int getJettyMaxFormContentSize();
 
   int getJettyMaxFormKeys();
+
+  int getPrivilegedUsersConcurrentSessionLimit();
+
+  int getNonPrivilegedUsersConcurrentSessionLimit();
+
+  Set<String> getPrivilegedUsers();
+
+  Set<String> getNonPrivilegedUsers();
 }