blob: 0c0fcf1439afa9273f6e4f6501f23fd6da00da0c [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.apache.geode.security;
import static org.apache.geode.cache.query.dunit.SecurityTestUtils.createAndExecuteCQ;
import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTH_INIT;
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_LOG_LEVEL;
import static org.apache.geode.internal.lang.SystemProperty.DEFAULT_PREFIX;
import static org.apache.geode.internal.lang.SystemPropertyHelper.RE_AUTHENTICATE_WAIT_TIME;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.RestoreSystemProperties;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.InterestResultPolicy;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.query.dunit.SecurityTestUtils.EventsCqListner;
import org.apache.geode.cache.query.dunit.SecurityTestUtils.KeysCacheListener;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.rules.ClientVM;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.junit.categories.SecurityTest;
import org.apache.geode.test.junit.rules.ServerStarterRule;
@Category({SecurityTest.class})
public class AuthExpirationDistributedTest {
@Rule
public ClusterStartupRule cluster = new ClusterStartupRule();
@Rule
public RestoreSystemProperties restore = new RestoreSystemProperties();
@Rule
public ServerStarterRule server = new ServerStarterRule()
.withSecurityManager(ExpirableSecurityManager.class)
.withProperty(SECURITY_LOG_LEVEL, "debug")
.withSystemProperty(DEFAULT_PREFIX + RE_AUTHENTICATE_WAIT_TIME, "5000")
.withRegion(RegionShortcut.REPLICATE, "region");
private ClientVM clientVM;
@Before
public void before() throws Exception {
// this is enabled to show how many times authorize call is made with each permission key
getSecurityManager().setAllowDuplicate(true);
}
@After
public void after() {
if (clientVM != null) {
clientVM.invoke(UpdatableUserAuthInitialize::reset);
}
getSecurityManager().close();
}
private static EventsCqListner CQLISTENER0;
@Test
public void cqClientWillReAuthenticateAutomatically() throws Exception {
startClientWithCQ();
Region<Object, Object> region = server.getCache().getRegion("/region");
region.put("1", "value1");
clientVM.invoke(() -> {
await().untilAsserted(
() -> assertThat(CQLISTENER0.getKeys())
.asList()
.containsExactly("1"));
});
// expire the current user
getSecurityManager().addExpiredUser("user1");
// update the user to be used before we try to send the 2nd event
clientVM.invoke(() -> {
UpdatableUserAuthInitialize.setUser("user2");
});
// do a second put, the event should be queued until client re-authenticate
region.put("2", "value2");
clientVM.invoke(() -> {
// the client will eventually get the 2nd event
await().untilAsserted(
() -> assertThat(CQLISTENER0.getKeys())
.asList()
.containsExactly("1", "2"));
});
Map<String, List<String>> authorizedOps = getSecurityManager().getAuthorizedOps();
assertThat(authorizedOps.keySet().size()).isEqualTo(2);
assertThat(authorizedOps.get("user1")).asList().containsExactly("DATA:READ:region",
"DATA:READ:region", "DATA:READ:region:1");
assertThat(authorizedOps.get("user2")).asList().containsExactly("DATA:READ:region:2");
Map<String, List<String>> unAuthorizedOps = getSecurityManager().getUnAuthorizedOps();
assertThat(unAuthorizedOps.keySet().size()).isEqualTo(1);
assertThat(unAuthorizedOps.get("user1")).asList().containsExactly("DATA:READ:region:2");
}
@Test
public void registeredInterestForKeysAndValidatedTheyWereAllReceived_slowReAuth_policyDefault()
throws Exception {
int serverPort = server.getPort();
clientVM = cluster.startClientVM(0,
c -> c.withProperty(SECURITY_CLIENT_AUTH_INIT, UpdatableUserAuthInitialize.class.getName())
.withPoolSubscription(true)
.withProperty(SECURITY_LOG_LEVEL, "debug")
.withServerConnection(serverPort));
ClientVM client2 = cluster.startClientVM(1,
c -> c.withProperty(SECURITY_CLIENT_AUTH_INIT, UpdatableUserAuthInitialize.class.getName())
.withPoolSubscription(true)
.withServerConnection(serverPort));
clientVM.invoke(() -> {
UpdatableUserAuthInitialize.setUser("user1");
Region<Object, Object> region = ClusterStartupRule.getClientCache()
.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region");
// this test will succeed because when clients re-connects, it will re-register interest
// a new queue will be created with all the data. Old queue is destroyed.
region.registerInterestForAllKeys();
UpdatableUserAuthInitialize.setUser("user11");
// wait for time longer than server's max time to wait to re-authenticate
UpdatableUserAuthInitialize.setWaitTime(6000);
});
AsyncInvocation<Void> invokePut = client2.invokeAsync(() -> {
UpdatableUserAuthInitialize.setUser("user2");
Region<Object, Object> region = ClusterStartupRule.getClientCache()
.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region");
IntStream.range(0, 100).forEach(i -> region.put("key" + i, "value" + i));
});
getSecurityManager().addExpiredUser("user1");
invokePut.await();
// make sure this client recovers and get all the events and will be able to do client operation
clientVM.invoke(() -> {
Region<Object, Object> region = ClusterStartupRule.getClientCache().getRegion("region");
await().untilAsserted(
() -> assertThat(region.keySet()).hasSize(100));
region.put("key100", "value100");
});
// user1 should not be used to put any keys to the region
assertThat(getSecurityManager().getAuthorizedOps().get("user1"))
.containsExactly("DATA:READ:region");
assertThat(getSecurityManager().getUnAuthorizedOps().get("user1"))
.containsExactly("DATA:READ:region:key0");
}
@Test
public void registeredInterestForKeysAndValidatedTheyWereAllReceived_slowReAuth_policyNone_durableClient()
throws Exception {
int serverPort = server.getPort();
clientVM = cluster.startClientVM(0,
c -> c.withProperty(SECURITY_CLIENT_AUTH_INIT, UpdatableUserAuthInitialize.class.getName())
.withProperty(DURABLE_CLIENT_ID, "123456")
.withPoolSubscription(true)
.withServerConnection(serverPort));
clientVM.invoke(() -> {
UpdatableUserAuthInitialize.setUser("user1");
ClientCache clientCache = ClusterStartupRule.getClientCache();
Region<Object, Object> region = clientCache
.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region");
region.registerInterestForAllKeys(InterestResultPolicy.NONE, true);
clientCache.readyForEvents();
UpdatableUserAuthInitialize.setUser("user11");
// wait for time longer than server's max time to wait to re-authenticate
UpdatableUserAuthInitialize.setWaitTime(6000);
});
getSecurityManager().addExpiredUser("user1");
Region<Object, Object> region = server.getCache().getRegion("/region");
IntStream.range(0, 100).forEach(i -> region.put("key" + i, "value" + i));
// make sure this client recovers and get all the events and will be able to do client operation
clientVM.invoke(() -> {
Region<Object, Object> clientRegion = ClusterStartupRule.getClientCache().getRegion("region");
await().untilAsserted(() -> assertThat(clientRegion).hasSize(100));
clientRegion.put("key100", "value100");
});
// user1 should not be used to put any keys to the region
assertThat(getSecurityManager().getAuthorizedOps().get("user1"))
.containsExactly("DATA:READ:region");
assertThat(getSecurityManager().getUnAuthorizedOps().get("user1"))
.containsExactly("DATA:READ:region:key0");
}
private static KeysCacheListener myListener = new KeysCacheListener();
@Test
public void registeredInterestForKeysAndValidatedTheyWereAllReceived_slowReAuth_policyNone_CacheListener_durableClient()
throws Exception {
int serverPort = server.getPort();
clientVM = cluster.startClientVM(0,
c -> c.withProperty(SECURITY_CLIENT_AUTH_INIT, UpdatableUserAuthInitialize.class.getName())
.withProperty(DURABLE_CLIENT_ID, "123456")
.withPoolSubscription(true)
.withServerConnection(serverPort));
clientVM.invoke(() -> {
UpdatableUserAuthInitialize.setUser("user1");
myListener = new KeysCacheListener();
ClientCache clientCache = ClusterStartupRule.getClientCache();
Region<Object, Object> region = clientCache
.createClientRegionFactory(ClientRegionShortcut.PROXY)
.addCacheListener(myListener).create("region");
// use NONE policy to make sure the old messages still sticks around
region.registerInterestForAllKeys(InterestResultPolicy.NONE, true);
clientCache.readyForEvents();
UpdatableUserAuthInitialize.setUser("user11");
// wait for time longer than server's max time to wait to re-authenticate
UpdatableUserAuthInitialize.setWaitTime(6000);
});
getSecurityManager().addExpiredUser("user1");
Region<Object, Object> region = server.getCache().getRegion("/region");
IntStream.range(0, 100).forEach(i -> region.put("key" + i, "value" + i));
// make sure this client recovers and get all the events and will be able to do client operation
clientVM.invoke(() -> {
Region<Object, Object> clientRegion = ClusterStartupRule.getClientCache().getRegion("region");
await().untilAsserted(() -> assertThat(myListener.keys).hasSize(100));
clientRegion.put("key100", "value100");
});
// user1 should not be used to put any keys to the region
assertThat(getSecurityManager().getAuthorizedOps().get("user1"))
.containsExactly("DATA:READ:region");
assertThat(getSecurityManager().getUnAuthorizedOps().get("user1"))
.containsExactly("DATA:READ:region:key0");
}
@Test
public void registeredInterestForKeysAndValidatedTheyWereAllReceived_slowReAuth_policyNone_nonDurableClient()
throws Exception {
int serverPort = server.getPort();
clientVM = cluster.startClientVM(0,
c -> c.withProperty(SECURITY_CLIENT_AUTH_INIT, UpdatableUserAuthInitialize.class.getName())
.withPoolSubscription(true)
.withServerConnection(serverPort));
clientVM.invoke(() -> {
UpdatableUserAuthInitialize.setUser("user1");
ClientCache clientCache = ClusterStartupRule.getClientCache();
Region<Object, Object> region = clientCache
.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create("region");
// use InterestResultPolicy.NONE to make sure the old queue is still around
region.registerInterestForAllKeys(InterestResultPolicy.NONE);
UpdatableUserAuthInitialize.setUser("user11");
// wait for time longer than server's max time to wait to ree-authenticate
UpdatableUserAuthInitialize.setWaitTime(6000);
});
getSecurityManager().addExpiredUser("user1");
Region<Object, Object> region = server.getCache().getRegion("/region");
IntStream.range(0, 100).forEach(i -> region.put("key" + i, "value" + i));
// client will recover but there will be message loss
clientVM.invoke(() -> {
Region<Object, Object> clientRegion = ClusterStartupRule.getClientCache().getRegion("region");
await().during(10, TimeUnit.SECONDS).untilAsserted(
() -> assertThat(clientRegion.keySet()).hasSizeLessThan(100));
clientRegion.put("key100", "value100");
});
// user1 should not be used to put any keys to the region
assertThat(getSecurityManager().getAuthorizedOps().get("user1"))
.containsExactly("DATA:READ:region");
assertThat(getSecurityManager().getAuthorizedOps().get("user11"))
.contains("DATA:WRITE:region:key100");
assertThat(getSecurityManager().getUnAuthorizedOps().get("user1"))
.containsExactly("DATA:READ:region:key0");
}
private void startClientWithCQ() throws Exception {
int serverPort = server.getPort();
clientVM = cluster.startClientVM(0,
c -> c.withProperty(SECURITY_CLIENT_AUTH_INIT, UpdatableUserAuthInitialize.class.getName())
.withCacheSetup(
ccf -> ccf.setPoolSubscriptionRedundancy(2).setPoolSubscriptionEnabled(true))
.withServerConnection(serverPort));
clientVM.invoke(() -> {
UpdatableUserAuthInitialize.setUser("user1");
CQLISTENER0 = createAndExecuteCQ(ClusterStartupRule.getClientCache().getQueryService(), "CQ1",
"select * from /region");
});
}
private ExpirableSecurityManager getSecurityManager() {
return (ExpirableSecurityManager) server.getCache().getSecurityService().getSecurityManager();
}
}