blob: a6db548c4bb92cccd902fd93859f685cd6548e45 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "EventId.hpp"
#include <atomic>
#include <cstring>
#include <geode/DataInput.hpp>
#include "ClientProxyMembershipID.hpp"
namespace apache {
namespace geode {
namespace client {
class EventIdTSS {
private:
static std::atomic<int64_t> s_eidThrId;
int64_t m_eidThrTSS;
int64_t m_eidSeqTSS;
~EventIdTSS() = default;
EventIdTSS(const EventIdTSS&) = delete;
EventIdTSS& operator=(const EventIdTSS&) = delete;
public:
// this should get called just once per thread due to first access to TSS
EventIdTSS() {
m_eidThrTSS = ++s_eidThrId;
m_eidSeqTSS = 0;
}
inline int64_t getEidThr() { return m_eidThrTSS; }
inline int64_t getAndIncEidSeq() { return m_eidSeqTSS++; }
inline int64_t getSeqNum() { return m_eidSeqTSS - 1; }
static thread_local EventIdTSS s_eventId;
}; // class EventIdTSS
std::atomic<int64_t> EventIdTSS::s_eidThrId;
thread_local EventIdTSS EventIdTSS::s_eventId;
void EventId::toData(DataOutput& output) const {
// This method is always expected to write out nonstatic distributed
// memberid.
output.writeBytes(reinterpret_cast<const int8_t*>(m_eidMem), m_eidMemLen);
output.writeArrayLen(18);
char longCode = 3;
output.write(static_cast<uint8_t>(longCode));
output.writeInt(m_eidThr);
output.write(static_cast<uint8_t>(longCode));
output.writeInt(m_eidSeq);
output.writeInt(m_bucketId);
output.write(m_breadcrumbCounter);
}
void EventId::fromData(DataInput& input) {
// TODO: statics being assigned; not thread-safe??
m_eidMemLen = input.readArrayLength();
input.readBytesOnly(reinterpret_cast<int8_t*>(m_eidMem), m_eidMemLen);
input.readArrayLength(); // ignore arrayLen
m_eidThr = getEventIdData(input, input.read());
m_eidSeq = getEventIdData(input, input.read());
m_bucketId = input.readInt32();
m_breadcrumbCounter = input.read();
}
const char* EventId::getMemId() const { return m_eidMem; }
int32_t EventId::getMemIdLen() const { return m_eidMemLen; }
int64_t EventId::getThrId() const { return m_eidThr; }
int64_t EventId::getSeqNum() const { return m_eidSeq; }
int64_t EventId::getEventIdData(DataInput& input, char numberCode) {
int64_t retVal = 0;
// Read number based on numeric code written by java server.
if (numberCode == 0) {
return input.read();
} else if (numberCode == 1) {
retVal = input.readInt16();
} else if (numberCode == 2) {
int32_t intVal;
intVal = input.readInt32();
retVal = intVal;
} else if (numberCode == 3) {
int64_t longVal;
longVal = input.readInt64();
retVal = longVal;
}
return retVal;
}
std::shared_ptr<Serializable> EventId::createDeserializable() {
return std::make_shared<EventId>(false);
// use false since we dont want to inc sequence
// (for de-serialization)
}
EventId::EventId(char* memId, uint32_t memIdLen, int64_t thr, int64_t seq) {
// TODO: statics being assigned; not thread-safe??
std::memcpy(m_eidMem, memId, memIdLen);
m_eidMemLen = memIdLen;
m_eidThr = thr;
m_eidSeq = seq;
m_bucketId = -1;
m_breadcrumbCounter = 0;
}
EventId::EventId(bool doInit, uint32_t reserveSize,
bool fullValueAfterDeltaFail)
: /* adongre
* CID 28934: Uninitialized scalar field (UNINIT_CTOR)
*/
m_eidMemLen(0),
m_eidThr(0),
m_eidSeq(0),
m_bucketId(-1),
m_breadcrumbCounter(0) {
if (!doInit) return;
if (fullValueAfterDeltaFail) {
/// need to send old sequence id
initFromTSS_SameThreadIdAndSameSequenceId();
} else {
initFromTSS();
}
for (uint32_t i = 0; i < reserveSize; i++) {
EventIdTSS::s_eventId.getAndIncEidSeq();
}
}
void EventId::initFromTSS() {
m_eidThr = EventIdTSS::s_eventId.getEidThr();
m_eidSeq = EventIdTSS::s_eventId.getAndIncEidSeq();
}
void EventId::initFromTSS_SameThreadIdAndSameSequenceId() {
m_eidThr = EventIdTSS::s_eventId.getEidThr();
m_eidSeq = EventIdTSS::s_eventId.getSeqNum();
}
} // namespace client
} // namespace geode
} // namespace apache