blob: 62c05397c84c26997a7e32743ca6ea098e2d4bcb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "EventId.hpp"
#include <atomic>
#include <cstdint>
#include <cstring>
#include <geode/DataInput.hpp>
#include "ClientProxyMembershipID.hpp"
#include "util/Log.hpp"
namespace apache {
namespace geode {
namespace client {
class ThreadIdCounter {
public:
static std::atomic<int64_t>& instance() {
static std::atomic<int64_t> threadId_(0);
return threadId_;
}
static int64_t next() { return ++instance(); }
};
class EventIdTSS {
public:
static EventIdTSS& instance() {
thread_local EventIdTSS eventId_;
return eventId_;
}
int64_t nextSequenceId() {
sequenceId_++;
return sequenceId_;
}
int64_t currentSequenceId() { return sequenceId_; }
int64_t threadId() { return threadId_; }
private:
EventIdTSS();
int64_t threadId_;
int64_t sequenceId_;
};
EventIdTSS::EventIdTSS() : threadId_(ThreadIdCounter::next()), sequenceId_(0) {
LOGDEBUG("EventIdTSS::EventIdTSS(%p): threadId_=%" PRId64
", sequenceId_=%" PRId64,
this, threadId_, sequenceId_);
}
void EventId::toData(DataOutput& output) const {
// This method is always expected to write out nonstatic distributed
// memberid. Note that binary representation of EventId is NOT THE
// SAME here as when serialized into part of a message (via the writeIdsData
// method).
LOGDEBUG("EventId::toData(%p) - called", this);
output.writeBytes(reinterpret_cast<const int8_t*>(clientId_),
clientIdLength_);
output.writeArrayLen(18);
char longCode = 3;
output.write(static_cast<uint8_t>(longCode));
output.writeInt(threadId_);
output.write(static_cast<uint8_t>(longCode));
output.writeInt(sequenceId_);
output.writeInt(bucketId_);
output.write(breadcrumbCounter_);
}
void EventId::fromData(DataInput& input) {
LOGDEBUG("EventId::fromData(%p) - called", this);
clientIdLength_ = input.readArrayLength();
input.readBytesOnly(reinterpret_cast<int8_t*>(clientId_), clientIdLength_);
input.readArrayLength();
threadId_ = getEventIdData(input, input.read());
sequenceId_ = getEventIdData(input, input.read());
bucketId_ = input.readInt32();
breadcrumbCounter_ = input.read();
}
const char* EventId::clientId() const { return clientId_; }
int32_t EventId::clientIdLength() const { return clientIdLength_; }
int64_t EventId::threadId() const { return threadId_; }
int64_t EventId::sequenceNumber() const { return sequenceId_; }
int64_t EventId::getEventIdData(DataInput& input, char numberCode) {
int64_t retVal = 0;
LOGDEBUG("EventId::getEventIdData(%p) - called", this);
// Read number based on numeric code written by java server.
if (numberCode == 0) {
return input.read();
} else if (numberCode == 1) {
retVal = input.readInt16();
} else if (numberCode == 2) {
int32_t intVal;
intVal = input.readInt32();
retVal = intVal;
} else if (numberCode == 3) {
int64_t longVal;
longVal = input.readInt64();
retVal = longVal;
}
return retVal;
}
std::shared_ptr<Serializable> EventId::createDeserializable() {
LOGDEBUG("EventId::createDeserializable - called");
// use false since we dont want to inc sequence
// (for de-serialization)
return std::make_shared<EventId>(false);
}
EventId::EventId(char* memId, uint32_t memIdLen, int64_t thr, int64_t seq) {
LOGDEBUG("EventId::EventId(%p) - memId=%s, memIdLen=%d, thr=%" PRId64
", seq=%" PRId64,
this, memId, memIdLen, thr, seq);
// TODO: statics being assigned; not thread-safe??
std::memcpy(clientId_, memId, memIdLen);
clientIdLength_ = memIdLen;
threadId_ = thr;
sequenceId_ = seq;
bucketId_ = -1;
breadcrumbCounter_ = 0;
}
EventId::EventId(bool doInit, uint32_t reserveSize,
bool fullValueAfterDeltaFail)
: /* adongre
* CID 28934: Uninitialized scalar field (UNINIT_CTOR)
*/
clientIdLength_(0),
threadId_(0),
sequenceId_(0),
bucketId_(-1),
breadcrumbCounter_(0) {
LOGDEBUG(
"EventId::EventId(%p) - doInit=%s, reserveSize=%d, "
"fullValueAfterDeltaFail=%s",
this, doInit ? "true" : "false", reserveSize,
fullValueAfterDeltaFail ? "true" : "false");
if (!doInit) return;
if (fullValueAfterDeltaFail) {
/// need to send old sequence id
initFromTSS_SameThreadIdAndSameSequenceId();
} else {
initFromTSS();
}
for (uint32_t i = 0; i < reserveSize; i++) {
EventIdTSS::instance().nextSequenceId();
}
}
void EventId::initFromTSS() {
threadId_ = EventIdTSS::instance().threadId();
sequenceId_ = EventIdTSS::instance().nextSequenceId();
LOGDEBUG("EventId::initFromTSS(%p) - called, tid=%" PRId64 ", seqid=%" PRId64,
this, threadId_, sequenceId_);
}
void EventId::initFromTSS_SameThreadIdAndSameSequenceId() {
threadId_ = EventIdTSS::instance().threadId();
sequenceId_ = EventIdTSS::instance().currentSequenceId();
LOGDEBUG(
"EventId::initFromTSS_SameThreadIdAndSameSequenceId(%p) - called, "
"tid=%" PRId64 ", seqid=%" PRId64,
this, threadId_, sequenceId_);
}
} // namespace client
} // namespace geode
} // namespace apache