blob: b77b76ffc87766e1a0b0fb48279422fcbaee80d4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "HostStatSampler.hpp"
#include <algorithm>
#include <chrono>
#include <map>
#include <regex>
#include <thread>
#include <utility>
#include <vector>
#include <boost/filesystem.hpp>
#include <boost/process/environment.hpp>
#include <boost/range/adaptors.hpp>
#include <geode/CacheFactory.hpp>
#include <geode/ExceptionTypes.hpp>
#include <geode/internal/geode_globals.hpp>
#include "../AdminRegion.hpp"
#include "../CacheImpl.hpp"
#include "../ClientHealthStats.hpp"
#include "../ClientProxyMembershipID.hpp"
#include "../CppCacheLibrary.hpp"
#include "../DistributedSystem.hpp"
#include "../TcrConnectionManager.hpp"
#include "../util/Log.hpp"
#include "GeodeStatisticsFactory.hpp"
#include "StatArchiveWriter.hpp"
namespace apache {
namespace geode {
namespace statistics {
using std::chrono::duration_cast;
using std::chrono::high_resolution_clock;
using std::chrono::milliseconds;
using std::chrono::nanoseconds;
using client::Exception;
constexpr auto GFS_EXTENSION = ".gfs";
constexpr size_t kibibyte = 1024;
constexpr size_t mebibyte = kibibyte * 1024;
constexpr size_t gibibyte = mebibyte * 1024;
constexpr size_t MAX_STATS_FILE_LIMIT = 1 * gibibyte;
HostStatSampler::HostStatSampler(boost::filesystem::path filePath,
std::chrono::milliseconds sampleRate,
StatisticsManager* statMngr, CacheImpl* cache,
size_t statFileLimit,
size_t statDiskSpaceLimit)
: HostStatSampler(std::move(filePath), sampleRate, statFileLimit,
statDiskSpaceLimit) {
m_cache = cache;
m_samplerStats = std::unique_ptr<StatSamplerStats>(
new StatSamplerStats(statMngr->getStatisticsFactory()));
m_statMngr = statMngr;
initStatDiskSpaceEnabled();
}
HostStatSampler::HostStatSampler(boost::filesystem::path filePath,
std::chrono::milliseconds sampleRate,
size_t statFileLimit,
size_t statDiskSpaceLimit)
: m_adminError(false),
m_running(false),
m_stopRequested(false),
m_isStatDiskSpaceEnabled(statDiskSpaceLimit != 0),
m_archiveFileName(std::move(filePath)),
m_archiveFileSizeLimit(
std::min(statFileLimit * mebibyte, MAX_STATS_FILE_LIMIT)),
m_archiveDiskSpaceLimit(statDiskSpaceLimit * mebibyte),
m_spaceUsed(0),
m_sampleRate(sampleRate),
m_pid(boost::this_process::get_id()),
m_startTime(system_clock::now()),
m_rollIndex(0) {
if (m_isStatDiskSpaceEnabled) {
if (m_archiveFileSizeLimit == 0 ||
m_archiveFileSizeLimit > m_archiveDiskSpaceLimit) {
m_archiveFileSizeLimit = m_archiveDiskSpaceLimit;
}
}
}
void HostStatSampler::initStatDiskSpaceEnabled() {
if (m_isStatDiskSpaceEnabled) {
initStatFileWithExt();
initRollIndex();
auto exists = boost::filesystem::exists(m_archiveFileName);
if (exists && m_archiveFileSizeLimit > 0) {
changeArchive(m_archiveFileName);
} else {
writeGfs();
}
}
}
void HostStatSampler::initRollIndex() {
forEachIndexStatFile(
[&](const int32_t index, const boost::filesystem::path&) {
m_rollIndex = std::max(m_rollIndex, index + 1);
});
}
boost::filesystem::path HostStatSampler::initStatFileWithExt() {
return chkForGFSExt(createArchiveFilename());
}
HostStatSampler::~HostStatSampler() noexcept = default;
const boost::filesystem::path& HostStatSampler::createArchiveFilename() {
if (!m_isStatDiskSpaceEnabled) {
const auto pid = std::to_string(boost::this_process::get_id());
if (!m_archiveFileName.has_extension()) {
m_archiveFileName += "-" + pid;
} else {
m_archiveFileName = m_archiveFileName.parent_path() /
m_archiveFileName.stem() += "-" + pid;
}
m_archiveFileName += GFS_EXTENSION;
}
return m_archiveFileName;
}
const boost::filesystem::path& HostStatSampler::getArchiveFilename() const {
return m_archiveFileName;
}
size_t HostStatSampler::getArchiveFileSizeLimit() const {
return m_archiveFileSizeLimit;
}
size_t HostStatSampler::getArchiveDiskSpaceLimit() const {
return m_archiveDiskSpaceLimit;
}
std::chrono::milliseconds HostStatSampler::getSampleRate() const {
return m_sampleRate;
}
void HostStatSampler::accountForTimeSpentWorking(int64_t nanosSpentWorking) {
m_samplerStats->tookSample(nanosSpentWorking);
}
std::recursive_mutex& HostStatSampler::getStatListMutex() {
return m_statMngr->getListMutex();
}
std::vector<Statistics*>& HostStatSampler::getStatistics() {
return m_statMngr->getStatsList();
}
std::vector<Statistics*>& HostStatSampler::getNewStatistics() {
return m_statMngr->getNewlyAddedStatsList();
}
int64_t HostStatSampler::getSystemId() { return m_pid; }
system_clock::time_point HostStatSampler::getSystemStartTime() {
return m_startTime;
}
const std::string& HostStatSampler::getSystemDirectoryPath() {
return client::CppCacheLibrary::getProductDir();
}
const std::string& HostStatSampler::getProductDescription() const {
return client::CacheFactory::getProductDescription();
}
void HostStatSampler::changeArchive(boost::filesystem::path filename) {
if (filename.empty()) {
// terminate the sampling thread
m_stopRequested = true;
return;
}
filename = chkForGFSExt(filename);
if (m_archiver) {
m_archiver->closeFile();
}
rollArchive(filename);
m_archiver.reset(new StatArchiveWriter(filename.string(), this, m_cache));
}
boost::filesystem::path HostStatSampler::chkForGFSExt(
const boost::filesystem::path& filename) const {
if (filename.extension() == GFS_EXTENSION) {
return filename;
}
auto tmp = filename;
if (m_isStatDiskSpaceEnabled) {
return tmp += GFS_EXTENSION;
}
return tmp.replace_extension(GFS_EXTENSION);
}
void HostStatSampler::rollArchive(const boost::filesystem::path& filename) {
if (!boost::filesystem::exists(filename) ||
boost::filesystem::is_empty(filename)) {
return;
}
const auto extension = filename.extension();
if (extension.empty()) {
throw client::IllegalArgumentException("Missing extension.");
}
while (true) {
auto newFilename = filename.parent_path() / filename.stem();
newFilename += "-";
newFilename += std::to_string(m_rollIndex++);
newFilename += extension;
if (boost::filesystem::exists(newFilename)) {
continue;
}
boost::filesystem::rename(filename, newFilename);
break;
}
}
void HostStatSampler::start() {
if (!m_running.exchange(true)) {
m_thread = std::thread(&HostStatSampler::svc, this);
}
}
void HostStatSampler::stop() {
m_stopRequested = true;
m_thread.join();
}
bool HostStatSampler::isRunning() const { return m_running; }
void HostStatSampler::putStatsInAdminRegion() {
try {
// Get Values of gets, puts,misses,listCalls,numThreads
static bool initDone = false;
static std::string clientId = "";
auto adminRgn = m_statMngr->getAdminRegion();
if (adminRgn == nullptr) return;
auto conn_man = adminRgn->getConnectionManager();
if (conn_man->isNetDown()) {
return;
}
client::TryReadGuard _guard(adminRgn->getRWLock(), adminRgn->isDestroyed());
if (!adminRgn->isDestroyed()) {
if (conn_man->getNumEndPoints() > 0) {
if (!initDone) {
adminRgn->init();
initDone = true;
}
int puts = 0, gets = 0, misses = 0, numListeners = 0, numThreads = 0,
creates = 0;
int64_t cpuTime = 0;
auto gf = m_statMngr->getStatisticsFactory();
if (gf) {
const auto cacheStatType = gf->findType("CachePerfStats");
if (cacheStatType) {
Statistics* cachePerfStats =
gf->findFirstStatisticsByType(cacheStatType);
if (cachePerfStats) {
puts = cachePerfStats->getInt("puts");
gets = cachePerfStats->getInt("gets");
misses = cachePerfStats->getInt("misses");
creates = cachePerfStats->getInt("creates");
numListeners =
cachePerfStats->getInt("cacheListenerCallsCompleted");
puts += creates;
}
}
}
static auto numCPU = std::thread::hardware_concurrency();
auto obj = client::ClientHealthStats::create(
gets, puts, misses, numListeners, numThreads, cpuTime, numCPU);
if (clientId.empty()) {
auto memId = conn_man->getCacheImpl()
->getClientProxyMembershipIDFactory()
.create(m_durableClientId, m_durableTimeout);
clientId = memId->getDSMemberIdForThinClientUse();
}
auto keyPtr = client::CacheableString::create(clientId.c_str());
adminRgn->put(keyPtr, obj);
}
}
} catch (const Exception&) {
m_adminError = true;
}
}
void HostStatSampler::writeGfs() {
const auto& archiveFilename = createArchiveFilename();
changeArchive(archiveFilename);
}
void HostStatSampler::forceSample() {
std::lock_guard<decltype(m_samplingLock)> guard(m_samplingLock);
if (m_archiver) {
m_archiver->sample();
m_archiver->flush();
}
}
void HostStatSampler::doSample(const boost::filesystem::path& archiveFilename) {
std::lock_guard<decltype(m_samplingLock)> guard(m_samplingLock);
if (!m_adminError) {
putStatsInAdminRegion();
}
if (m_archiver) {
m_archiver->sample();
if (m_archiveFileSizeLimit != 0) {
auto size = m_archiver->getSampleSize();
auto bytesWritten = m_archiver->bytesWritten();
if (bytesWritten > (m_archiveFileSizeLimit - size)) {
// roll the archive
changeArchive(archiveFilename);
}
}
m_spaceUsed += m_archiver->bytesWritten();
// delete older stat files if disk limit is about to be exceeded.
if ((m_archiveDiskSpaceLimit != 0) &&
(m_spaceUsed >=
(m_archiveDiskSpaceLimit - m_archiver->getSampleSize()))) {
checkDiskLimit();
}
// It will flush the contents to the archive file, in every
// sample run.
m_archiver->flush();
}
}
template <typename _Function>
void HostStatSampler::forEachIndexStatFile(_Function function) const {
const std::regex statsFilter(m_archiveFileName.stem().string() +
R"(-([\d]+))" +
m_archiveFileName.extension().string());
auto dir = m_archiveFileName.parent_path();
if (dir.empty()) {
dir = boost::filesystem::current_path();
}
for (const auto& entry :
boost::make_iterator_range(boost::filesystem::directory_iterator(dir),
{}) |
boost::adaptors::filtered(
static_cast<bool (*)(const boost::filesystem::path&)>(
&boost::filesystem::is_regular_file))) {
std::smatch match;
const auto& file = entry.path();
const auto filename = file.filename();
const auto& filenameStr = filename.string();
if (std::regex_match(filenameStr, match, statsFilter)) {
const auto index = std::stoi(match[1].str());
function(index, file);
}
}
}
void HostStatSampler::checkDiskLimit() {
m_spaceUsed = 0;
std::map<int32_t, std::pair<boost::filesystem::path, size_t>> indexedFiles;
forEachIndexStatFile(
[&](const int32_t index, const boost::filesystem::path& file) {
const auto size = boost::filesystem::file_size(file);
indexedFiles.emplace(index, std::make_pair(file, size));
m_spaceUsed += size;
});
if (m_archiver) {
m_spaceUsed += m_archiver->bytesWritten();
}
for (const auto& i : indexedFiles) {
if (m_spaceUsed > m_archiveDiskSpaceLimit) {
const auto& file = i.second.first;
const auto size = i.second.second;
try {
boost::filesystem::remove(file);
m_spaceUsed -= size;
} catch (boost::filesystem::filesystem_error& e) {
LOGWARN("Could not delete " + file.string() + ": " + e.what());
}
}
}
}
void HostStatSampler::svc(void) {
client::DistributedSystemImpl::setThreadName("NC HSS Thread");
try {
// createArchiveFileName instead of getArchiveFileName here because
// for the first time the sampler needs to add the pid to the filename
// passed to it.
auto archiveFilename = createArchiveFilename();
if (!m_isStatDiskSpaceEnabled) {
changeArchive(archiveFilename);
}
auto samplingRate = milliseconds(getSampleRate());
bool gotexception = false;
int waitTime = 0;
while (!m_stopRequested) {
try {
if (gotexception) {
std::this_thread::sleep_for(std::chrono::seconds(1));
waitTime++;
if (waitTime < 60) { // sleep for minute and then try to recreate
continue;
}
waitTime = 0;
gotexception = false;
changeArchive(archiveFilename);
}
auto sampleStart = high_resolution_clock::now();
doSample(archiveFilename);
nanoseconds spentWorking = high_resolution_clock::now() - sampleStart;
// updating the sampler statistics
accountForTimeSpentWorking(spentWorking.count());
// TODO: replace with condition on m_stopRequested
auto sleepDuration =
samplingRate - duration_cast<milliseconds>(spentWorking);
static const auto wakeInterval = milliseconds(100);
while (!m_stopRequested && sleepDuration > milliseconds::zero()) {
std::this_thread::sleep_for(
sleepDuration > wakeInterval ? wakeInterval : sleepDuration);
sleepDuration -= wakeInterval;
}
} catch (Exception& e) {
// log the exception and let the thread exit.
LOGERROR("Exception in statistics sampler thread: %s: %s",
e.getName().c_str(), e.what());
// now close current archiver and see if we can start new one
gotexception = true;
} catch (...) {
LOGERROR("Unknown Exception in statistics sampler thread: ");
gotexception = true;
}
}
m_samplerStats->close();
if (m_archiver != nullptr) {
m_archiver->close();
}
} catch (Exception& e) {
// log the exception and let the thread exit.
LOGERROR("Exception in statistics sampler thread: %s: %s",
e.getName().c_str(), e.what());
} /* catch (...) {
// log the exception and let the thread exit.
LOGERROR("Exception in sampler thread ");
closeSpecialStats();
}*/
m_running = false;
}
} // namespace statistics
} // namespace geode
} // namespace apache