blob: 98ea64f860a705c33bd2f5de5502f3559522a7b6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <string>
#include <iostream>
#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <geode/RegionShortcut.hpp>
#include <geode/RegionFactory.hpp>
#include <geode/PersistenceManager.hpp>
#include <CacheableToken.hpp>
#include <CacheRegionHelper.hpp>
#include <MapEntry.hpp>
#include "CacheImpl.hpp"
#include "fw_helper.hpp"
using apache::geode::client::Cache;
using apache::geode::client::Cacheable;
using apache::geode::client::CacheableKey;
using apache::geode::client::CacheableString;
using apache::geode::client::CacheableToken;
using apache::geode::client::CacheFactory;
using apache::geode::client::CacheRegionHelper;
using apache::geode::client::DiskPolicyType;
using apache::geode::client::Exception;
using apache::geode::client::HashMapOfCacheable;
using apache::geode::client::PersistenceManager;
using apache::geode::client::Properties;
using apache::geode::client::Region;
using apache::geode::client::RegionAttributes;
using apache::geode::client::RegionAttributesFactory;
using apache::geode::client::RegionShortcut;
uint32_t numOfEnt;
std::string sqlite_dir = "SqLiteRegionData";
static constexpr char const *kMaxPageCountStr = "MaxPageCount";
static constexpr char const *kPageSizeStr = "PageSize";
static constexpr char const *kPersistenceDirStr = "PersistenceDirectory";
// Return the number of keys and values in entries map.
void getNumOfEntries(std::shared_ptr<Region> &regionPtr, uint32_t num) {
auto v = regionPtr->keys();
auto vecValues = regionPtr->values();
std::cout << "Values vector size is " << vecValues.size() << "\n";
std::cout << "Num is " << num << "\n";
ASSERT(vecValues.size() == num, "size of value vec and num not equal");
}
void setAttributes(RegionAttributes regionAttributes,
std::string pDir = sqlite_dir) {
RegionAttributesFactory regionAttributesFactory;
regionAttributesFactory.setCachingEnabled(true);
regionAttributesFactory.setLruEntriesLimit(10);
regionAttributesFactory.setInitialCapacity(1000);
regionAttributesFactory.setDiskPolicy(DiskPolicyType::OVERFLOWS);
auto sqliteProperties = Properties::create();
sqliteProperties->insert("MaxPageCount", "1073741823");
sqliteProperties->insert("PageSize", "65536");
sqliteProperties->insert("PersistenceDirectory", pDir.c_str());
regionAttributesFactory.setPersistenceManager(
"SqLiteImpl", "createSqLiteInstance", sqliteProperties);
regionAttributes = regionAttributesFactory.create();
}
void setAttributesWithMirror(RegionAttributes regionAttributes) {
RegionAttributesFactory regionAttributesFactory;
regionAttributesFactory.setCachingEnabled(true);
regionAttributesFactory.setLruEntriesLimit(20);
regionAttributesFactory.setInitialCapacity(1000);
regionAttributesFactory.setDiskPolicy(DiskPolicyType::OVERFLOWS);
auto sqliteProperties = Properties::create();
sqliteProperties->insert("MaxPageCount", "1073741823");
sqliteProperties->insert("PageSize", "65536");
sqliteProperties->insert("PersistenceDirectory", sqlite_dir.c_str());
regionAttributesFactory.setPersistenceManager(
"SqLiteImpl", "createSqLiteInstance", sqliteProperties);
regionAttributes = regionAttributesFactory.create();
}
// Testing for attibute validation.
void validateAttribute(std::shared_ptr<Region> &regionPtr) {
RegionAttributes regAttr = regionPtr->getAttributes();
int initialCapacity = regAttr.getInitialCapacity();
ASSERT(initialCapacity == 1000, "Expected initial capacity to be 1000");
const DiskPolicyType type = regAttr.getDiskPolicy();
ASSERT(type == DiskPolicyType::OVERFLOWS,
"Expected Action should overflow to disk");
}
void checkOverflowTokenValues(std::shared_ptr<Region> &regionPtr,
uint32_t num) {
std::vector<std::shared_ptr<CacheableKey>> v = regionPtr->keys();
std::shared_ptr<CacheableKey> keyPtr;
std::shared_ptr<Cacheable> valuePtr;
int count = 0;
size_t nonoverflowCount = 0;
for (uint32_t i = 0; i < num; i++) {
keyPtr = v.at(i);
auto rPtr = regionPtr->getEntry(keyPtr);
valuePtr = rPtr->getValue();
if (CacheableToken::isOverflowed(valuePtr) == true) {
count++;
} else {
nonoverflowCount++;
}
valuePtr = nullptr;
}
ASSERT(count == 0, "No of overflowed entries should be zero");
ASSERT(nonoverflowCount == v.size(),
"Non overflowed entries should match key size");
}
void checkOverflowToken(std::shared_ptr<Region> &regionPtr, uint32_t lruLimit) {
std::vector<std::shared_ptr<CacheableKey>> v = regionPtr->keys();
std::shared_ptr<CacheableKey> keyPtr;
std::shared_ptr<Cacheable> valuePtr;
int normalCount = 0;
int overflowCount = 0;
int invalidCount = 0;
int destroyedCount = 0;
int tombstoneCount = 0;
for (uint32_t i = 0; i < static_cast<uint32_t>(v.size()); i++) {
keyPtr = v.at(i);
auto rPtr = regionPtr->getEntry(keyPtr);
valuePtr = rPtr->getValue();
if (CacheableToken::isOverflowed(valuePtr) == true) {
overflowCount++;
} else if (CacheableToken::isTombstone(valuePtr)) {
tombstoneCount++;
} else if (CacheableToken::isInvalid(valuePtr)) {
invalidCount++;
} else if (CacheableToken::isDestroyed(valuePtr)) {
destroyedCount++;
} else if (valuePtr != nullptr) {
normalCount++;
}
valuePtr = nullptr;
}
std::cout << "Keys vector size is " << v.size() << "\n";
std::cout << "Normal entries count is " << normalCount << "\n";
std::cout << "Overflow entries count is " << overflowCount << "\n";
std::cout << "Invalid entries count is " << invalidCount << "\n";
std::cout << "Destoyed entries count is " << destroyedCount << "\n";
std::cout << "Tombstone entries count is " << tombstoneCount << "\n";
std::cout << "LRU entries limit is " << lruLimit << "\n";
ASSERT(normalCount <= static_cast<int>(lruLimit),
"Normal entries count should not exceed LRU entries limit.");
}
// Testing for put operation
void doNput(std::shared_ptr<Region> &regionPtr, uint32_t num,
uint32_t start = 0) {
// Put 1 KB of data locally for each entry
char text[1024];
memset(text, 'A', 1023);
text[1023] = '\0';
auto valuePtr = CacheableString::create(text);
for (uint32_t i = start; i < num; i++) {
auto keyname = std::string("key-") + std::to_string(i);
auto key = CacheableKey::create(keyname);
std::cout << "Putting key = " << keyname << "\n";
regionPtr->put(key, valuePtr);
}
}
// Testing for get operation
uint32_t doNget(std::shared_ptr<Region> &regionPtr, uint32_t num,
uint32_t start = 0) {
uint32_t countFound = 0;
uint32_t countNotFound = 0;
for (uint32_t i = start; i < num; i++) {
auto keyname = std::string("key-") + std::to_string(i);
auto valuePtr =
std::dynamic_pointer_cast<CacheableString>(regionPtr->get(keyname));
std::cout << "Getting key = " << keyname << "\n";
if (valuePtr == nullptr) {
countNotFound++;
} else {
countFound++;
}
}
std::cout << "completed doNget";
std::cout << "count found " << countFound;
std::cout << "num found " << num;
ASSERT(countFound == (num - start),
"Number of entries found and put should match");
LOGINFO("found:%d and Not found: %d", countFound, countNotFound);
return countFound;
}
/**
* Test the entry operation ( local invalidate, localDestroy ).
*/
void testEntryDestroy(std::shared_ptr<Region> &regionPtr, uint32_t num) {
std::vector<std::shared_ptr<CacheableKey>> v = regionPtr->keys();
std::vector<std::shared_ptr<Cacheable>> vecValues;
std::shared_ptr<Cacheable> valuePtr;
for (uint32_t i = 45; i < 50; i++) {
try {
std::cout << "try to destroy key" << i << std::endl;
regionPtr->destroy(v.at(i));
} catch (Exception &ex) {
std::cout << ex.what() << std::endl;
ASSERT(false, "entry missing");
}
}
v = regionPtr->keys();
ASSERT(v.size() == num - 5, "size of key vec not equal");
}
void testEntryInvalidate(std::shared_ptr<Region> &regionPtr, uint32_t num) {
std::vector<std::shared_ptr<CacheableKey>> v = regionPtr->keys();
std::vector<std::shared_ptr<Cacheable>> vecValues;
std::shared_ptr<Cacheable> valuePtr;
for (uint32_t i = 40; i < 45; i++) {
try {
std::cout << "try to invalidate key" << i << std::endl;
regionPtr->invalidate(v.at(i));
} catch (Exception &ex) {
std::cout << ex.what() << std::endl;
ASSERT(false, "entry missing");
}
}
v = regionPtr->keys();
ASSERT(v.size() == num, "size of key vec not equal");
}
class PutThread {
public:
PutThread(std::shared_ptr<Region> region, int min, int max)
: min_{min}, max_{max}, region_{region} {}
void run() {
/** put some values into the cache. */
doNput(region_, max_, min_);
/** do some gets... printing what we find in the cache. */
doNget(region_, max_, min_);
LOG("Completed doNget");
}
void start() {
thread_ = std::thread{[this]() { run(); }};
}
void stop() {
if (thread_.joinable()) {
thread_.join();
}
}
protected:
int min_;
int max_;
std::thread thread_;
std::shared_ptr<Region> region_;
};
void verifyGetAll(std::shared_ptr<Region> region, int startIndex) {
std::vector<std::shared_ptr<CacheableKey>> keysVector;
for (int i = 0; i <= 100; i++) keysVector.push_back(CacheableKey::create(i));
// keysVector.push_back(CacheableKey::create(101)); //key not there
const auto valuesMap = region->getAll(keysVector);
if (valuesMap.size() == keysVector.size()) {
int i = startIndex;
for (const auto &iter : valuesMap) {
auto key = std::dynamic_pointer_cast<CacheableKey>(iter.first);
auto mVal = iter.second;
if (mVal != nullptr) {
int val = atoi(mVal->toString().c_str());
ASSERT(val == i, "value not matched");
}
}
}
}
void createRegion(std::shared_ptr<Region> &regionPtr, const char *regionName,
std::shared_ptr<Properties> &cacheProps,
std::shared_ptr<Properties> &sqLiteProps) {
auto cacheFactoryPtr = CacheFactory(cacheProps);
auto cachePtr = std::make_shared<Cache>(CacheFactory().create());
ASSERT(cachePtr != nullptr, "Expected cache to be NON-nullptr");
auto regionFactory = cachePtr->createRegionFactory(RegionShortcut::LOCAL);
regionFactory.setCachingEnabled(true);
regionFactory.setLruEntriesLimit(10);
regionFactory.setInitialCapacity(1000);
regionFactory.setDiskPolicy(DiskPolicyType::OVERFLOWS);
regionFactory.setPersistenceManager("SqLiteImpl", "createSqLiteInstance",
sqLiteProps);
regionPtr = regionFactory.create(regionName);
ASSERT(regionPtr != nullptr, "Expected regionPtr to be NON-nullptr");
}
void setSqLiteProperties(std::shared_ptr<Properties> &sqliteProperties,
int maxPageCount = 1073741823, int pageSize = 65536,
std::string pDir = sqlite_dir) {
sqliteProperties = Properties::create();
sqliteProperties->insert(kMaxPageCountStr, maxPageCount);
sqliteProperties->insert(kPageSizeStr, pageSize);
sqliteProperties->insert(kPersistenceDirStr, pDir.c_str());
ASSERT(sqliteProperties != nullptr,
"Expected sqlite properties to be NON-nullptr");
}
// creation of subregion.
void createSubRegion(std::shared_ptr<Region> &regionPtr,
std::shared_ptr<Region> &subRegion,
const std::string &regionName,
std::string pDir = sqlite_dir) {
RegionAttributes regionAttributesPtr;
setAttributes(regionAttributesPtr, pDir);
subRegion = regionPtr->createSubregion(regionName, regionAttributesPtr);
ASSERT(subRegion != nullptr, "Expected region to be NON-nullptr");
std::string fileName = pDir + '/' + regionName + '/' + regionName + ".db";
ASSERT(boost::filesystem::exists(fileName), "persistence file not present");
doNput(subRegion, 50);
doNget(subRegion, 50);
}
BEGIN_TEST(OverFlowTest)
{
/** Creating a cache to manage regions. */
std::shared_ptr<Properties> sqliteProperties;
auto cacheProperties = Properties::create();
setSqLiteProperties(sqliteProperties);
std::shared_ptr<Region> regionPtr;
createRegion(regionPtr, "OverFlowRegion", cacheProperties,
sqliteProperties);
ASSERT(regionPtr != nullptr, "Expected regionPtr to be NON-nullptr");
validateAttribute(regionPtr);
/** put some values into the cache. */
doNput(regionPtr, 50);
checkOverflowToken(regionPtr, 10);
/** do some gets... printing what we find in the cache. */
doNget(regionPtr, 50);
checkOverflowToken(regionPtr, 10);
LOG("Completed doNget");
testEntryDestroy(regionPtr, 50);
checkOverflowToken(regionPtr, 10);
LOG("Completed testEntryDestroy");
testEntryInvalidate(regionPtr, 45);
checkOverflowToken(regionPtr, 10);
LOG("Completed testEntryInvalidate");
getNumOfEntries(regionPtr, 40);
/** check whether value get evicted and token gets set as overflow */
checkOverflowTokenValues(regionPtr, 45);
/** test to verify same region repeatedly to ensure that the persistece
files are created and destroyed correctly */
std::shared_ptr<Region> subRegion;
for (int i = 0; i < 10; i++) {
createSubRegion(regionPtr, subRegion, "SubRegion");
ASSERT(subRegion != nullptr, "Expected region to be NON-nullptr");
checkOverflowToken(subRegion,
10); // check the overflow count for each reion
subRegion->destroyRegion();
ASSERT(subRegion->isDestroyed(), "Expected region is not destroyed ");
subRegion = nullptr;
std::string sqliteDirSubRgn =
sqlite_dir + "/" + boost::asio::ip::host_name() + '_' +
std::to_string(boost::this_process::get_id()) + "/_" +
regionPtr->getName() + "_SubRegion/file_0.db";
ASSERT(!boost::filesystem::exists(sqliteDirSubRgn),
"persistence file still present");
}
// cache close
regionPtr->getRegionService().close();
}
END_TEST(OverFlowTest)
BEGIN_TEST(OverFlowTest_absPath)
{
std::shared_ptr<RegionAttributes> attrsPtr;
auto wdPath = boost::filesystem::current_path().string();
ASSERT(!wdPath.empty(),
"Expected current Working Directory to be not empty");
std::string absPersistenceDir = wdPath + "/absSqLite";
/** Creating a cache to manage regions. */
std::shared_ptr<Properties> sqliteProperties;
auto cacheProperties = Properties::create();
setSqLiteProperties(sqliteProperties, 1073741823, 65536, absPersistenceDir);
std::shared_ptr<Region> regionPtr;
createRegion(regionPtr, "OverFlowRegion", cacheProperties,
sqliteProperties);
ASSERT(regionPtr != nullptr, "Expected regionPtr to be NON-nullptr");
validateAttribute(regionPtr);
/** put some values into the cache. */
doNput(regionPtr, 50);
checkOverflowToken(regionPtr, 10);
LOG("Completed doNput");
/** do some gets... printing what we find in the cache. */
doNget(regionPtr, 50);
checkOverflowToken(regionPtr, 10);
LOG("Completed doNget");
testEntryDestroy(regionPtr, 50);
checkOverflowToken(regionPtr, 10);
LOG("Completed doNput");
testEntryInvalidate(regionPtr, 45);
checkOverflowToken(regionPtr, 10);
LOG("Completed testEntryInvalidate");
getNumOfEntries(regionPtr, 40);
/** check whether value get evicted and token gets set as overflow */
checkOverflowTokenValues(regionPtr, 45);
std::shared_ptr<Region> subRegion;
for (int i = 0; i < 10; i++) {
createSubRegion(regionPtr, subRegion, "SubRegion", absPersistenceDir);
ASSERT(subRegion != nullptr, "Expected region to be NON-nullptr");
subRegion->destroyRegion();
ASSERT(subRegion->isDestroyed(), "Expected region is not destroyed ");
subRegion = nullptr;
std::string fileName = absPersistenceDir + "/SubRegion/SubRegion.db";
ASSERT(!boost::filesystem::exists(fileName),
"persistence file still present");
}
// cache close
regionPtr->getRegionService().close();
}
END_TEST(OverFlowTest_absPath)
BEGIN_TEST(OverFlowTest_SqLiteFull)
{
auto cacheFactoryPtr = CacheFactory();
auto cachePtr = std::make_shared<Cache>(CacheFactory().create());
ASSERT(cachePtr != nullptr, "Expected cache to be NON-nullptr");
auto regionFactory = cachePtr->createRegionFactory(RegionShortcut::LOCAL);
regionFactory.setCachingEnabled(true);
regionFactory.setLruEntriesLimit(1);
regionFactory.setInitialCapacity(1000);
regionFactory.setDiskPolicy(DiskPolicyType::OVERFLOWS);
auto sqliteProperties = Properties::create();
sqliteProperties->insert(
"MaxPageCount", "10"); // 10 * 1024 is arround 10kB is the db file size
sqliteProperties->insert("PageSize", "1024");
sqliteProperties->insert("PersistenceDirectory", sqlite_dir.c_str());
regionFactory.setPersistenceManager("SqLiteImpl", "createSqLiteInstance",
sqliteProperties);
auto regionPtr = regionFactory.create("OverFlowRegion");
ASSERT(regionPtr != nullptr, "Expected regionPtr to be NON-nullptr");
try {
doNput(regionPtr, 100);
FAIL("Didn't get the expected exception");
} catch (apache::geode::client::Exception
&ex) { // expected sqlite full exception
// catching generic message as we dont
// have any specific sqlitefull exception
LOG(std::string("Got expected exception ") + ex.getName() +
"; msg = " + ex.what());
}
// cache close
cachePtr->close();
}
END_TEST(OverFlowTest_SqLiteFull)
BEGIN_TEST(OverFlowTest_HeapLRU)
{
/** Creating a cache to manage regions. */
auto pp = Properties::create();
pp->insert("heap-lru-limit", 1);
pp->insert("heap-lru-delta", 10);
auto cacheFactoryPtr = CacheFactory(pp);
auto cachePtr = std::make_shared<Cache>(CacheFactory().create());
ASSERT(cachePtr != nullptr, "Expected cache to be NON-nullptr");
auto regionFactory = cachePtr->createRegionFactory(RegionShortcut::LOCAL);
regionFactory.setCachingEnabled(true);
regionFactory.setLruEntriesLimit(1024 * 10);
regionFactory.setInitialCapacity(1000);
regionFactory.setDiskPolicy(DiskPolicyType::OVERFLOWS);
auto sqliteProperties = Properties::create();
sqliteProperties->insert(
"MaxPageCount",
"2147483646"); // 10 * 1024 is arround 10kB is the db file size
sqliteProperties->insert("PageSize", "65536");
sqliteProperties->insert("PersistenceDirectory", sqlite_dir.c_str());
regionFactory.setPersistenceManager("SqLiteImpl", "createSqLiteInstance",
sqliteProperties);
auto regionPtr = regionFactory.create("OverFlowRegion");
ASSERT(regionPtr != nullptr, "Expected regionPtr to be NON-nullptr");
validateAttribute(regionPtr);
/** put some values into the cache. */
doNput(regionPtr, 1024 * 10);
LOG("Completed doNput");
/** do some gets... printing what we find in the cache. */
doNget(regionPtr, 1024 * 10);
LOG("Completed doNget");
testEntryDestroy(regionPtr, 1024 * 10);
LOG("Completed testEntryDestroy");
/** test to verify same region repeatedly to ensure that the persistece
files are created and destroyed correctly */
std::shared_ptr<Region> subRegion;
for (int i = 0; i < 10; i++) {
RegionAttributes regionAttributes;
setAttributes(regionAttributes);
subRegion = regionPtr->createSubregion("SubRegion", regionAttributes);
ASSERT(subRegion != nullptr, "Expected region to be NON-nullptr");
std::string fileName = sqlite_dir + "/SubRegion/SubRegion.db";
ASSERT(boost::filesystem::exists(fileName),
"persistence file not present");
doNput(subRegion, 50);
doNget(subRegion, 50);
subRegion->destroyRegion();
ASSERT(subRegion->isDestroyed(), "Expected region is not destroyed ");
subRegion = nullptr;
ASSERT(!boost::filesystem::exists(fileName),
"persistence file still present");
}
// cache close
cachePtr->close();
}
END_TEST(OverFlowTest_HeapLRU)
BEGIN_TEST(OverFlowTest_MultiThreaded)
{
/** Creating a cache to manage regions. */
auto cachePtr = std::make_shared<Cache>(CacheFactory().create());
ASSERT(cachePtr != nullptr, "Expected cache to be NON-nullptr");
RegionAttributes regionAttributes;
setAttributes(regionAttributes);
/** Create a region with caching and LRU. */
std::shared_ptr<Region> regionPtr;
auto cacheImpl = CacheRegionHelper::getCacheImpl(cachePtr.get());
cacheImpl->createRegion("OverFlowRegion", regionAttributes, regionPtr);
ASSERT(regionPtr != nullptr, "Expected regionPtr to be NON-nullptr");
validateAttribute(regionPtr);
/** test to verify same region repeatedly to ensure that the persistece
files are created and destroyed correctly */
/** put some values into the cache. */
PutThread *threads[4];
for (int thdIdx = 0; thdIdx < 4; thdIdx++) {
threads[thdIdx] =
new PutThread(regionPtr, thdIdx * 100 + 1, 100 * (thdIdx + 1));
threads[thdIdx]->start();
}
SLEEP(2000); // wait for threads to become active
for (int thdIdx = 0; thdIdx < 4; thdIdx++) {
threads[thdIdx]->stop();
}
SLEEP(2000);
checkOverflowToken(regionPtr, 10);
// cache close
cachePtr->close();
}
END_TEST(OverFlowTest_MultiThreaded)
BEGIN_TEST(OverFlowTest_PutGetAll)
{
/** Creating a cache to manage regions. */
auto cachePtr = std::make_shared<Cache>(CacheFactory().create());
ASSERT(cachePtr != nullptr, "Expected cache to be NON-nullptr");
RegionAttributes regionAttributes;
setAttributes(regionAttributes);
/** Create a region with caching and LRU. */
std::shared_ptr<Region> regionPtr;
auto cacheImpl = CacheRegionHelper::getCacheImpl(cachePtr.get());
cacheImpl->createRegion("OverFlowRegion", regionAttributes, regionPtr);
ASSERT(regionPtr != nullptr, "Expected regionPtr to be NON-nullptr");
validateAttribute(regionPtr);
// putAll some entries
HashMapOfCacheable map0;
map0.clear();
for (int i = 1; i <= 50; i++) {
map0.emplace(CacheableKey::create(i), Cacheable::create(i));
}
regionPtr->putAll(map0);
checkOverflowToken(regionPtr, 10);
LOG("Completed putAll");
verifyGetAll(regionPtr, 1);
checkOverflowToken(regionPtr, 10);
LOG("Completed getAll");
// cache close
cachePtr->close();
}
END_TEST(OverFlowTest_PutGetAll)