blob: d3362e737d633a341417dda0deaace1b5cfe7dbc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "StatArchiveWriter.hpp"
#include <chrono>
#include <ctime>
#include <ace/ACE.h>
#include <ace/OS_NS_sys_time.h>
#include <ace/OS_NS_sys_utsname.h>
#include <ace/OS_NS_time.h>
#include <ace/Task.h>
#include <ace/Thread_Mutex.h>
#include <geode/internal/geode_globals.hpp>
#include "../CacheImpl.hpp"
#include "../util/chrono/time_point.hpp"
#include "GeodeStatisticsFactory.hpp"
#include "HostStatSampler.hpp"
namespace apache {
namespace geode {
namespace statistics {
using std::chrono::duration_cast;
using std::chrono::milliseconds;
using std::chrono::nanoseconds;
using std::chrono::steady_clock;
using std::chrono::system_clock;
using client::GeodeIOException;
using client::IllegalArgumentException;
using client::NullPointerException;
// Constructor and Member functions of StatDataOutput class
/**
 * Builds an in-memory-only output: no archive file is attached (m_fp stays
 * null) and the byte counter starts at zero.
 *
 * @param cacheImpl cache used to create the underlying DataOutput buffer.
 */
StatDataOutput::StatDataOutput(CacheImpl *cacheImpl)
    : bytesWritten(0), m_fp(nullptr), closed(false) {
  dataBuffer.reset(new DataOutput(cacheImpl->createDataOutput()));
}
/**
 * Builds an output bound to the archive file `filename`, opened in binary
 * append mode ("a+b").
 *
 * @param filename  path of the archive file; must not be empty.
 * @param cacheImpl cache used to create the underlying DataOutput buffer.
 * @throws IllegalArgumentException when `filename` is empty.
 * @throws NullPointerException when the file cannot be opened.
 */
StatDataOutput::StatDataOutput(std::string filename, CacheImpl *cacheImpl) {
  if (filename.empty()) {
    throw IllegalArgumentException("undefined archive file name");
  }

  dataBuffer.reset(new DataOutput(cacheImpl->createDataOutput()));
  outFile = filename;
  closed = false;
  bytesWritten = 0;

  m_fp = fopen(outFile.c_str(), "a+b");
  if (!m_fp) {
    throw NullPointerException("error in opening archive file for writing");
  }
}
/**
 * Closes the archive file if it is still open and close() has not been
 * called already.
 */
StatDataOutput::~StatDataOutput() {
  const bool stillOpen = (m_fp != nullptr) && !closed;
  if (stillOpen) {
    fclose(m_fp);
  }
}
int64_t StatDataOutput::getBytesWritten() { return this->bytesWritten; }
/**
 * Writes everything between the start of the in-memory buffer and its cursor
 * to the archive file, then flushes the file stream. The in-memory buffer is
 * not reset here (see resetBuffer()).
 *
 * @throws NullPointerException when the buffer start or cursor is null.
 * @throws GeodeIOException when the write or the flush fails.
 */
void StatDataOutput::flush() {
  const uint8_t *start = dataBuffer->getBuffer();
  if (!start) {
    throw NullPointerException("undefined stat data buffer beginning");
  }

  const uint8_t *cursor = dataBuffer->getCursor();
  if (!cursor) {
    throw NullPointerException("undefined stat data buffer end");
  }

  const int32_t pending = static_cast<int32_t>(cursor - start);
  if (pending > 0) {
    const size_t written = fwrite(start, sizeof(uint8_t), pending, m_fp);
    if (written != static_cast<size_t>(pending)) {
      LOGERROR("Could not write into the statistics file");
      throw GeodeIOException("Could not write into the statistics file");
    }
  }

  if (fflush(m_fp) != 0) {
    LOGERROR("Could not flush into the statistics file");
    throw GeodeIOException("Could not flush into the statistics file");
  }
}
void StatDataOutput::resetBuffer() {
dataBuffer->reset();
bytesWritten = 0;
}
void StatDataOutput::writeByte(int8_t v) {
dataBuffer->write(v);
bytesWritten += 1;
}
void StatDataOutput::writeBoolean(int8_t v) { writeByte(v); }
void StatDataOutput::writeShort(int16_t v) {
dataBuffer->writeInt(v);
bytesWritten += 2;
}
void StatDataOutput::writeInt(int32_t v) {
dataBuffer->writeInt(v);
bytesWritten += 4;
}
void StatDataOutput::writeLong(int64_t v) {
dataBuffer->writeInt(v);
bytesWritten += 8;
}
/**
 * Appends a string through DataOutput::writeUTF and advances the byte
 * counter by the number of bytes the buffer actually grew.
 *
 * The counter used to be advanced by s.length(), which ignores whatever
 * framing or re-encoding DataOutput::writeUTF applies. Measuring the cursor
 * offset before and after keeps bytesWritten in step with what flush() will
 * later write to the file.
 *
 * @param s text to append.
 */
void StatDataOutput::writeUTF(std::string s) {
  // Offsets (not raw pointers) are compared because the buffer may be
  // reallocated while writing.
  const auto offsetBefore = dataBuffer->getCursor() - dataBuffer->getBuffer();
  dataBuffer->writeUTF(s);
  const auto offsetAfter = dataBuffer->getCursor() - dataBuffer->getBuffer();
  bytesWritten += static_cast<int64_t>(offsetAfter - offsetBefore);
}
/**
 * Closes the archive file, if one is open, and marks this output as closed.
 *
 * Safe to call repeatedly or on an instance that never had a file attached:
 * fclose() is only invoked for a non-null stream, because passing a null
 * stream to fclose() is undefined behaviour.
 */
void StatDataOutput::close() {
  if (m_fp != nullptr) {
    fclose(m_fp);
    m_fp = nullptr;
  }
  closed = true;
}
/**
 * Attaches this output to `filename`, opened in binary append mode ("a+b"),
 * and sets the byte counter to `size`.
 *
 * The new file is opened before the current handle is touched, so a failed
 * open leaves the existing state intact. On success any handle that is still
 * open is closed instead of being overwritten and leaked.
 *
 * @param filename path of the archive file to open.
 * @param size     value the byte counter is set to after a successful open.
 * @throws NullPointerException when the file cannot be opened.
 */
void StatDataOutput::openFile(std::string filename, int64_t size) {
  FILE *reopened = fopen(filename.c_str(), "a+b");
  if (reopened == nullptr) {
    throw NullPointerException("error in opening archive file for writing");
  }
  if (m_fp != nullptr) {
    fclose(m_fp);
  }
  m_fp = reopened;
  closed = false;
  bytesWritten = size;
}
// Constructor and Member functions of ResourceType class
/**
 * Captures the archive id assigned to a statistics type together with the
 * descriptor array and descriptor count read from that type.
 *
 * @param idArg   archive id for this resource type.
 * @param typeArg statistics type to read descriptors from; must not be null.
 */
ResourceType::ResourceType(int32_t idArg, const StatisticsType *typeArg) {
  id = idArg;
  stats = typeArg->getStatistics();
  numOfDescriptors = static_cast<int32_t>(typeArg->getDescriptorsCount());
}
int32_t ResourceType::getId() const { return this->id; }
int32_t ResourceType::getNumOfDescriptors() const {
return this->numOfDescriptors;
}
/** @return the descriptor array captured at construction (not owned here). */
StatisticDescriptor **ResourceType::getStats() const {
  return stats;
}
// Constructor and Member functions of ResourceInst class
/**
 * Binds a statistics instance to its archive id, resource type and output
 * sink, and allocates one zeroed "last archived value" slot per descriptor.
 *
 * @param idArg       archive id for this instance.
 * @param resourceArg statistics object whose values are sampled.
 * @param typeArg     resource type describing the descriptors.
 * @param dataOutArg  output that samples are written to (not owned).
 */
ResourceInst::ResourceInst(int32_t idArg, Statistics *resourceArg,
                           const ResourceType *typeArg,
                           StatDataOutput *dataOutArg)
    : type(typeArg) {
  id = idArg;
  resource = resourceArg;
  dataOut = dataOutArg;

  // Value-initialisation zeroes every slot.
  const int32_t slotCount = type->getNumOfDescriptors();
  archivedStatValues = new int64_t[slotCount]();

  firstTime = true;
}
/** Releases the array of last-archived values. */
ResourceInst::~ResourceInst() {
  delete[] archivedStatValues;
}
int32_t ResourceInst::getId() { return this->id; }
/** @return the statistics object this instance samples (not owned). */
Statistics *ResourceInst::getResource() {
  return resource;
}
/** @return the resource type this instance was created with. */
const ResourceType *ResourceInst::getType() const {
  return type;
}
/**
 * Reads the current raw value of one statistic from the sampled resource.
 *
 * @param f descriptor identifying the statistic.
 * @return the value reported by Statistics::getRawBits for `f`.
 */
int64_t ResourceInst::getStatValue(StatisticDescriptor *f) {
  return resource->getRawBits(f);
}
void ResourceInst::writeSample() {
bool wroteInstId = false;
bool checkForChange = true;
StatisticDescriptor **stats = type->getStats();
if (resource->isClosed()) {
return;
}
if (firstTime) {
firstTime = false;
checkForChange = false;
}
auto count = type->getNumOfDescriptors();
for (int32_t i = 0; i < count; i++) {
int64_t value = getStatValue(stats[i]);
if (!checkForChange || value != archivedStatValues[i]) {
int64_t delta = value - archivedStatValues[i];
archivedStatValues[i] = value;
if (!wroteInstId) {
wroteInstId = true;
writeResourceInst(dataOut, id);
}
dataOut->writeByte(i);
writeStatValue(stats[i], delta);
}
}
if (wroteInstId) {
dataOut->writeByte(static_cast<unsigned char>(ILLEGAL_STAT_OFFSET));
}
}
/**
 * Writes one statistic value in compact form after validating the
 * descriptor's type code.
 *
 * @param sd descriptor of the statistic; treated as a StatisticDescriptorImpl.
 * @param v  value (delta) to write.
 * @throws NullPointerException when `sd` is null.
 * @throws IllegalArgumentException when the type code is not INT_TYPE,
 *         LONG_TYPE or DOUBLE_TYPE.
 */
void ResourceInst::writeStatValue(StatisticDescriptor *sd, int64_t v) {
  auto impl = static_cast<StatisticDescriptorImpl *>(sd);
  if (!impl) {
    throw NullPointerException("could not downcast to StatisticDescriptorImpl");
  }

  const FieldType code = impl->getTypeCode();
  // All supported types share the same compact encoding.
  const bool supported =
      code == INT_TYPE || code == LONG_TYPE || code == DOUBLE_TYPE;
  if (!supported) {
    throw IllegalArgumentException("Unexpected type code");
  }
  writeCompactValue(v);
}
// Writes v to dataOut using a variable-length encoding:
//  - a single byte when v lies in
//    [MIN_1BYTE_COMPACT_VALUE, MAX_1BYTE_COMPACT_VALUE];
//  - COMPACT_VALUE_2_TOKEN followed by a 16-bit value when v lies in
//    [MIN_2BYTE_COMPACT_VALUE, MAX_2BYTE_COMPACT_VALUE];
//  - otherwise a token byte equal to COMPACT_VALUE_2_TOKEN + (byteCount - 2)
//    followed by byteCount bytes, most significant byte first.
void ResourceInst::writeCompactValue(int64_t v) {
if (v <= MAX_1BYTE_COMPACT_VALUE && v >= MIN_1BYTE_COMPACT_VALUE) {
this->dataOut->writeByte(static_cast<int8_t>(v));
} else if (v <= MAX_2BYTE_COMPACT_VALUE && v >= MIN_2BYTE_COMPACT_VALUE) {
this->dataOut->writeByte(COMPACT_VALUE_2_TOKEN);
this->dataOut->writeShort(static_cast<int16_t>(v));
} else {
// Bytes are collected least significant first and emitted in reverse below.
int8_t buffer[8];
int32_t idx = 0;
if (v < 0) {
// Peel off low-order bytes until only sign extension (-1) remains.
// NOTE(review): reaching -1 relies on >> being an arithmetic shift for
// negative values; the v == 0 exit covers platforms where it is not.
while (v != -1 && v != 0) {
buffer[idx++] = static_cast<int8_t>(v & 0xFF);
v >>= 8;
}
// On windows v goes to zero sometimes; seems like a bug
if (v == 0) {
// when this happens we end up with a bunch of -1 bytes
// so strip off the high order ones
while (0 < idx && buffer[idx - 1] == -1) {
idx--;
}
}
if (0 < idx && (buffer[idx - 1] & 0x80) == 0) {
/* If the most significant byte does not have its high order bit set
* then add a -1 byte so we know this is a negative number
*/
buffer[idx++] = -1;
}
} else {
// Positive value: peel off low-order bytes until nothing is left.
// v cannot be 0 here (0 is handled by the single-byte case), so idx >= 1.
while (v != 0) {
buffer[idx++] = static_cast<int8_t>(v & 0xFF);
v >>= 8;
}
if ((buffer[idx - 1] & 0x80) != 0) {
/* If the most significant byte has its high order bit set
* then add a zero byte so we know this is a positive number
*/
buffer[idx++] = 0;
}
}
// The token encodes the byte count relative to the 2-byte token.
int8_t token = COMPACT_VALUE_2_TOKEN + (idx - 2);
this->dataOut->writeByte(token);
// Emit most significant byte first.
for (int32_t i = idx - 1; i >= 0; i--) {
this->dataOut->writeByte(buffer[i]);
}
}
}
/**
 * Writes a resource instance id using the smallest representation:
 * one byte up to MAX_BYTE_RESOURCE_INST_ID, a short token plus 16 bits up to
 * MAX_SHORT_RESOURCE_INST_ID, otherwise an int token plus 32 bits.
 *
 * @param dataOutArg output to write to.
 * @param instId     instance id to encode.
 */
void ResourceInst::writeResourceInst(StatDataOutput *dataOutArg,
                                     int32_t instId) {
  if (instId <= MAX_BYTE_RESOURCE_INST_ID) {
    dataOutArg->writeByte(static_cast<uint8_t>(instId));
    return;
  }

  if (instId <= MAX_SHORT_RESOURCE_INST_ID) {
    dataOutArg->writeByte(static_cast<uint8_t>(SHORT_RESOURCE_INST_ID_TOKEN));
    dataOutArg->writeShort(instId);
    return;
  }

  dataOutArg->writeByte(static_cast<uint8_t>(INT_RESOURCE_INST_ID_TOKEN));
  dataOutArg->writeInt(instId);
}
// Constructor and Member functions of StatArchiveWriter class
/**
 * Opens the archive `outfile` and buffers the archive header, then registers
 * every statistics object currently known to the sampler.
 *
 * Header fields, in order: HEADER_TOKEN, ARCHIVE_VERSION, current wall-clock
 * time (ms), system id, system start time (ms), time-zone offset (ms),
 * time-zone name, system directory path, product description, OS name and
 * machine information.
 *
 * @param outfile    path of the archive file.
 * @param samplerArg sampler that supplies system information and statistics.
 * @param cache      cache used to create the output buffer.
 */
StatArchiveWriter::StatArchiveWriter(std::string outfile,
                                     HostStatSampler *samplerArg,
                                     CacheImpl *cache)
    : cache(cache) {
  // Start every counter from a known state before anything is written.
  resourceTypeId = 0;
  resourceInstId = 0;
  statResourcesModCount = 0;
  archiveFile = outfile;
  bytesWrittenToFile = 0;
  m_samplesize = 0;

  dataBuffer = new StatDataOutput(archiveFile, cache);
  sampler = samplerArg;

  // Time stamps written later are deltas against this starting point.
  previousTimeStamp = steady_clock::now();

  dataBuffer->writeByte(HEADER_TOKEN);
  dataBuffer->writeByte(ARCHIVE_VERSION);

  const auto archiveCreatedMs =
      duration_cast<milliseconds>(system_clock::now().time_since_epoch())
          .count();
  dataBuffer->writeLong(archiveCreatedMs);

  const int64_t systemId = sampler->getSystemId();
  dataBuffer->writeLong(systemId);

  const auto systemStartMs =
      duration_cast<milliseconds>(
          sampler->getSystemStartTime().time_since_epoch())
          .count();
  dataBuffer->writeLong(systemStartMs);

  // ACE reports the offset in seconds; the archive stores it negated and in
  // milliseconds.
  const int32_t tzSeconds = ACE_OS::timezone();
  dataBuffer->writeInt(tzSeconds * -1 * 1000);

  // Time-zone name as produced by strftime("%Z") for the local time.
  auto wallNow = std::chrono::system_clock::now();
  auto localNow = apache::geode::util::chrono::localtime(wallNow);
  char zoneName[512] = {0};
  std::strftime(zoneName, sizeof(zoneName), "%Z", &localNow);
  dataBuffer->writeUTF(std::string(zoneName));

  dataBuffer->writeUTF(std::string(sampler->getSystemDirectoryPath()));
  dataBuffer->writeUTF(std::string(sampler->getProductDescription()));

  ACE_utsname hostInfo;
  ACE_OS::uname(&hostInfo);

  // Only the OS name is recorded. The uname version string carries the
  // release date of the OS version, which was confusing when looking at the
  // archive's creation time, so it is deliberately left out.
  std::string osName(hostInfo.sysname);
  osName += " ";
  dataBuffer->writeUTF(osName);

  std::string machineInfo(hostInfo.machine);
  machineInfo += " ";
  machineInfo += hostInfo.nodename;
  dataBuffer->writeUTF(machineInfo);

  resampleResources();
}
/**
 * Releases everything the writer owns: the ResourceInst objects still
 * registered in resourceInstMap, the output buffer, and the ResourceType
 * objects in resourceTypeMap.
 *
 * The ResourceInst objects are allocated in allocateResourceInst() and were
 * previously only freed when their statistics were closed; instances still
 * registered at destruction time were leaked.
 */
StatArchiveWriter::~StatArchiveWriter() {
  // Instances only hold a non-owning pointer to dataBuffer, so they are
  // released first.
  for (auto &entry : resourceInstMap) {
    delete entry.second;
  }
  resourceInstMap.clear();

  if (dataBuffer != nullptr) {
    delete dataBuffer;
    dataBuffer = nullptr;
  }

  for (const auto &p : resourceTypeMap) {
    auto rt = p.second;
    _GEODE_SAFE_DELETE(rt);
  }
}
int64_t StatArchiveWriter::bytesWritten() { return bytesWrittenToFile; }
int64_t StatArchiveWriter::getSampleSize() { return m_samplesize; }
/**
 * Buffers one sample record while holding the sampler's stat-list mutex.
 *
 * Resource bookkeeping is updated first (new and closed statistics), then
 * SAMPLE_TOKEN, the time-stamp delta, every registered instance's changed
 * values and a terminating ILLEGAL_RESOURCE_INST_ID are written. m_samplesize
 * ends up holding the number of bytes counted during this call.
 *
 * @param timeStamp time at which the sample is taken.
 */
void StatArchiveWriter::sample(const steady_clock::time_point &timeStamp) {
  std::lock_guard<decltype(sampler->getStatListMutex())> guard(
      sampler->getStatListMutex());

  // Temporarily holds the starting counter; turned into a difference below.
  m_samplesize = dataBuffer->getBytesWritten();

  sampleResources();
  dataBuffer->writeByte(SAMPLE_TOKEN);
  writeTimeStamp(timeStamp);

  for (const auto &entry : resourceInstMap) {
    ResourceInst *inst = entry.second;
    if (inst != nullptr && entry.first != nullptr) {
      inst->writeSample();
    }
  }

  writeResourceInst(dataBuffer, static_cast<int32_t>(ILLEGAL_RESOURCE_INST_ID));
  m_samplesize = dataBuffer->getBytesWritten() - m_samplesize;
}
/** Takes a sample stamped with the current steady-clock time. */
void StatArchiveWriter::sample() {
  const auto now = steady_clock::now();
  sample(now);
}
void StatArchiveWriter::close() {
sample();
this->dataBuffer->flush();
this->dataBuffer->close();
}
void StatArchiveWriter::closeFile() { this->dataBuffer->close(); }
void StatArchiveWriter::openFile(std::string filename) {
// this->dataBuffer->openFile(filename, m_samplesize);
StatDataOutput *p_dataBuffer = new StatDataOutput(filename, cache);
const uint8_t *buffBegin = dataBuffer->dataBuffer->getBuffer();
if (buffBegin == nullptr) {
throw NullPointerException("undefined stat data buffer beginning");
}
const uint8_t *buffEnd = dataBuffer->dataBuffer->getCursor();
if (buffEnd == nullptr) {
throw NullPointerException("undefined stat data buffer end");
}
int32_t len = static_cast<int32_t>(buffEnd - buffBegin);
for (int pos = 0; pos < len; pos++) {
p_dataBuffer->writeByte(buffBegin[pos]);
}
delete dataBuffer;
dataBuffer = p_dataBuffer;
// sample();
}
/**
 * Flushes the buffered bytes to the archive file, adds the buffer's byte
 * count to the running file total and resets the buffer for reuse.
 */
void StatArchiveWriter::flush() {
  dataBuffer->flush();
  bytesWrittenToFile += dataBuffer->getBytesWritten();
  // The same StatDataOutput is reused; an earlier attempt to recreate it
  // after each flush was abandoned.
  dataBuffer->resetBuffer();
}
void StatArchiveWriter::sampleResources() {
// Allocate ResourceInst for newly added stats ( Locked lists already )
std::vector<Statistics *> &newStatsList = sampler->getNewStatistics();
std::vector<Statistics *>::iterator newlistIter;
for (newlistIter = newStatsList.begin(); newlistIter != newStatsList.end();
++newlistIter) {
if (!resourceInstMapHas(*newlistIter)) {
allocateResourceInst(*newlistIter);
}
}
newStatsList.clear();
// for closed stats, write token and then delete from statlist and
// resourceInstMap.
std::map<Statistics *, ResourceInst *>::iterator mapIter;
std::vector<Statistics *> &statsList = sampler->getStatistics();
std::vector<Statistics *>::iterator statlistIter = statsList.begin();
while (statlistIter != statsList.end()) {
if ((*statlistIter)->isClosed()) {
mapIter = resourceInstMap.find(*statlistIter);
if (mapIter != resourceInstMap.end()) {
// Write delete token to file and delete from map
ResourceInst *rinst = (*mapIter).second;
int32_t id = rinst->getId();
this->dataBuffer->writeByte(RESOURCE_INSTANCE_DELETE_TOKEN);
this->dataBuffer->writeInt(id);
resourceInstMap.erase(mapIter);
delete rinst;
}
// Delete stats object stat list
StatisticsManager::deleteStatistics(*statlistIter);
statsList.erase(statlistIter);
statlistIter = statsList.begin();
} else {
++statlistIter;
}
}
}
void StatArchiveWriter::resampleResources() {
std::lock_guard<decltype(sampler->getStatListMutex())> guard(
sampler->getStatListMutex());
std::vector<Statistics *> &statsList = sampler->getStatistics();
std::vector<Statistics *>::iterator statlistIter = statsList.begin();
while (statlistIter != statsList.end()) {
if (!(*statlistIter)->isClosed()) {
allocateResourceInst(*statlistIter);
}
++statlistIter;
}
}
/**
 * Writes the time elapsed since the previous time stamp, in milliseconds.
 *
 * Deltas up to MAX_SHORT_TIMESTAMP are written as a single 16-bit value;
 * larger ones as INT_TIMESTAMP_TOKEN (16 bits) followed by the 32-bit delta.
 * `timeStamp` then becomes the new reference point.
 *
 * @param timeStamp time of the sample being written.
 */
void StatArchiveWriter::writeTimeStamp(
    const steady_clock::time_point &timeStamp) {
  const auto elapsed =
      duration_cast<milliseconds>(timeStamp - previousTimeStamp);
  const auto delta = static_cast<int32_t>(elapsed.count());

  if (delta <= MAX_SHORT_TIMESTAMP) {
    dataBuffer->writeShort(static_cast<uint16_t>(delta));
  } else {
    dataBuffer->writeShort(static_cast<uint16_t>(INT_TIMESTAMP_TOKEN));
    dataBuffer->writeInt(delta);
  }

  previousTimeStamp = timeStamp;
}
/**
 * @param sp statistics object to look up.
 * @return true when `sp` already has an entry in resourceInstMap.
 */
bool StatArchiveWriter::resourceInstMapHas(Statistics *sp) {
  return resourceInstMap.find(sp) != resourceInstMap.end();
}
/**
 * Registers a statistics object with the archive: creates its ResourceInst,
 * stores it in resourceInstMap and buffers an instance-create record
 * (RESOURCE_INSTANCE_CREATE_TOKEN, instance id, text id, numeric id, type id).
 *
 * Closed statistics are ignored. Statistics that are already registered are
 * ignored as well: map insertion would fail for them, which used to leak the
 * freshly created ResourceInst and write a create record for an id that no
 * sample ever refers to.
 *
 * The former null check on the result of `new` is gone; a plain
 * new-expression reports failure by throwing, it never returns null.
 *
 * @param s statistics object to register; must not be null.
 */
void StatArchiveWriter::allocateResourceInst(Statistics *s) {
  if (s->isClosed()) return;
  if (resourceInstMap.find(s) != resourceInstMap.end()) return;

  const auto type = getResourceType(s);

  // Held by unique_ptr until the map has taken the pointer, so a failing
  // insert cannot leak it.
  std::unique_ptr<ResourceInst> ri(
      new ResourceInst(resourceInstId, s, type, dataBuffer));
  resourceInstMap.insert(std::pair<Statistics *, ResourceInst *>(s, ri.get()));
  ri.release();

  this->dataBuffer->writeByte(RESOURCE_INSTANCE_CREATE_TOKEN);
  this->dataBuffer->writeInt(resourceInstId);
  this->dataBuffer->writeUTF(s->getTextId());
  this->dataBuffer->writeLong(s->getNumericId());
  this->dataBuffer->writeInt(type->getId());
  resourceInstId++;
}
/**
 * Returns the ResourceType registered for the statistics object's type,
 * creating and archiving it on first use.
 *
 * For a new type a ResourceType is stored in resourceTypeMap and a type
 * record is buffered: RESOURCE_TYPE_TOKEN, type id, name, description,
 * descriptor count and, per descriptor, its name, type code, counter flag,
 * larger-is-better flag, unit and description.
 *
 * Changes from the previous version:
 *  - the null check after `new ResourceType` tested `type` instead of the
 *    new object and could never fire (plain `new` throws on failure); it is
 *    removed;
 *  - each descriptor is checked for null before it is first dereferenced
 *    rather than afterwards.
 *
 * @param s statistics object whose type is looked up.
 * @return the registered resource type (owned by resourceTypeMap).
 * @throws NullPointerException when the statistics object has no type or a
 *         descriptor of a new type is null.
 */
const ResourceType *StatArchiveWriter::getResourceType(const Statistics *s) {
  const auto type = s->getType();
  if (type == nullptr) {
    throw NullPointerException(
        "could not know the type of the statistics object");
  }

  const auto known = resourceTypeMap.find(type);
  if (known != resourceTypeMap.end()) {
    return known->second;
  }

  const ResourceType *rt = new ResourceType(resourceTypeId, type);
  resourceTypeMap.emplace(type, rt);

  // write the type to the archive
  this->dataBuffer->writeByte(RESOURCE_TYPE_TOKEN);
  this->dataBuffer->writeInt(resourceTypeId);
  this->dataBuffer->writeUTF(type->getName());
  this->dataBuffer->writeUTF(type->getDescription());

  auto stats = rt->getStats();
  auto descCnt = rt->getNumOfDescriptors();
  this->dataBuffer->writeShort(static_cast<int16_t>(descCnt));
  for (int32_t i = 0; i < descCnt; i++) {
    auto sdImpl = static_cast<StatisticDescriptorImpl *>(stats[i]);
    if (sdImpl == nullptr) {
      throw NullPointerException(
          "could not down cast to StatisticDescriptorImpl");
    }
    std::string statsName = stats[i]->getName();
    this->dataBuffer->writeUTF(statsName);
    this->dataBuffer->writeByte(static_cast<int8_t>(sdImpl->getTypeCode()));
    this->dataBuffer->writeBoolean(stats[i]->isCounter());
    this->dataBuffer->writeBoolean(stats[i]->isLargerBetter());
    this->dataBuffer->writeUTF(stats[i]->getUnit());
    this->dataBuffer->writeUTF(stats[i]->getDescription());
  }

  resourceTypeId++;
  return rt;
}
/**
 * Writes a resource instance id in its shortest form: a single byte for ids
 * up to MAX_BYTE_RESOURCE_INST_ID, SHORT_RESOURCE_INST_ID_TOKEN plus 16 bits
 * for ids up to MAX_SHORT_RESOURCE_INST_ID, and INT_RESOURCE_INST_ID_TOKEN
 * plus 32 bits for anything larger.
 *
 * @param dataOut output to write to.
 * @param instId  instance id to encode.
 */
void StatArchiveWriter::writeResourceInst(StatDataOutput *dataOut,
                                          int32_t instId) {
  const bool fitsInByte = !(instId > MAX_BYTE_RESOURCE_INST_ID);
  if (fitsInByte) {
    dataOut->writeByte(static_cast<uint8_t>(instId));
    return;
  }

  const bool fitsInShort = !(instId > MAX_SHORT_RESOURCE_INST_ID);
  if (fitsInShort) {
    dataOut->writeByte(static_cast<uint8_t>(SHORT_RESOURCE_INST_ID_TOKEN));
    dataOut->writeShort(instId);
  } else {
    dataOut->writeByte(static_cast<uint8_t>(INT_RESOURCE_INST_ID_TOKEN));
    dataOut->writeInt(instId);
  }
}
} // namespace statistics
} // namespace geode
} // namespace apache