GEODE-8887: Refactor EventIdTSS class (#733)
- Use Meyers singleton pattern for global singleton (threadId), and
thread local singleton EventIdTSS
- rename all member variables in EventId and EventIdTSS classes
- clean up logic slightly around global threadId atomic var
- rename EventId getter methods for clarity
diff --git a/cppcache/src/EventId.cpp b/cppcache/src/EventId.cpp
index a6db548..62c0539 100644
--- a/cppcache/src/EventId.cpp
+++ b/cppcache/src/EventId.cpp
@@ -17,82 +17,97 @@
#include "EventId.hpp"
#include <atomic>
+#include <cstdint>
#include <cstring>
#include <geode/DataInput.hpp>
#include "ClientProxyMembershipID.hpp"
+#include "util/Log.hpp"
namespace apache {
namespace geode {
namespace client {
-class EventIdTSS {
- private:
- static std::atomic<int64_t> s_eidThrId;
-
- int64_t m_eidThrTSS;
- int64_t m_eidSeqTSS;
-
- ~EventIdTSS() = default;
- EventIdTSS(const EventIdTSS&) = delete;
- EventIdTSS& operator=(const EventIdTSS&) = delete;
-
+class ThreadIdCounter {
public:
- // this should get called just once per thread due to first access to TSS
- EventIdTSS() {
- m_eidThrTSS = ++s_eidThrId;
- m_eidSeqTSS = 0;
+ static std::atomic<int64_t>& instance() {
+ static std::atomic<int64_t> threadId_(0);
+ return threadId_;
}
- inline int64_t getEidThr() { return m_eidThrTSS; }
+ static int64_t next() { return ++instance(); }
+};
- inline int64_t getAndIncEidSeq() { return m_eidSeqTSS++; }
+class EventIdTSS {
+ public:
+ static EventIdTSS& instance() {
+ thread_local EventIdTSS eventId_;
+ return eventId_;
+ }
- inline int64_t getSeqNum() { return m_eidSeqTSS - 1; }
+ int64_t nextSequenceId() {
+ sequenceId_++;
+ return sequenceId_;
+ }
- static thread_local EventIdTSS s_eventId;
+ int64_t currentSequenceId() { return sequenceId_; }
-}; // class EventIdTSS
+ int64_t threadId() { return threadId_; }
-std::atomic<int64_t> EventIdTSS::s_eidThrId;
-thread_local EventIdTSS EventIdTSS::s_eventId;
+ private:
+ EventIdTSS();
+
+ int64_t threadId_;
+ int64_t sequenceId_;
+};
+
+EventIdTSS::EventIdTSS() : threadId_(ThreadIdCounter::next()), sequenceId_(0) {
+ LOGDEBUG("EventIdTSS::EventIdTSS(%p): threadId_=%" PRId64
+ ", sequenceId_=%" PRId64,
+ this, threadId_, sequenceId_);
+}
void EventId::toData(DataOutput& output) const {
// This method is always expected to write out nonstatic distributed
- // memberid.
- output.writeBytes(reinterpret_cast<const int8_t*>(m_eidMem), m_eidMemLen);
+ // memberid. Note that binary representation of EventId is NOT THE
+ // SAME here as when serialized into part of a message (via the writeIdsData
+ // method).
+ LOGDEBUG("EventId::toData(%p) - called", this);
+ output.writeBytes(reinterpret_cast<const int8_t*>(clientId_),
+ clientIdLength_);
output.writeArrayLen(18);
char longCode = 3;
output.write(static_cast<uint8_t>(longCode));
- output.writeInt(m_eidThr);
+ output.writeInt(threadId_);
output.write(static_cast<uint8_t>(longCode));
- output.writeInt(m_eidSeq);
- output.writeInt(m_bucketId);
- output.write(m_breadcrumbCounter);
+ output.writeInt(sequenceId_);
+ output.writeInt(bucketId_);
+ output.write(breadcrumbCounter_);
}
void EventId::fromData(DataInput& input) {
- // TODO: statics being assigned; not thread-safe??
- m_eidMemLen = input.readArrayLength();
- input.readBytesOnly(reinterpret_cast<int8_t*>(m_eidMem), m_eidMemLen);
- input.readArrayLength(); // ignore arrayLen
- m_eidThr = getEventIdData(input, input.read());
- m_eidSeq = getEventIdData(input, input.read());
- m_bucketId = input.readInt32();
- m_breadcrumbCounter = input.read();
+ LOGDEBUG("EventId::fromData(%p) - called", this);
+ clientIdLength_ = input.readArrayLength();
+ input.readBytesOnly(reinterpret_cast<int8_t*>(clientId_), clientIdLength_);
+ input.readArrayLength();
+ threadId_ = getEventIdData(input, input.read());
+ sequenceId_ = getEventIdData(input, input.read());
+ bucketId_ = input.readInt32();
+ breadcrumbCounter_ = input.read();
}
-const char* EventId::getMemId() const { return m_eidMem; }
+const char* EventId::clientId() const { return clientId_; }
-int32_t EventId::getMemIdLen() const { return m_eidMemLen; }
+int32_t EventId::clientIdLength() const { return clientIdLength_; }
-int64_t EventId::getThrId() const { return m_eidThr; }
+int64_t EventId::threadId() const { return threadId_; }
-int64_t EventId::getSeqNum() const { return m_eidSeq; }
+int64_t EventId::sequenceNumber() const { return sequenceId_; }
int64_t EventId::getEventIdData(DataInput& input, char numberCode) {
int64_t retVal = 0;
+ LOGDEBUG("EventId::getEventIdData(%p) - called", this);
// Read number based on numeric code written by java server.
if (numberCode == 0) {
@@ -113,19 +128,23 @@
}
std::shared_ptr<Serializable> EventId::createDeserializable() {
- return std::make_shared<EventId>(false);
+ LOGDEBUG("EventId::createDeserializable - called");
// use false since we dont want to inc sequence
// (for de-serialization)
+ return std::make_shared<EventId>(false);
}
EventId::EventId(char* memId, uint32_t memIdLen, int64_t thr, int64_t seq) {
+ LOGDEBUG("EventId::EventId(%p) - memId=%s, memIdLen=%d, thr=%" PRId64
+ ", seq=%" PRId64,
+ this, memId, memIdLen, thr, seq);
// TODO: statics being assigned; not thread-safe??
- std::memcpy(m_eidMem, memId, memIdLen);
- m_eidMemLen = memIdLen;
- m_eidThr = thr;
- m_eidSeq = seq;
- m_bucketId = -1;
- m_breadcrumbCounter = 0;
+ std::memcpy(clientId_, memId, memIdLen);
+ clientIdLength_ = memIdLen;
+ threadId_ = thr;
+ sequenceId_ = seq;
+ bucketId_ = -1;
+ breadcrumbCounter_ = 0;
}
EventId::EventId(bool doInit, uint32_t reserveSize,
@@ -133,11 +152,16 @@
: /* adongre
* CID 28934: Uninitialized scalar field (UNINIT_CTOR)
*/
- m_eidMemLen(0),
- m_eidThr(0),
- m_eidSeq(0),
- m_bucketId(-1),
- m_breadcrumbCounter(0) {
+ clientIdLength_(0),
+ threadId_(0),
+ sequenceId_(0),
+ bucketId_(-1),
+ breadcrumbCounter_(0) {
+ LOGDEBUG(
+ "EventId::EventId(%p) - doInit=%s, reserveSize=%d, "
+ "fullValueAfterDeltaFail=%s",
+ this, doInit ? "true" : "false", reserveSize,
+ fullValueAfterDeltaFail ? "true" : "false");
if (!doInit) return;
if (fullValueAfterDeltaFail) {
@@ -148,18 +172,24 @@
}
for (uint32_t i = 0; i < reserveSize; i++) {
- EventIdTSS::s_eventId.getAndIncEidSeq();
+ EventIdTSS::instance().nextSequenceId();
}
}
void EventId::initFromTSS() {
- m_eidThr = EventIdTSS::s_eventId.getEidThr();
- m_eidSeq = EventIdTSS::s_eventId.getAndIncEidSeq();
+ threadId_ = EventIdTSS::instance().threadId();
+ sequenceId_ = EventIdTSS::instance().nextSequenceId();
+ LOGDEBUG("EventId::initFromTSS(%p) - called, tid=%" PRId64 ", seqid=%" PRId64,
+ this, threadId_, sequenceId_);
}
void EventId::initFromTSS_SameThreadIdAndSameSequenceId() {
- m_eidThr = EventIdTSS::s_eventId.getEidThr();
- m_eidSeq = EventIdTSS::s_eventId.getSeqNum();
+ threadId_ = EventIdTSS::instance().threadId();
+ sequenceId_ = EventIdTSS::instance().currentSequenceId();
+ LOGDEBUG(
+ "EventId::initFromTSS_SameThreadIdAndSameSequenceId(%p) - called, "
+ "tid=%" PRId64 ", seqid=%" PRId64,
+ this, threadId_, sequenceId_);
}
} // namespace client
diff --git a/cppcache/src/EventId.hpp b/cppcache/src/EventId.hpp
index 19828e7..e301e4a 100644
--- a/cppcache/src/EventId.hpp
+++ b/cppcache/src/EventId.hpp
@@ -20,12 +20,16 @@
#ifndef GEODE_EVENTID_H_
#define GEODE_EVENTID_H_
+#include <inttypes.h>
+
#include <string>
#include <geode/DataOutput.hpp>
#include <geode/internal/DataSerializableFixedId.hpp>
#include <geode/internal/geode_globals.hpp>
+#include "util/Log.hpp"
+
/** @file
*/
@@ -42,21 +46,21 @@
class APACHE_GEODE_EXPORT EventId
: public internal::DataSerializableFixedId_t<DSFid::EventId> {
private:
- char m_eidMem[512];
- int32_t m_eidMemLen;
- int64_t m_eidThr;
- int64_t m_eidSeq;
- int32_t m_bucketId;
- int8_t m_breadcrumbCounter;
+ char clientId_[512];
+ int32_t clientIdLength_;
+ int64_t threadId_;
+ int64_t sequenceId_;
+ int32_t bucketId_;
+ int8_t breadcrumbCounter_;
public:
/**
*@brief Accessor methods
**/
- const char* getMemId() const;
- int32_t getMemIdLen() const;
- int64_t getThrId() const;
- int64_t getSeqNum() const;
+ const char* clientId() const;
+ int32_t clientIdLength() const;
+ int64_t threadId() const;
+ int64_t sequenceNumber() const;
void toData(DataOutput& output) const override;
@@ -96,9 +100,11 @@
output.write(static_cast<uint8_t>(0));
char longCode = 3;
output.write(static_cast<uint8_t>(longCode));
- output.writeInt(m_eidThr);
+ output.writeInt(threadId_);
output.write(static_cast<uint8_t>(longCode));
- output.writeInt(m_eidSeq);
+ output.writeInt(sequenceId_);
+ LOGDEBUG("%s(%p): Wrote tid=%" PRId64 ", seqid=%" PRId64, __FUNCTION__,
+ this, threadId_, sequenceId_);
}
/** Constructor, given the values. */
diff --git a/cppcache/src/EventIdMap.cpp b/cppcache/src/EventIdMap.cpp
index 6fe069e..ec83b84 100644
--- a/cppcache/src/EventIdMap.cpp
+++ b/cppcache/src/EventIdMap.cpp
@@ -35,8 +35,8 @@
EventIdMapEntry EventIdMap::make(std::shared_ptr<EventId> eventid) {
auto sid = std::make_shared<EventSource>(
- eventid->getMemId(), eventid->getMemIdLen(), eventid->getThrId());
- auto seq = std::make_shared<EventSequence>(eventid->getSeqNum());
+ eventid->clientId(), eventid->clientIdLength(), eventid->threadId());
+ auto seq = std::make_shared<EventSequence>(eventid->sequenceNumber());
return std::make_pair(sid, seq);
}