blob: 690d2b01cded9b5539ba150aba4495e90da53198 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*========================================================================
*/
#include "BDBImpl.hpp"
#include "ace/OS_NS_unistd.h"
#include "ace/OS_NS_sys_stat.h"
namespace gemfire {
extern "C" DBHelper* getBDBHelper();
};
using namespace gemfire;
#define MAXFILENAMESIZE 256
namespace {
size_t g_default_cache_size_gb = 0;
size_t g_default_cache_size_mb = 314572800;
uint32_t g_default_paze_size = 65536;
uint32_t g_max_db_file_size = 512000000;
std::string g_default_persistence_directory = "BDBImplRegionData";
std::string g_default_env_directory = "BDBImplEnv";
};
bool BDBImpl::m_isEnvReady=false;
ACE_Thread_Mutex BDBImpl::m_envReadyLock;
std::string BDBImpl::m_gEnvDirectory = g_default_env_directory;
AtomicInc BDBImpl::m_regionCount=0;
AtomicInc BDBImpl::m_activeDBSize=0;
int bdb_rmdir(const char *);
void BDBImpl::init(const RegionPtr& region, PropertiesPtr& diskProperties)
{
// Get db ready to receive data for storage
// 1. create DB, DBTable, Keymap
// 2. create region directory
closeFlag=0;
m_regionCount++;
if (region == NULLPTR) {
throw InitFailedException("Region passed to init is NULL.");
}
const char *regionFullPath = region->getFullPath();
m_regionName = regionFullPath;
convertToDirName(m_regionName);
// Set the default values
m_cacheSizeGb = g_default_cache_size_gb;
m_cacheSizeMb = g_default_cache_size_mb;
m_pageSize = g_default_paze_size;
m_maxDBFileSize = g_max_db_file_size;
m_PersistenceDirectory = g_default_persistence_directory + "_" + m_regionName;
m_regionEnvDirectory = g_default_env_directory;
std::string parentPersistenceDirectory, parentRegionEnvDirectory;
if (diskProperties != NULLPTR) {
CacheableStringPtr cacheSizeGb = diskProperties->find("CacheSizeGb");
CacheableStringPtr cacheSizeMb = diskProperties->find("CacheSizeMb");
CacheableStringPtr pageSize = diskProperties->find("PageSize");
CacheableStringPtr maxFileSize = diskProperties->find("MaxFileSize");
CacheableStringPtr persDir = diskProperties->find("PersistenceDirectory");
CacheableStringPtr envDir = diskProperties->find("EnvironmentDirectory");
ACE_TCHAR hname[MAXHOSTNAMELEN];
if (ACE_OS::hostname( hname, sizeof(hname)-1) != 0) {
throw InitFailedException("Failed to get host name.");
}
long pid = ACE_OS::getpid();
char myDir[512];
if (!ACE_OS::snprintf(myDir, sizeof(myDir)-1, "/%s_%ld/", hname, pid)){
throw InitFailedException("Failed to create unique directory.");
}
if (cacheSizeGb != NULLPTR) {
m_cacheSizeGb = atoi(cacheSizeGb->asChar());
}
if (cacheSizeMb != NULLPTR) {
m_cacheSizeMb = ((unsigned long)atol(cacheSizeMb->asChar()))*(unsigned long)1024*(unsigned long)1024;
}
if (pageSize != NULLPTR) {
m_pageSize = atoi(pageSize->asChar());
}
if (maxFileSize != NULLPTR) {
m_maxDBFileSize = atol(maxFileSize->asChar());
}
if (persDir != NULLPTR) {
parentPersistenceDirectory = persDir->asChar();
m_PersistenceDirectory = persDir->asChar() + std::string(myDir);
}
if (envDir != NULLPTR) {
parentRegionEnvDirectory = envDir->asChar();
m_regionEnvDirectory = envDir->asChar() + std::string(myDir);
}
}
char currWDPath[512];
char *wdPath = ACE_OS::getcwd(currWDPath,512);
// Initialize DB Helper
m_DBHelper = getBDBHelper();
#ifndef _WIN32
if (m_PersistenceDirectory.at(0) != '/') {
if (wdPath == NULL) {
throw InitFailedException("Failed to get absolute path for persistence directory.");
}
parentPersistenceDirectory = std::string(wdPath) + "/" + parentPersistenceDirectory;
m_PersistenceDirectory = std::string(wdPath) + "/" + m_PersistenceDirectory;
}
if (m_regionEnvDirectory.at(0) != '/') {
if (wdPath == NULL) {
throw InitFailedException("Failed to get absolute path for environment directory.");
}
parentRegionEnvDirectory = std::string(wdPath) + "/" + parentRegionEnvDirectory;
m_regionEnvDirectory = std::string(wdPath) + "/" + m_regionEnvDirectory;
}
#else
if (m_PersistenceDirectory.find(":",0) == std::string::npos) {
if (wdPath == NULL) {
throw InitFailedException("Failed to get absolute path for persistence directory.");
}
parentPersistenceDirectory = std::string(wdPath) + "/" + parentPersistenceDirectory;
m_PersistenceDirectory = std::string(wdPath) + "/" + m_PersistenceDirectory;
}
if (m_regionEnvDirectory.find(":",0) == std::string::npos) {
if (wdPath == NULL) {
throw InitFailedException("Failed to get absolute path for persistence environment directory.");
}
parentRegionEnvDirectory = std::string(wdPath) + "/" + parentRegionEnvDirectory;
m_regionEnvDirectory = std::string(wdPath) + "/" + m_regionEnvDirectory;
}
#endif
LOGINFO("Absolute path for persistence environment directory: %s",m_regionEnvDirectory.c_str());
LOGINFO("Absolute path for persistence directory: %s",m_PersistenceDirectory.c_str());
//ACE_stat fileStat;
// Create persistence directory
{
ACE_Guard<ACE_Thread_Mutex> guard( m_envReadyLock );
if (!m_isEnvReady) {
// Create and initialize the environment. This is done only once as all regions share the same environment.
m_gEnvDirectory = m_regionEnvDirectory;
ACE_OS::mkdir(parentRegionEnvDirectory.c_str());
ACE_OS::mkdir(m_gEnvDirectory.c_str());
/* if (ACE_OS::stat(m_gEnvDirectory.c_str(), &fileStat)) {
// Directory creation failure
throw InitFailedException("Failed to create environment directory.");
}
*/
m_DBHelper->createEnvironment(m_gEnvDirectory,m_cacheSizeGb,m_cacheSizeMb);
m_isEnvReady=true;
} //end if isEnvReady
} //end guard
// Check if persistence directory matches the one which has been initialized.
if (m_gEnvDirectory != m_regionEnvDirectory) {
throw InitFailedException("Environment directory settings do not match.");
}
// Create persistence directory
LOGFINE("Creating persistence directory: %s",m_PersistenceDirectory.c_str());
ACE_OS::mkdir(parentPersistenceDirectory.c_str());
ACE_OS::mkdir(m_PersistenceDirectory.c_str());
/* if (ACE_OS::stat(m_PersistenceDirectory.c_str(), &fileStat)) {
// Directory creation failure
throw InitFailedException("Failed to create persistence directory.");
}
*/
// Create region directory
std::string regionDirectory = m_PersistenceDirectory + "/" + m_regionName;
ACE_OS::mkdir(regionDirectory.c_str());
/* if (ACE_OS::stat(regionDirectory.c_str(), &fileStat)) {
// Directory creation failure
throw InitFailedException("Failed to create region directory.");
}
*/
std::string regionDBFile = regionDirectory + "/file_0.db";
LOGFINE("Creating persistence region file: %s",regionDBFile.c_str());
void *dbhandle = m_DBHelper->createNewDB(regionDBFile);
m_DBHandleVector.push_back(dbhandle);
m_currentDBHandle = dbhandle;
m_currentDBIndex = 0;
m_dbFileManager = new DBFileManager(this);
m_dbFileManager->activate();
LOGFINE("Initialization successful for persistence of region %s", m_regionName.c_str());
m_initFlag=true;
}
void BDBImpl::write(const CacheableKeyPtr& key, const CacheablePtr& value, void *& dbHandle)
{
void *handle = dbHandle;
bool isUpdate = true;
if (handle == NULL) {
// This guard is required so that the currDBHandle pointer cannot be changed when this write happens.
ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_activeDBMutex);
handle = m_currentDBHandle;
dbHandle = m_currentDBHandle;
isUpdate = false;
}
ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_activeDBMutex);
if (m_currentDBHandle == NULL) {
throw DiskFailureException("Disk handle not available");
}
writeToDB(handle, key, value, isUpdate);
}
bool BDBImpl::writeAll()
{
// ARB: Do we need this in persistence?
return true;
}
CacheablePtr BDBImpl::read(const CacheableKeyPtr& key, void *& dbHandle)
{
CacheablePtr value;
value = readFromDB(dbHandle,key);
return value;
}
bool BDBImpl::readAll()
{
// ARB: Used in persistence.
return true;
}
//void BDBImpl::invalidate(const CacheableKeyPtr& key)
//{
// ARB: Used in persistence.
//}
void BDBImpl::destroyRegion()
{
cleanupRegionData();
}
//int BDBImpl::numEntries() const
//{
// ARB: Used in persistence.
// return 0;
//}
void BDBImpl::destroy(const CacheableKeyPtr& key, void *& dbHandle)
{
// Handle of DB that is used to store the key-value pair has been passed as function argument.
// Serialize key.
DataOutput keyDataBuffer;
uint32_t keyBufferSize;
//SerializationHelper::serialize( key, keyDataBuffer );
keyDataBuffer.writeObject( key );
// TODO: correct constness instead of casting
void* keyData = const_cast<uint8_t*>(keyDataBuffer.getBuffer(&keyBufferSize));
m_DBHelper->destroy(dbHandle,keyData,keyBufferSize);
}
BDBImpl::BDBImpl()
{
// Constructor does nothing. Initialization is done in init().
m_initFlag = false;
m_dbFileManager = NULL;
m_cacheSizeGb = 0;
m_cacheSizeMb = 0;
m_pageSize = 0;
m_maxDBFileSize = 0;
m_DBHelper = NULL;
m_currentDBHandle = NULL;
m_currentDBIndex = 0;
}
void BDBImpl::writeToDB(void *dbhandle, const CacheableKeyPtr& key, const CacheablePtr& value, bool isUpdate)
{
// Serialize key and value.
DataOutput keyDataBuffer, valueDataBuffer;
uint32_t keyBufferSize, valueBufferSize;
//SerializationHelper::serialize( key, keyDataBuffer );
//SerializationHelper::serialize( value, valueDataBuffer);
keyDataBuffer.writeObject( key );
valueDataBuffer.writeObject( value );
// TODO: correct constness instead of casting
void* keyData = const_cast<uint8_t*>(keyDataBuffer.getBuffer(&keyBufferSize));
void* valueData = const_cast<uint8_t*>(valueDataBuffer.getBuffer(
&valueBufferSize));
m_DBHelper->writeToDB(dbhandle,keyData,keyBufferSize,valueData,valueBufferSize);
if (!isUpdate) {
m_activeDBSize += keyBufferSize + valueBufferSize;
}
}
CacheablePtr BDBImpl::readFromDB(void *dbhandle, const CacheableKeyPtr &key)
{
// Serialize key.
DataOutput keyDataBuffer;
uint32_t keyBufferSize;
//SerializationHelper::serialize( key, keyDataBuffer );
keyDataBuffer.writeObject( key );
void* keyData = const_cast<uint8_t*>(keyDataBuffer.getBuffer(&keyBufferSize));
void *valueData;
uint32_t valueBufferSize;
m_DBHelper->readFromDB(dbhandle,keyData,keyBufferSize,valueData,valueBufferSize);
// Deserialize object and return value.
uint8_t *valueBytes = (uint8_t *)malloc(sizeof(uint8_t)*valueBufferSize);
memcpy(valueBytes, valueData, valueBufferSize);
DataInput valueDataBuffer(valueBytes, valueBufferSize);
//CacheablePtr retValue = static_cast<CacheablePtr>(SerializationHelper::deserialize(valueDataBuffer).ptr());
CacheablePtr retValue;
valueDataBuffer.readObject( retValue );
// Free memory for serialized form of Cacheable object.
free(valueData);
free(valueBytes);
return retValue;
}
int BDBImpl::DBFileManager::svc()
{
//char activeDBFileName[MAXFILENAMESIZE];
//ACE_stat activeDBFileStat;
while(m_isRunning) {
// sleep for 1 second for testing.
ACE_OS::sleep(1);
//ARB: **Commenting out file size stat method.
//std::string fileNamePrefix = m_bdbPersistenceManager->m_PersistenceDirectory + "/" + m_bdbPersistenceManager->m_regionName + "/file_";
//sprintf(activeDBFileName,"%s%d.db",fileNamePrefix.c_str(),m_bdbPersistenceManager->m_currentDBIndex);
//if (ACE_OS::stat(activeDBFileName,&activeDBFileStat))
//{
// throw DiskFailureException("Failed to stat active db file.");
//}
//long activeDBFileSize = activeDBFileStat.st_size;
//LOGINFO("ARB:[svc()] activeDBFileSize = %ld",activeDBFileSize);
//LOGINFO("ARB:[svc()] m_maxDBFileSize = %ld",m_bdbPersistenceManager->m_maxDBFileSize);
//if (activeDBFileSize > m_bdbPersistenceManager->m_maxDBFileSize)
//ARB: **end-comment for file size stat method
uint64_t thresholdSize = ((uint64_t) 60 * (m_bdbPersistenceManager->m_maxDBFileSize/100));
if (m_bdbPersistenceManager->m_activeDBSize.value() > thresholdSize)
{
char newDBFileName[MAXFILENAMESIZE];
int newIndex = m_bdbPersistenceManager->m_currentDBIndex + 1;
sprintf(newDBFileName,"file_%d.db",newIndex);
std::string regionDBFile = m_bdbPersistenceManager->m_PersistenceDirectory + "/" + m_bdbPersistenceManager->m_regionName + "/" + newDBFileName;
// Create new db
void *dbHandle = m_bdbPersistenceManager->m_DBHelper->createNewDBNoThrow(regionDBFile);
m_bdbPersistenceManager->m_DBHandleVector.push_back(dbHandle);
{
// Change the active db.
//LOGINFO("ARB:[svc()] Changing active db from %d to %d",m_bdbPersistenceManager->m_currentDBIndex,newIndex);
ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_bdbPersistenceManager->m_activeDBMutex);
m_bdbPersistenceManager->m_currentDBIndex = newIndex;
m_bdbPersistenceManager->m_currentDBHandle = dbHandle;
m_bdbPersistenceManager->m_activeDBSize = 0;
if (dbHandle == NULL) {
break;
}
LOGFINEST("BDB:[svc()] %s: Changed active db.",m_bdbPersistenceManager->m_regionName.c_str());
}
} //end if
} //end while
// ARB: must return something for WIN32
return 0;
}
void BDBImpl::cleanupRegionData()
{
if (!m_isEnvReady)
return;
if (!m_initFlag)
return;
//ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_activeDBMutex);
// ARB: file manager thread should deactivate gracefully.
//LOGINFO("ARB: in cleanupRegionData().. about to set m_isRunning to false..:%s",m_regionName.c_str());
if (m_dbFileManager) {
m_dbFileManager->m_isRunning = false;
//join with thread
m_dbFileManager->wait();
delete m_dbFileManager;
}
// Close all DBs and global db environment. Remove persistence directory.
std::string regionDir = m_PersistenceDirectory + "/" +m_regionName;
for (int dbIdx=0;dbIdx<m_currentDBIndex+1;dbIdx++)
{
void *dbhandle;
try {
dbhandle = m_DBHandleVector.at(dbIdx);
}
catch (...) {
// ARB: invalid vector index
continue;
}
m_DBHelper->closeDB(dbhandle);
char dbFileName[MAXFILENAMESIZE];
sprintf(dbFileName,"file_%d.db",dbIdx);
std::string regionDBName = regionDir + "/" + dbFileName;
LOGINFO("[cleanupRegionData()] Removing region file: %s",regionDBName.c_str());
if (ACE_OS::unlink(regionDBName.c_str())) {
throw ShutdownFailedException("Failed to remove region db file.");
}
}
// ARB: Need to use ACE_OS::rmdir in future
if (bdb_rmdir(regionDir.c_str()))
{
throw ShutdownFailedException("Failed to remove region directory.");
}
m_regionCount--;
if (m_regionCount.value() == 0) {
bdb_rmdir(m_PersistenceDirectory.c_str());
// ACE_stat fileStat;
/* if (!ACE_OS::stat(m_PersistenceDirectory.c_str(), &fileStat))
{
throw ShutdownFailedException("Failed to remove persistence directory.");
}
*/ }
}
void BDBImpl::cleanupPersistenceManager()
{
cleanupRegionData();
ACE_Guard<ACE_Thread_Mutex> guard( m_envReadyLock );
if (m_regionCount.value() == 0) {
m_DBHelper->closeEnvironment();
m_isEnvReady=false;
//Remove persistence directory
//ARB: Cleanup db environment files. Can BDB take care on closing environment? A better way is to delete all files in the environment directory.
ACE_OS::unlink((m_gEnvDirectory+"/__db.001").c_str());
ACE_OS::unlink((m_gEnvDirectory+"/__db.002").c_str());
ACE_OS::unlink((m_gEnvDirectory+"/__db.003").c_str());
ACE_OS::unlink((m_gEnvDirectory+"/__db.004").c_str());
ACE_OS::unlink((m_gEnvDirectory+"/*").c_str());
// ARB: Need to use ACE_OS::rmdir in future
if (bdb_rmdir(m_gEnvDirectory.c_str()))
{
LOGERROR("Failed to remove environment directory %s.", m_gEnvDirectory.c_str());
}
}
delete m_DBHelper;
}
void BDBImpl::convertToDirName(std::string& regionName)
{
for (size_t strIdx=0;strIdx<regionName.length();strIdx++)
{
if (regionName.at(strIdx) == '/') {
regionName.replace(strIdx,1,"_");
}
}
}
// Portable function for directory deletion.
// Need this as ACE5.4 does not have rmdir()
// For ACE bug, please refer to: http://deuce.doc.wustl.edu/bugzilla/show_bug.cgi?id=1409
int bdb_rmdir(const char *directoryPath)
{
return ACE_OS::rmdir( directoryPath );
}
void BDBImpl::close()
{
LOGINFO("Persistence closing.");
if(closeFlag.value()==0) {
closeFlag=1;
cleanupPersistenceManager();
}
}
extern "C" {
LIBEXP PersistenceManager* createBDBInstance() {
return new BDBImpl;
}
}