blob: 50e0727a80f19e6a23a3edce34fe83e917583da1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define ROOT_NAME "testThinClientPoolExecuteFunctionThrowsException"
#include <thread>
#include <chrono>
#include "fw_dunit.hpp"
#include "ThinClientHelper.hpp"
#include "testobject/VariousPdxTypes.hpp"
#include <geode/UserFunctionExecutionException.hpp>
#include <geode/FunctionService.hpp>
#include <geode/DefaultResultCollector.hpp>
/* This is to test
1- funtion execution on pool
*/
#define CLIENT1 s1p1
#define LOCATOR1 s2p1
#define SERVER s2p2
using apache::geode::client::Cacheable;
using apache::geode::client::CacheableArrayList;
using apache::geode::client::CacheableBoolean;
using apache::geode::client::CacheableVector;
using apache::geode::client::DefaultResultCollector;
using apache::geode::client::FunctionService;
using apache::geode::client::UserFunctionExecutionException;
bool isLocalServer = false;
bool isLocator = false;
bool isPoolWithEndpoint = false;
const char *locHostPort =
CacheHelper::getLocatorHostPort(isLocator, isLocalServer, 1);
const char *poolRegNames[] = {"partition_region", "PoolRegion2"};
const char *serverGroup = "ServerGroup1";
const char *getFuncIName = "MultiGetFunctionI";
const char *putFuncIName = "MultiPutFunctionI";
const char *getFuncName = "MultiGetFunction";
const char *putFuncName = "MultiPutFunction";
const char *rjFuncName = "RegionOperationsFunction";
const char *exFuncName = "ExceptionHandlingFunction";
const char *exFuncNameSendException = "executeFunction_SendException";
const char *exFuncNamePdxType = "PdxFunctionTest";
const char *FEOnRegionPrSHOP = "FEOnRegionPrSHOP";
const char *FEOnRegionPrSHOP_OptimizeForWrite =
"FEOnRegionPrSHOP_OptimizeForWrite";
class MyResultCollector : public DefaultResultCollector {
public:
MyResultCollector()
: m_endResultCount(0), m_addResultCount(0), m_getResultCount(0) {}
~MyResultCollector() override = default;
std::shared_ptr<CacheableVector> getResult(
std::chrono::milliseconds timeout) override {
m_getResultCount++;
return DefaultResultCollector::getResult(timeout);
}
void addResult(const std::shared_ptr<Cacheable> &resultItem) override {
m_addResultCount++;
if (resultItem == nullptr) {
return;
}
if (auto results =
std::dynamic_pointer_cast<CacheableArrayList>(resultItem)) {
for (auto &result : *results) {
DefaultResultCollector::addResult(result);
}
} else {
DefaultResultCollector::addResult(resultItem);
}
}
void endResults() override {
m_endResultCount++;
DefaultResultCollector::endResults();
}
uint32_t getEndResultCount() { return m_endResultCount; }
uint32_t getAddResultCount() { return m_addResultCount; }
uint32_t getGetResultCount() { return m_getResultCount; }
private:
uint32_t m_endResultCount;
uint32_t m_addResultCount;
uint32_t m_getResultCount;
};
DUNIT_TASK_DEFINITION(LOCATOR1, StartLocator1)
{
// starting locator
if (isLocator) {
CacheHelper::initLocator(1);
LOG("Locator1 started");
}
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(SERVER, StartS12)
{
const char *lhp = nullptr;
if (!isPoolWithEndpoint) lhp = locHostPort;
if (isLocalServer) {
CacheHelper::initServer(1, "func_cacheserver1_pool.xml", lhp);
}
if (isLocalServer) {
CacheHelper::initServer(2, "func_cacheserver2_pool.xml", lhp);
}
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, StartC1)
{
// initClient(true);
initClientWithPool(true, poolRegNames[0], locHostPort, serverGroup, nullptr,
0, true, -1, -1, 60000, /*singlehop*/ true,
/*threadLocal*/ true);
// createPool(poolName, locHostPort,serverGroup, nullptr, 0, true );
// createRegionAndAttachPool(poolRegNames[0],USE_ACK, poolName);
auto regPtr0 = createRegionAndAttachPool(poolRegNames[0], USE_ACK);
; // getHelper()->createRegion( poolRegNames[0], USE_ACK);
regPtr0->registerAllKeys();
LOG("Clnt1Init complete.");
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, Client1OpTest)
{
auto regPtr0 = getHelper()->getRegion(poolRegNames[0]);
char buf[128];
for (int i = 0; i < 34; i++) {
sprintf(buf, "VALUE--%d", i);
auto value = CacheableString::create(buf);
sprintf(buf, "KEY--%d", i);
auto key = CacheableString::create(buf);
regPtr0->put(key, value);
}
std::this_thread::sleep_for(
std::chrono::seconds(10)); // let the put finish
//-----------------------Test with sendException
// onRegion-------------------------------//
for (int i = 1; i <= 200; i++) {
std::shared_ptr<Cacheable> value(CacheableInt32::create(i));
sprintf(buf, "execKey-%d", i);
auto key = CacheableKey::create(buf);
regPtr0->put(key, value);
}
LOG("Put for execKey's on region complete.");
LOG("Adding filter");
auto arrList = CacheableArrayList::create();
for (int i = 100; i < 120; i++) {
sprintf(buf, "execKey-%d", i);
auto key = CacheableKey::create(buf);
arrList->push_back(key);
}
auto filter = CacheableVector::create();
for (int i = 100; i < 120; i++) {
sprintf(buf, "execKey-%d", i);
auto key = CacheableKey::create(buf);
filter->push_back(key);
}
LOG("Adding filter done.");
auto args = CacheableBoolean::create(1);
auto funcExec = FunctionService::onRegion(regPtr0);
auto collector = funcExec.withArgs(args).withFilter(filter).execute(
exFuncNameSendException, std::chrono::seconds(15));
auto result = collector->getResult();
if (result == nullptr) {
ASSERT(false, "echo String : result is nullptr");
} else {
for (size_t i = 0; i < result->size(); i++) {
if (auto uFEPtr =
std::dynamic_pointer_cast<UserFunctionExecutionException>(
result->operator[](i))) {
LOGINFO("Done casting to uFEPtr");
LOGINFO("Read expected uFEPtr exception %s ",
uFEPtr->getMessage().c_str());
} else {
FAIL(
"exFuncNameSendException casting to string for bool argument "
"exception.");
}
}
}
LOG("exFuncNameSendException done for bool argument.");
collector = funcExec.withArgs(arrList).withFilter(filter).execute(
exFuncNameSendException, std::chrono::seconds(15));
ASSERT(collector != nullptr, "onRegion collector for arrList nullptr");
std::this_thread::sleep_for(std::chrono::seconds(2));
try {
auto fil = CacheableVector::create();
fil->push_back(CacheableInt32::create(1));
auto exe = FunctionService::onRegion(regPtr0);
LOGINFO("Executing the exception test it is expected to throw.");
auto executeFunctionResult3 =
funcExec.withArgs(arrList)
.withFilter(filter)
.execute("ThinClientRegionExceptionTest",
std::chrono::seconds(15))
->getResult();
FAIL("Failed to throw expected exception.");
} catch (...) {
LOGINFO("Finished Executing the exception test Successfully");
}
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(CLIENT1, StopC1)
{
cleanProc();
LOG("Clnt1Down complete: Keepalive = True");
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(SERVER, CloseServers)
{
// stop servers
if (isLocalServer) {
CacheHelper::closeServer(1);
LOG("SERVER1 stopped");
}
if (isLocalServer) {
CacheHelper::closeServer(2);
LOG("SERVER2 stopped");
}
if (isLocalServer) {
CacheHelper::closeServer(3);
LOG("SERVER3 stopped");
}
isPoolWithEndpoint = true;
}
END_TASK_DEFINITION
DUNIT_TASK_DEFINITION(LOCATOR1, CloseLocator1)
{
// stop locator
if (isLocator) {
CacheHelper::closeLocator(1);
LOG("Locator1 stopped");
}
}
END_TASK_DEFINITION
void runFunctionExecution() {
CALL_TASK(StartLocator1);
CALL_TASK(StartS12);
CALL_TASK(StartC1);
CALL_TASK(Client1OpTest);
CALL_TASK(StopC1);
CALL_TASK(CloseServers);
CALL_TASK(CloseLocator1);
}
DUNIT_MAIN
{ runFunctionExecution(); }
END_MAIN