blob: d7038ff45e692d4d1258ac90d4a656b209d5255c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.security;
import static org.apache.geode.internal.cache.GemFireCacheImpl.getInstance;
import static org.apache.geode.security.SecurityTestUtils.NO_EXCEPTION;
import static org.apache.geode.security.SecurityTestUtils.REGION_NAME;
import static org.apache.geode.security.SecurityTestUtils.closeCache;
import static org.apache.geode.security.SecurityTestUtils.concatProperties;
import static org.apache.geode.security.SecurityTestUtils.createCacheClientForMultiUserMode;
import static org.apache.geode.security.SecurityTestUtils.getProxyCaches;
import static org.apache.geode.test.dunit.Assert.assertEquals;
import static org.apache.geode.test.dunit.Assert.assertTrue;
import static org.apache.geode.test.dunit.Assert.fail;
import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
import static org.apache.geode.test.dunit.LogWriterUtils.getLogWriter;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.operations.OperationContext.OperationCode;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.CqExistsException;
import org.apache.geode.cache.query.CqListener;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.RegionNotFoundException;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.cq.dunit.CqQueryTestListener;
import org.apache.geode.cache.query.cq.internal.ClientCQImpl;
import org.apache.geode.cache.query.internal.cq.CqService;
import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.security.generator.AuthzCredentialGenerator;
import org.apache.geode.security.generator.CredentialGenerator;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.junit.categories.SecurityTest;
/**
* This is for multiuser-authentication
*/
@Category({SecurityTest.class})
public class ClientCQPostAuthorizationDUnitTest extends ClientAuthorizationTestCase {
private Map<String, String> cqNameToQueryStrings = new HashMap<>();
@Override
protected final void preSetUpClientAuthorizationTestBase() throws Exception {
getSystem();
invokeInEveryVM(new SerializableRunnable("getSystem") {
@Override
public void run() {
getSystem();
}
});
this.cqNameToQueryStrings.put("CQ_0", "SELECT * FROM ");
this.cqNameToQueryStrings.put("CQ_1", "SELECT * FROM ");
}
@Override
public final void postTearDownClientAuthorizationTestBase() throws Exception {
this.cqNameToQueryStrings.clear();
}
@Test
public void testAllowCQForAllMultiusers() throws Exception {
/*
* Start a server Start a client1 with two users with valid credentials and post-authz'ed for CQ
* Each user registers a unique CQ Client2 does some operations on the region which satisfies
* both the CQs Validate that listeners for both the CQs are invoked.
*/
doStartUp(2, 5, new boolean[] {true, true}, false);
}
@Test
public void testDisallowCQForAllMultiusers() throws Exception {
/*
* Start a server Start a client1 with two users with valid credentials but not post-authz'ed
* for CQ Each user registers a unique CQ Client2 does some operations on the region which
* satisfies both the CQs Validate that listeners for none of the CQs are invoked.
*/
doStartUp(2, 5, new boolean[] {false, false}, false);
}
@Test
public void testDisallowCQForSomeMultiusers() throws Exception {
/*
* Start a server Start a client1 with two users with valid credentials User1 is post-authz'ed
* for CQ but user2 is not. Each user registers a unique CQ Client2 does some operations on the
* region which satisfies both the CQs Validate that listener for User1's CQ is invoked but that
* for User2's CQ is not invoked.
*/
doStartUp(2, 5, new boolean[] {true, false}, false);
}
@Test
public void testAllowCQForAllMultiusersWithFailover() throws Exception {
/*
* Start a server1 Start a client1 with two users with valid credentials and post-authz'ed for
* CQ Each user registers a unique CQ Client2 does some operations on the region which satisfies
* both the CQs Validate that listeners for both the CQs are invoked. Start server2 and shutdown
* server1 Client2 does some operations on the region which satisfies both the CQs Validate that
* listeners for both the CQs are get updates.
*/
doStartUp(2, 5, new boolean[] {true, true}, true);
}
private void doStartUp(final int numOfUsers, final int numOfPuts,
final boolean[] postAuthzAllowed, final boolean failover) throws Exception {
AuthzCredentialGenerator authzGenerator = getXmlAuthzGenerator();
CredentialGenerator credentialGenerator = authzGenerator.getCredentialGenerator();
Properties extraAuthProps = credentialGenerator.getSystemProperties();
Properties javaProps = credentialGenerator.getJavaProperties();
Properties extraAuthzProps = authzGenerator.getSystemProperties();
String authenticator = credentialGenerator.getAuthenticator();
String accessor = authzGenerator.getAuthorizationCallback();
String authInit = credentialGenerator.getAuthInit();
TestAuthzCredentialGenerator tgen = new TestAuthzCredentialGenerator(authzGenerator);
Properties serverProps =
buildProperties(authenticator, accessor, true, extraAuthProps, extraAuthzProps);
Properties opCredentials;
credentialGenerator = tgen.getCredentialGenerator();
final Properties javaProps2 =
credentialGenerator == null ? null : credentialGenerator.getJavaProperties();
int[] indices = new int[numOfPuts];
for (int index = 0; index < numOfPuts; ++index) {
indices[index] = index;
}
Random rnd = new Random();
Properties[] authProps = new Properties[numOfUsers];
for (int i = 0; i < numOfUsers; i++) {
int rand = rnd.nextInt(100) + 1;
if (postAuthzAllowed[i]) {
// For callback, GET should be allowed
opCredentials = tgen.getAllowedCredentials(
new OperationCode[] {OperationCode.EXECUTE_CQ, OperationCode.GET},
new String[] {REGION_NAME}, indices, rand);
} else {
// For callback, GET should be disallowed
opCredentials = tgen.getDisallowedCredentials(new OperationCode[] {OperationCode.GET},
new String[] {REGION_NAME}, indices, rand);
}
authProps[i] =
concatProperties(new Properties[] {opCredentials, extraAuthProps, extraAuthzProps});
}
// Get ports for the servers
int[] randomAvailableTCPPorts = AvailablePortHelper.getRandomAvailableTCPPorts(2);
int port1 = randomAvailableTCPPorts[0];
int port2 = randomAvailableTCPPorts[1];
// Close down any running servers
server1.invoke(() -> closeCache());
server2.invoke(() -> closeCache());
server1.invoke(() -> createTheServerCache(serverProps, javaProps, port1));
client1.invoke(() -> createClientCache(javaProps2, authInit, authProps,
new int[] {port1, port2}, numOfUsers, postAuthzAllowed));
client2.invoke(() -> createClientCache(javaProps2, authInit, authProps,
new int[] {port1, port2}, numOfUsers, postAuthzAllowed));
client1.invoke(() -> createCQ(numOfUsers));
client1.invoke(() -> executeCQ(numOfUsers, new boolean[] {false, false}, numOfPuts,
new String[numOfUsers], postAuthzAllowed));
client2.invoke(() -> doPuts(numOfPuts, true/* put last key */));
if (!postAuthzAllowed[0]) {
// There is no point waiting as no user is authorized to receive cq events.
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
} // TODO: replace with Awaitility
} else {
client1.invoke(() -> waitForLastKey(0));
if (postAuthzAllowed[1]) {
client1.invoke(() -> waitForLastKey(1));
}
}
client1.invoke(() -> checkCQListeners(numOfUsers, postAuthzAllowed, numOfPuts + 1/* last key */,
0, !failover));
if (failover) {
server2.invoke(() -> createTheServerCache(serverProps, javaProps, port2));
server1.invoke(() -> closeCache());
// Allow time for client1 to register its CQs on server2
server2.invoke(() -> allowCQsToRegister(2));
client2.invoke(() -> doPuts(numOfPuts, true/* put last key */));
client1.invoke(() -> waitForLastKeyUpdate(0));
client1.invoke(() -> checkCQListeners(numOfUsers, postAuthzAllowed,
numOfPuts + 1/* last key */, numOfPuts + 1/* last key */, true));
}
}
private void createTheServerCache(final Properties serverProps, final Properties javaProps,
final int serverPort) {
SecurityTestUtils.createCacheServer(serverProps, javaProps, serverPort, true, NO_EXCEPTION);
}
private void createClientCache(final Properties javaProps, final String authInit,
final Properties[] authProps, final int ports[], final int numOfUsers,
final boolean[] postAuthzAllowed) {
createCacheClientForMultiUserMode(numOfUsers, authInit, authProps, javaProps, ports, 0, false,
NO_EXCEPTION);
}
private void createCQ(final int num) throws CqException, CqExistsException {
for (int i = 0; i < num; i++) {
QueryService cqService = getProxyCaches(i).getQueryService();
String cqName = "CQ_" + i;
String queryStr =
cqNameToQueryStrings.get(cqName) + getProxyCaches(i).getRegion(REGION_NAME).getFullPath();
// Create CQ Attributes.
CqAttributesFactory cqf = new CqAttributesFactory();
CqListener[] cqListeners = {new CqQueryTestListener(getLogWriter())};
((CqQueryTestListener) cqListeners[0]).cqName = cqName;
cqf.initCqListeners(cqListeners);
CqAttributes cqa = cqf.create();
// Create CQ.
CqQuery cq1 = cqService.newCq(cqName, queryStr, cqa);
assertTrue("newCq() state mismatch", cq1.getState().isStopped());
}
}
private void executeCQ(final int num, final boolean[] initialResults,
final int expectedResultsSize, final String[] expectedErr, final boolean[] postAuthzAllowed)
throws RegionNotFoundException, CqException {
for (int i = 0; i < num; i++) {
try {
if (expectedErr[i] != null) {
getLogWriter()
.info("<ExpectedException action=add>" + expectedErr[i] + "</ExpectedException>");
}
CqQuery cq1 = null;
String cqName = "CQ_" + i;
// String queryStr = cqNameToQueryStrings.get(cqName) +
// getProxyCaches(i).getRegion(REGION_NAME).getFullPath();
QueryService cqService = getProxyCaches(i).getQueryService();
// Get CqQuery object.
cq1 = cqService.getCq(cqName);
if (cq1 == null) {
getLogWriter().info("Failed to get CqQuery object for CQ name: " + cqName);
fail("Failed to get CQ " + cqName);
} else {
getLogWriter().info("Obtained CQ, CQ name: " + cq1.getName());
assertTrue("newCq() state mismatch", cq1.getState().isStopped());
}
if (initialResults[i]) {
SelectResults cqResults = null;
try {
cqResults = cq1.executeWithInitialResults();
} catch (CqException ce) {
if (ce.getCause() instanceof NotAuthorizedException && !postAuthzAllowed[i]) {
getLogWriter().info("Got expected exception for CQ " + cqName);
} else {
getLogWriter().info("CqService is: " + cqService);
throw new AssertionError("Failed to execute CQ " + cqName, ce);
}
}
getLogWriter().info("initial result size = " + cqResults.size());
assertTrue("executeWithInitialResults() state mismatch", cq1.getState().isRunning());
if (expectedResultsSize >= 0) {
assertEquals("unexpected results size", expectedResultsSize, cqResults.size());
}
} else {
try {
cq1.execute();
} catch (CqException ce) {
if (ce.getCause() instanceof NotAuthorizedException && !postAuthzAllowed[i]) {
getLogWriter().info("Got expected exception for CQ " + cqName);
} else {
throw ce;
}
}
assertTrue("execute() state mismatch", cq1.getState().isRunning() == postAuthzAllowed[i]);
}
} finally {
if (expectedErr[i] != null) {
getLogWriter()
.info("<ExpectedException action=remove>" + expectedErr[i] + "</ExpectedException>");
}
}
}
}
private void doPuts(final int num, final boolean putLastKey) {
Region region = getProxyCaches(0).getRegion(REGION_NAME);
for (int i = 0; i < num; i++) {
region.put("CQ_key" + i, "CQ_value" + i);
}
if (putLastKey) {
region.put("LAST_KEY", "LAST_KEY");
}
}
private void waitForLastKey(final int cqIndex) {
String cqName = "CQ_" + cqIndex;
QueryService qService = getProxyCaches(cqIndex).getQueryService();
ClientCQImpl cqQuery = (ClientCQImpl) qService.getCq(cqName);
((CqQueryTestListener) cqQuery.getCqListeners()[0]).waitForCreated("LAST_KEY");
}
private void waitForLastKeyUpdate(final int cqIndex) {
String cqName = "CQ_" + cqIndex;
QueryService qService = getProxyCaches(cqIndex).getQueryService();
ClientCQImpl cqQuery = (ClientCQImpl) qService.getCq(cqName);
((CqQueryTestListener) cqQuery.getCqListeners()[0]).waitForUpdated("LAST_KEY");
}
private void allowCQsToRegister(final int number) {
final int num = number;
WaitCriterion wc = new WaitCriterion() {
@Override
public boolean done() {
CqService cqService = getInstance().getCqService();
cqService.start();
Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
if (cqs != null) {
return cqs.size() >= num;
} else {
return false;
}
}
@Override
public String description() {
return num + "Waited for " + num + " CQs to be registered on this server.";
}
};
GeodeAwaitility.await().untilAsserted(wc);
}
private boolean checkCQListeners(final int numOfUsers, final boolean[] expectedListenerInvocation,
final int createEventsSize, final int updateEventsSize, final boolean closeCache) {
for (int i = 0; i < numOfUsers; i++) {
String cqName = "CQ_" + i;
QueryService qService = getProxyCaches(i).getQueryService();
ClientCQImpl cqQuery = (ClientCQImpl) qService.getCq(cqName);
if (expectedListenerInvocation[i]) {
for (CqListener listener : cqQuery.getCqListeners()) {
assertEquals(createEventsSize, ((CqQueryTestListener) listener).getCreateEventCount());
assertEquals(updateEventsSize, ((CqQueryTestListener) listener).getUpdateEventCount());
}
} else {
for (CqListener listener : cqQuery.getCqListeners()) {
assertEquals(0, ((CqQueryTestListener) listener).getTotalEventCount());
}
}
if (closeCache) {
getProxyCaches(i).close();
}
}
return true;
}
}