blob: 759a50c775e7dd52e342cbd8eedf6bb7d636d060 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.security;
import static org.apache.geode.security.SecurityTestUtils.NO_EXCEPTION;
import static org.apache.geode.security.SecurityTestUtils.closeCache;
import static org.apache.geode.security.SecurityTestUtils.concatProperties;
import static org.apache.geode.security.SecurityTestUtils.createCacheClientForMultiUserMode;
import static org.apache.geode.security.SecurityTestUtils.createProxyCache;
import static org.apache.geode.security.SecurityTestUtils.getProxyCaches;
import static org.apache.geode.test.dunit.Assert.assertEquals;
import static org.apache.geode.test.dunit.Assert.assertTrue;
import static org.apache.geode.test.dunit.Assert.fail;
import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
import static org.apache.geode.test.dunit.LogWriterUtils.getLogWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.operations.OperationContext.OperationCode;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.CqExistsException;
import org.apache.geode.cache.query.CqListener;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.RegionNotFoundException;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.cq.dunit.CqQueryTestListener;
import org.apache.geode.cache.query.cq.internal.ClientCQImpl;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.security.generator.AuthzCredentialGenerator;
import org.apache.geode.security.generator.CredentialGenerator;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.junit.categories.SecurityTest;
@Category({SecurityTest.class})
public class MultiUserDurableCQAuthzDUnitTest extends ClientAuthorizationTestCase {
private final Map<String, String> cqNameToQueryStrings = new HashMap<>();
@Override
public final void preSetUpClientAuthorizationTestBase() throws Exception {
getSystem();
invokeInEveryVM(new SerializableRunnable("getSystem") {
@Override
public void run() {
getSystem();
}
});
}
@Override
public final void postSetUpClientAuthorizationTestBase() throws Exception {
cqNameToQueryStrings.put("CQ_0", "SELECT * FROM ");
cqNameToQueryStrings.put("CQ_1", "SELECT * FROM ");
}
@Override
public final void postTearDownClientAuthorizationTestBase() throws Exception {
cqNameToQueryStrings.clear();
}
@Test
public void testCQForDurableClientsWithDefaultClose() throws Exception {
/*
* 1. Start a server. 2. Start a durable client in mulituser secure mode. 3. Create two users
* registering unique durable CQs on server. 4. Invoke GemFireCache.close() at client. 5. Put
* some events on server satisfying both the CQs. 6. Up the client and the two users. 7. Confirm
* that the users receive the events which were enqueued at server while they were away. 8. Same
* for ProxyCache.close()
*/
int numOfUsers = 2;
int numOfPuts = 5;
boolean[] postAuthzAllowed = new boolean[] {true, true};
doTest(numOfUsers, numOfPuts, postAuthzAllowed, getXmlAuthzGenerator(), null);
}
@Test
public void testCQForDurableClientsWithCloseKeepAliveTrue() throws Exception {
/*
* 1. Start a server. 2. Start a durable client in mulituser secure mode. 3. Create two users
* registering unique durable CQs on server. 4. Invoke GemFireCache.close(false) at client. 5.
* Put some events on server satisfying both the CQs. 6. Up the client and the two users. 7.
* Observer the behaviour. 8. Same for ProxyCache.close(false)
*/
int numOfUsers = 2;
int numOfPuts = 5;
boolean[] postAuthzAllowed = new boolean[] {true, true};
doTest(numOfUsers, numOfPuts, postAuthzAllowed, getXmlAuthzGenerator(), true);
}
@Test
public void testCQForDurableClientsWithCloseKeepAliveFalse() throws Exception {
/*
* 1. Start a server. 2. Start a durable client in mulituser secure mode. 3. Create two users
* registering unique durable CQs on server. 4. Invoke GemFireCache.close(true) at client. 5.
* Put some events on server satisfying both the CQs. 6. Up the client and the two users. 7.
* Observer the behaviour. 8. Same for ProxyCache.close(true)
*/
int numOfUsers = 2;
int numOfPuts = 5;
boolean[] postAuthzAllowed = new boolean[] {true, true};
doTest(numOfUsers, numOfPuts, postAuthzAllowed, getXmlAuthzGenerator(), false);
}
/**
* WARNING: "final Boolean keepAlive" is treated as a ternary value: null, true, false
*/
private void doTest(int numOfUsers, int numOfPuts, boolean[] postAuthzAllowed,
final AuthzCredentialGenerator authzGenerator, final Boolean keepAlive) throws Exception {
CredentialGenerator credentialGenerator = authzGenerator.getCredentialGenerator();
Properties extraAuthProps = credentialGenerator.getSystemProperties();
Properties javaProps = credentialGenerator.getJavaProperties();
Properties extraAuthzProps = authzGenerator.getSystemProperties();
String authenticator = credentialGenerator.getAuthenticator();
String accessor = authzGenerator.getAuthorizationCallback();
String authInit = credentialGenerator.getAuthInit();
TestAuthzCredentialGenerator tgen = new TestAuthzCredentialGenerator(authzGenerator);
Properties serverProps =
buildProperties(authenticator, accessor, true, extraAuthProps, extraAuthzProps);
Properties opCredentials;
credentialGenerator = tgen.getCredentialGenerator();
final Properties javaProps2 =
credentialGenerator != null ? credentialGenerator.getJavaProperties() : null;
int[] indices = new int[numOfPuts];
for (int index = 0; index < numOfPuts; ++index) {
indices[index] = index;
}
Random random = new Random();
Properties[] authProps = new Properties[numOfUsers];
String durableClientId = "multiuser_durable_client_1";
Properties client2Credentials = null;
for (int i = 0; i < numOfUsers; i++) {
int rand = random.nextInt(100) + 1;
if (postAuthzAllowed[i]) {
opCredentials = tgen.getAllowedCredentials(
new OperationCode[] {OperationCode.EXECUTE_CQ, OperationCode.GET}, // For callback, GET
// should be allowed
new String[] {regionName}, indices, rand);
} else {
opCredentials = tgen.getDisallowedCredentials(new OperationCode[] {OperationCode.GET}, // For
// callback,
// GET
// should
// be
// disallowed
new String[] {regionName}, indices, rand);
}
authProps[i] =
concatProperties(new Properties[] {opCredentials, extraAuthProps, extraAuthzProps});
if (client2Credentials == null) {
client2Credentials = tgen.getAllowedCredentials(new OperationCode[] {OperationCode.PUT},
new String[] {regionName}, indices, rand);
}
}
// Get ports for the servers
int[] randomAvailableTCPPorts = AvailablePortHelper.getRandomAvailableTCPPorts(2);
int port1 = randomAvailableTCPPorts[0];
int port2 = randomAvailableTCPPorts[1];
// Close down any running servers
server1.invoke(() -> closeCache());
server2.invoke(() -> closeCache());
server1.invoke(() -> createServerCache(serverProps, javaProps, port1));
client1.invoke(() -> createClientCache(javaProps2, authInit, authProps,
new int[] {port1, port2}, numOfUsers, durableClientId, postAuthzAllowed));
client1.invoke(() -> createCQ(numOfUsers, true));
client1.invoke(() -> executeCQ(numOfUsers, new boolean[] {false, false}, numOfPuts,
new String[numOfUsers]));
client1.invoke(() -> readyForEvents());
if (keepAlive == null) {
client1.invoke(() -> closeCache());
} else {
client1.invoke(() -> closeCache(keepAlive));
}
server1.invoke(() -> doPuts(numOfPuts, true/* put last key */));
client1.invoke(() -> createClientCache(javaProps2, authInit, authProps,
new int[] {port1, port2}, numOfUsers, durableClientId, postAuthzAllowed));
client1.invoke(() -> createCQ(numOfUsers, true));
client1.invoke(() -> executeCQ(numOfUsers, new boolean[] {false, false}, numOfPuts,
new String[numOfUsers]));
client1.invoke(() -> readyForEvents());
if (!postAuthzAllowed[0] || keepAlive == null || !keepAlive) {
// Don't wait as no user is authorized to receive cq events.
Thread.sleep(1000); // TODO: use Awaitility
} else {
client1.invoke(() -> waitForLastKey(0, true));
}
int numOfCreates = keepAlive == null ? 0 : (keepAlive ? numOfPuts + 1/* last key */ : 0);
client1.invoke(() -> checkCQListeners(numOfUsers, postAuthzAllowed, numOfCreates, 0));
client1.invoke(() -> proxyCacheClose(new int[] {0, 1}, keepAlive));
client1.invoke(() -> createProxyCache(new int[] {0, 1}, authProps));
client1.invoke(() -> createCQ(numOfUsers, true));
client1.invoke(() -> executeCQ(numOfUsers, new boolean[] {false, false}, numOfPuts,
new String[numOfUsers]));
server1.invoke(() -> doPuts(numOfPuts, true/* put last key */));
if (!postAuthzAllowed[0] || keepAlive == null || !keepAlive) {
// Don't wait as no user is authorized to receive cq events.
Thread.sleep(1000); // TODO: use Awaitility
} else {
client1.invoke(() -> waitForLastKey(0, false));
}
int numOfUpdates = numOfPuts + 1;
client1.invoke(() -> checkCQListeners(numOfUsers, postAuthzAllowed, 0, numOfUpdates));
}
private void createServerCache(final Properties serverProps, final Properties javaProps,
final int serverPort) {
SecurityTestUtils.createCacheServer(serverProps, javaProps, serverPort, true,
NO_EXCEPTION);
}
private void readyForEvents() {
GemFireCacheImpl.getInstance().readyForEvents();
}
/**
* NOTE: "final boolean[] postAuthzAllowed" is never used
*/
private void createClientCache(final Properties javaProps, final String authInit,
final Properties[] authProps, final int ports[], final int numOfUsers, final String durableId,
final boolean[] postAuthzAllowed) {
createCacheClientForMultiUserMode(numOfUsers, authInit, authProps, javaProps, ports, 0, false,
durableId, NO_EXCEPTION);
}
private void createCQ(final int num, final boolean isDurable)
throws CqException, CqExistsException {
for (int i = 0; i < num; i++) {
QueryService cqService = getProxyCaches(i).getQueryService();
String cqName = "CQ_" + i;
String queryStr =
cqNameToQueryStrings.get(cqName) + getProxyCaches(i).getRegion(regionName).getFullPath();
// Create CQ Attributes.
CqAttributesFactory cqf = new CqAttributesFactory();
CqListener[] cqListeners = {new CqQueryTestListener(getLogWriter())};
((CqQueryTestListener) cqListeners[0]).cqName = cqName;
cqf.initCqListeners(cqListeners);
CqAttributes cqa = cqf.create();
// Create CQ.
CqQuery cq1 = cqService.newCq(cqName, queryStr, cqa, isDurable);
assertTrue("newCq() state mismatch", cq1.getState().isStopped());
}
}
private void executeCQ(final int num, final boolean[] initialResults,
final int expectedResultsSize, final String[] expectedErr)
throws CqException, RegionNotFoundException {
for (int i = 0; i < num; i++) {
try {
if (expectedErr[i] != null) {
getLogWriter()
.info("<ExpectedException action=add>" + expectedErr[i] + "</ExpectedException>");
}
CqQuery cq1 = null;
String cqName = "CQ_" + i;
String queryStr = cqNameToQueryStrings.get(cqName)
+ getProxyCaches(i).getRegion(regionName).getFullPath();
QueryService cqService = getProxyCaches(i).getQueryService();
// Get CqQuery object.
cq1 = cqService.getCq(cqName);
if (cq1 == null) {
getLogWriter().info("Failed to get CqQuery object for CQ name: " + cqName);
fail("Failed to get CQ " + cqName);
} else {
getLogWriter().info("Obtained CQ, CQ name: " + cq1.getName());
assertTrue("newCq() state mismatch", cq1.getState().isStopped());
}
if (initialResults[i]) {
SelectResults cqResults = null;
cqResults = cq1.executeWithInitialResults();
getLogWriter().info("initial result size = " + cqResults.size());
assertTrue("executeWithInitialResults() state mismatch", cq1.getState().isRunning());
if (expectedResultsSize >= 0) {
assertEquals("unexpected results size", expectedResultsSize, cqResults.size());
}
} else {
cq1.execute();
assertTrue("execute() state mismatch", cq1.getState().isRunning());
}
} finally {
if (expectedErr[i] != null) {
getLogWriter()
.info("<ExpectedException action=remove>" + expectedErr[i] + "</ExpectedException>");
}
}
}
}
private void doPuts(final int num, final boolean putLastKey) {
Region region = GemFireCacheImpl.getInstance().getRegion(regionName);
for (int i = 0; i < num; i++) {
region.put("CQ_key" + i, "CQ_value" + i);
}
if (putLastKey) {
region.put("LAST_KEY", "LAST_KEY");
}
}
private void waitForLastKey(final int cqIndex, final boolean isCreate) {
String cqName = "CQ_" + cqIndex;
QueryService qService = getProxyCaches(cqIndex).getQueryService();
ClientCQImpl cqQuery = (ClientCQImpl) qService.getCq(cqName);
if (isCreate) {
((CqQueryTestListener) cqQuery.getCqListeners()[cqIndex]).waitForCreated("LAST_KEY");
} else {
((CqQueryTestListener) cqQuery.getCqListeners()[cqIndex]).waitForUpdated("LAST_KEY");
}
}
private void checkCQListeners(final int numOfUsers, final boolean[] expectedListenerInvocation,
final int createEventsSize, final int updateEventsSize) {
for (int i = 0; i < numOfUsers; i++) {
String cqName = "CQ_" + i;
QueryService qService = getProxyCaches(i).getQueryService();
ClientCQImpl cqQuery = (ClientCQImpl) qService.getCq(cqName);
if (expectedListenerInvocation[i]) {
for (CqListener listener : cqQuery.getCqListeners()) {
assertEquals(createEventsSize, ((CqQueryTestListener) listener).getCreateEventCount());
assertEquals(updateEventsSize, ((CqQueryTestListener) listener).getUpdateEventCount());
}
} else {
for (CqListener listener : cqQuery.getCqListeners()) {
assertEquals(0, ((CqQueryTestListener) listener).getTotalEventCount());
}
}
}
}
/**
* WARNING: "final Boolean keepAliveFlags" is treated as a ternary: null, true, false
*/
private void proxyCacheClose(final int[] userIndices, final Boolean keepAliveFlags) {
if (keepAliveFlags != null) {
for (int i : userIndices) {
getProxyCaches(i).close(keepAliveFlags);
}
} else {
for (int i : userIndices) {
getProxyCaches(i).close();
}
}
}
}