blob: e1fef0281d1d361f32282cc091726f6dc0118c0c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fw_dunit.hpp"
#include <geode/CqAttributesFactory.hpp>
#include <geode/CqAttributes.hpp>
#include <geode/CqListener.hpp>
#include <geode/CqQuery.hpp>
#include <geode/CqServiceStatistics.hpp>
#include <geode/AuthenticatedView.hpp>
#include <ace/OS.h>
#include <ace/High_Res_Timer.h>
#include <string>
#define ROOT_NAME "testThinClientSecurityCQAuthorizationMU"
#define ROOT_SCOPE DISTRIBUTED_ACK
#include "CacheHelper.hpp"
#include "QueryStrings.hpp"
#include "QueryHelper.hpp"
#include <geode/Query.hpp>
#include <geode/QueryService.hpp>
#include "ThinClientCQ.hpp"
#include "CacheHelper.hpp"
#include "ThinClientHelper.hpp"
#include <ace/Process.h>
using apache::geode::client::testframework::security::CredentialGenerator;
using apache::geode::client::AuthenticatedView;
using apache::geode::client::CqAttributesFactory;
using apache::geode::client::CqEvent;
using apache::geode::client::CqListener;
using apache::geode::client::CqOperation;
using apache::geode::client::Exception;
using apache::geode::client::IllegalStateException;
using apache::geode::client::QueryService;
const char *locHostPort =
CacheHelper::getLocatorHostPort(isLocator, isLocalServer, 1);
std::shared_ptr<CredentialGenerator> credentialGeneratorHandler;
#define CLIENT1 s1p1
#define SERVER1 s2p1
#define CLIENT2 s1p2
#define MAX_LISTNER 8
const char *cqNames[MAX_LISTNER] = {"MyCq_0", "MyCq_1", "MyCq_2", "MyCq_3",
"MyCq_4", "MyCq_5", "MyCq_6", "MyCq_7"};
const char *queryStrings[MAX_LISTNER] = {
"select * from /Portfolios p where p.ID < 1",
"select * from /Portfolios p where p.ID < 2",
"select * from /Portfolios p where p.ID = 2",
"select * from /Portfolios p where p.ID >= 3",
"select * from /Portfolios p where p.ID = 4",
"select * from /Portfolios p where p.ID = 5",
"select * from /Portfolios p where p.ID = 6",
"select * from /Portfolios p where p.ID = 7"};
const char *regionNamesCq[] = {"Portfolios", "Positions", "Portfolios2",
"Portfolios3"};
class MyCqListener : public CqListener {
uint8_t m_id;
uint32_t m_numInserts;
uint32_t m_numUpdates;
uint32_t m_numDeletes;
uint32_t m_numEvents;
public:
uint8_t getId() { return m_id; }
uint32_t getNumInserts() { return m_numInserts; }
uint32_t getNumUpdates() { return m_numUpdates; }
uint32_t getNumDeletes() { return m_numDeletes; }
uint32_t getNumEvents() { return m_numEvents; }
explicit MyCqListener(uint8_t id)
: m_id(id),
m_numInserts(0),
m_numUpdates(0),
m_numDeletes(0),
m_numEvents(0) {}
~MyCqListener() noexcept override = default;
inline void updateCount(const CqEvent &cqEvent) {
printf(" in cqEvent.getQueryOperation() %d id = %d\n",
static_cast<int>(cqEvent.getQueryOperation()), m_id);
printf(" in update key = %s \n",
(dynamic_cast<CacheableString *>(cqEvent.getKey().get()))
->value()
.c_str());
m_numEvents++;
switch (cqEvent.getQueryOperation()) {
case CqOperation::OP_TYPE_CREATE:
m_numInserts++;
break;
case CqOperation::OP_TYPE_UPDATE:
m_numUpdates++;
break;
case CqOperation::OP_TYPE_DESTROY:
m_numDeletes++;
break;
default:
break;
}
printf(" in create = %d, update = %d , delete = %d ", m_numInserts,
m_numUpdates, m_numDeletes);
}
void onEvent(const CqEvent &cqe) override {
LOG("MyCqListener::OnEvent called");
updateCount(cqe);
}
void onError(const CqEvent &cqe) override {
updateCount(cqe);
LOG("MyCqListener::OnError called");
}
void close() override { LOG("MyCqListener::close called"); }
};
std::string getXmlPath() {
char xmlPath[1000] = {'\0'};
const char *path = ACE_OS::getenv("TESTSRC");
ASSERT(path != nullptr,
"Environment variable TESTSRC for test source directory is not set.");
strncpy(xmlPath, path, strlen(path) - strlen("cppcache"));
strncat(xmlPath, "xml/Security/", sizeof(xmlPath) - strlen(xmlPath) - 1);
return std::string(xmlPath);
}
void initCredentialGenerator() {
credentialGeneratorHandler = CredentialGenerator::create("DUMMY3");
if (credentialGeneratorHandler == nullptr) {
FAIL("credentialGeneratorHandler is nullptr");
}
}
std::shared_ptr<Properties> userCreds;
void initClientCq(const bool isthinClient) {
userCreds = Properties::create();
auto config = Properties::create();
// credentialGeneratorHandler->getAuthInit(config);
credentialGeneratorHandler->getValidCredentials(userCreds);
if (cacheHelper == nullptr) {
cacheHelper = new CacheHelper(isthinClient, config);
}
ASSERT(cacheHelper, "Failed to create a CacheHelper client instance.");
try {
auto serializationRegistry =
CacheRegionHelper::getCacheImpl(cacheHelper->getCache().get())
->getSerializationRegistry();
serializationRegistry->addDataSerializableType(
Position::createDeserializable, 2);
serializationRegistry->addDataSerializableType(
Portfolio::createDeserializable, 3);
} catch (const IllegalStateException &) {
// ignore exception
}
}
DUNIT_TASK_DEFINITION(CLIENT1, CreateServer1)
{
initCredentialGenerator();
std::string cmdServerAuthenticator;
if (isLocalServer) {
cmdServerAuthenticator = credentialGeneratorHandler->getServerCmdParams(
"authenticator:authorizer:authorizerPP", getXmlPath());
printf("string %s", cmdServerAuthenticator.c_str());
CacheHelper::initServer(
1, "remotequery.xml", nullptr,
const_cast<char *>(cmdServerAuthenticator.c_str()));
LOG("Server1 started");
}
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, CreateServer2)
{
std::string cmdServerAuthenticator;
if (isLocalServer) {
cmdServerAuthenticator = credentialGeneratorHandler->getServerCmdParams(
"authenticator:authorizer:authorizerPP", getXmlPath());
printf("string %s", cmdServerAuthenticator.c_str());
CacheHelper::initServer(
2, "remotequery2.xml", nullptr,
const_cast<char *>(cmdServerAuthenticator.c_str()));
LOG("Server2 started");
}
SLEEP(20000);
}
END_TASK_DEFINITION
void stepOne() {
LOG("StepOne1 complete. 1");
initClientCq(true);
LOG("StepOne1 complete. 2");
createRegionForCQMU(regionNamesCq[0], USE_ACK, false);
LOG("StepOne1 complete. 3");
auto regptr = getHelper()->getRegion(regionNamesCq[0]);
LOG("StepOne1 complete. 4");
LOG("StepOne1 complete. 5");
auto subregPtr =
regptr->createSubregion(regionNamesCq[1], regptr->getAttributes());
LOG("StepOne complete.");
}
void stepOne2() {
LOG("StepOne2 complete. 1");
initClientCq(true);
LOG("StepOne2 complete. 2");
createRegionForCQMU(regionNamesCq[0], USE_ACK, false);
LOG("StepOne2 complete. 3");
auto regptr = getHelper()->getRegion(regionNamesCq[0]);
LOG("StepOne2 complete. 4");
LOG("StepOne2 complete. 5");
auto subregPtr =
regptr->createSubregion(regionNamesCq[1], regptr->getAttributes());
LOG("StepOne2 complete.");
}
DUNIT_TASK_DEFINITION(CLIENT1, StepOne_PoolEP)
{ stepOne(); }
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT2, StepOne2_PoolEP)
{
initCredentialGenerator();
stepOne2();
}
END_TASK_DEFINITION
std::shared_ptr<Pool> getPool(const char *name) {
return getHelper()->getCache()->getPoolManager().find(name);
}
static std::shared_ptr<QueryService> userQueryService;
AuthenticatedView setUpAuthenticatedView(const int userId) {
auto creds = Properties::create();
char tmp[25] = {'\0'};
sprintf(tmp, "user%d", userId);
creds->insert("security-username", tmp);
creds->insert("security-password", tmp);
return getHelper()->getCache()->createAuthenticatedView(creds,
regionNamesCq[0]);
}
DUNIT_TASK_DEFINITION(CLIENT1, StepTwo)
{
auto authenticatedView = setUpAuthenticatedView(4);
auto regPtr0 = authenticatedView.getRegion(regionNamesCq[0]);
auto subregPtr0 = regPtr0->getSubregion(regionNamesCq[1]);
authenticatedView.close();
LOG("StepTwo complete.");
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, StepThree)
{
uint8_t i = 0;
// QueryHelper * qh = &QueryHelper::getHelper();
// userCache = getVirtualCache(userCreds, regionNamesCq[0]);
auto authenticatedView = setUpAuthenticatedView(4);
userQueryService = authenticatedView.getQueryService();
std::shared_ptr<QueryService> qs;
qs = userQueryService;
try {
for (i = 0; i < MAX_LISTNER; i++) {
auto cqLstner = std::make_shared<MyCqListener>(i);
CqAttributesFactory cqFac;
cqFac.addCqListener(cqLstner);
auto cqAttr = cqFac.create();
auto qry = qs->newCq(cqNames[i], queryStrings[i], cqAttr);
qry->execute();
}
LOG("EXECUTE 1 START");
// qs->executeCqs();
LOG("EXECUTE 1 STOP");
} catch (const Exception &excp) {
std::string logmsg = "";
logmsg += excp.getName();
logmsg += ": ";
logmsg += excp.what();
LOG(logmsg.c_str());
LOG(excp.getStackTrace());
}
authenticatedView.close();
LOG("StepThree complete.");
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT2, StepTwo2)
{
auto authenticatedView = setUpAuthenticatedView(3);
auto regPtr0 = authenticatedView.getRegion(regionNamesCq[0]);
auto subregPtr0 = regPtr0->getSubregion(regionNamesCq[1]);
QueryHelper *qh = &QueryHelper::getHelper();
qh->populatePortfolioData(regPtr0, 3, 2, 1);
qh->populatePositionData(subregPtr0, 3, 2);
for (int i = 1; i <= 4; i++) {
auto port = std::make_shared<Portfolio>(i, 2);
char tmp[25] = {'\0'};
sprintf(tmp, "port1-%d", i);
auto keyport = CacheableKey::create(tmp);
regPtr0->put(keyport, port);
SLEEP(10); // sleep a while to allow server query to complete
}
LOG("StepTwo2 complete. Sleeping .25 min for server query to complete...");
SLEEP(15000); // sleep .25 min to allow server query to complete
authenticatedView.close();
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, StepFour)
{
// auto userCache = getVirtualCache(userCreds, regionNamesCq[0]);
std::shared_ptr<QueryService> qs;
qs = userQueryService;
char buf[1024];
uint8_t i = 0;
CqAttributesFactory cqFac;
for (i = 0; i < MAX_LISTNER; i++) {
sprintf(buf, "get info for cq[%s]:", cqNames[i]);
LOG(buf);
auto cqy = qs->getCq(cqNames[i]);
// auto cqStats = cqy->getStatistics();
}
// if key port1-4 then only query 3 and 4 will satisfied
auto cqy = qs->getCq(cqNames[3]);
auto cqAttr = cqy->getCqAttributes();
auto vl = cqAttr->getCqListeners();
auto cqListener_3 = static_cast<MyCqListener *>(vl[0].get());
printf(" cqListener_3 should have one create event = %d \n",
cqListener_3->getNumInserts());
ASSERT(cqListener_3->getNumInserts() == 1,
"incorrect number of events got listener 3");
cqy = qs->getCq(cqNames[4]);
cqAttr = cqy->getCqAttributes();
vl = cqAttr->getCqListeners();
auto cqListener_4 = static_cast<MyCqListener *>(vl[0].get());
printf(" cqListener_4 should have one create event = %d \n",
cqListener_4->getNumInserts());
ASSERT(cqListener_4->getNumInserts() == 1,
"incorrect number of events got listener 4");
/*for(i=0; i < MAX_LISTNER; i++)
{
sprintf(buf, "get info for cq[%s]:", cqNames[i]);
LOG(buf);
auto cqy = qs->getCq(cqNames[i]);
cqy->stop();
cqy->close();
//auto cqStats = cqy->getStatistics();
}*/
// userCache->close();
LOG("StepFour complete.");
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, StepFour2)
{
auto qs = userQueryService;
char buf[1024];
uint8_t i = 0;
for (i = 0; i < MAX_LISTNER; i++) {
sprintf(buf, "get info for cq[%s]:", cqNames[i]);
LOG(buf);
auto cqy = qs->getCq(cqNames[i]);
// auto cqStats = cqy->getStatistics();
}
// if key port1-4 then only query 3 and 4 will satisfied
auto cqy = qs->getCq(cqNames[3]);
auto cqAttr = cqy->getCqAttributes();
auto vl = cqAttr->getCqListeners();
MyCqListener *cqListener_3 = static_cast<MyCqListener *>(vl[0].get());
printf(" cqListener_3 should have one update event = %d \n",
cqListener_3->getNumUpdates());
ASSERT(cqListener_3->getNumUpdates() == 1,
"incorrect number of events got listener 3");
cqy = qs->getCq(cqNames[4]);
cqAttr = cqy->getCqAttributes();
vl = cqAttr->getCqListeners();
auto cqListener_4 = static_cast<MyCqListener *>(vl[0].get());
printf(" cqListener_4 should have one update event = %d \n",
cqListener_4->getNumUpdates());
ASSERT(cqListener_4->getNumUpdates() == 1,
"incorrect number of events got listener 4");
/*for(i=0; i < MAX_LISTNER; i++)
{
sprintf(buf, "get info for cq[%s]:", cqNames[i]);
LOG(buf);
auto cqy = qs->getCq(cqNames[i]);
cqy->stop();
cqy->close();
//auto cqStats = cqy->getStatistics();
}*/
LOG("StepFour2 complete.");
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, CloseCache1)
{
LOG("cleanProc 1...");
cleanProc();
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT2, CloseCache2)
{
LOG("cleanProc 2...");
cleanProc();
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(SERVER1, CloseServer1)
{
LOG("closing Server1...");
if (isLocalServer) {
CacheHelper::closeServer(1);
LOG("SERVER1 stopped");
}
SLEEP(5000);
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(SERVER1, CloseServer2)
{
LOG("closing Server2...");
if (isLocalServer) {
CacheHelper::closeServer(2);
LOG("SERVER2 stopped");
}
}
END_TASK_DEFINITION
void doThinClientCq() {
// CALL_TASK(CreateLocator);
// CALL_TASK(CreateServer1_Locator);
//
// CALL_TASK(StepOne_PoolLocator);
// CALL_TASK(StepOne2_PoolLocator);
CALL_TASK(StepTwo);
CALL_TASK(StepThree);
CALL_TASK(StepTwo2);
CALL_TASK(StepFour); // validates listener events
CALL_TASK(CreateServer2);
CALL_TASK(CloseServer1);
CALL_TASK(StepTwo2); // again put data
CALL_TASK(StepFour2);
CALL_TASK(CloseCache1);
CALL_TASK(CloseCache2);
CALL_TASK(CloseServer2);
// CALL_TASK(CloseLocator);
}
DUNIT_MAIN
{ doThinClientCq(); }
END_MAIN