| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include <limits> |
| #include <memory> |
| #include <thrift/TApplicationException.h> |
| #include <thrift/async/TConcurrentClientSyncInfo.h> |
| #include <thrift/transport/TTransportException.h> |
| |
| namespace apache { namespace thrift { namespace async { |
| |
| using namespace ::apache::thrift::concurrency; |
| |
| TConcurrentClientSyncInfo::TConcurrentClientSyncInfo() : |
| stop_(false), |
| seqidMutex_(), |
| // test rollover all the time |
| nextseqid_((std::numeric_limits<int32_t>::max)()-10), |
| seqidToMonitorMap_(), |
| freeMonitors_(), |
| writeMutex_(), |
| readMutex_(), |
| recvPending_(false), |
| wakeupSomeone_(false), |
| seqidPending_(0), |
| fnamePending_(), |
| mtypePending_(::apache::thrift::protocol::T_CALL) |
| { |
| freeMonitors_.reserve(MONITOR_CACHE_SIZE); |
| } |
| |
| bool TConcurrentClientSyncInfo::getPending( |
| std::string &fname, |
| ::apache::thrift::protocol::TMessageType &mtype, |
| int32_t &rseqid) |
| { |
| if(stop_) |
| throwDeadConnection_(); |
| wakeupSomeone_ = false; |
| if(recvPending_) |
| { |
| recvPending_ = false; |
| rseqid = seqidPending_; |
| fname = fnamePending_; |
| mtype = mtypePending_; |
| return true; |
| } |
| return false; |
| } |
| |
| void TConcurrentClientSyncInfo::updatePending( |
| const std::string &fname, |
| ::apache::thrift::protocol::TMessageType mtype, |
| int32_t rseqid) |
| { |
| recvPending_ = true; |
| seqidPending_ = rseqid; |
| fnamePending_ = fname; |
| mtypePending_ = mtype; |
| MonitorPtr monitor; |
| { |
| Guard seqidGuard(seqidMutex_); |
| auto i = seqidToMonitorMap_.find(rseqid); |
| if(i == seqidToMonitorMap_.end()) |
| throwBadSeqId_(); |
| monitor = i->second; |
| } |
| monitor->notify(); |
| } |
| |
| void TConcurrentClientSyncInfo::waitForWork(int32_t seqid) |
| { |
| MonitorPtr m; |
| { |
| Guard seqidGuard(seqidMutex_); |
| m = seqidToMonitorMap_[seqid]; |
| } |
| while(true) |
| { |
| // be very careful about setting state in this loop that affects waking up. You may exit |
| // this function, attempt to grab some work, and someone else could have beaten you (or not |
| // left) the read mutex, and that will put you right back in this loop, with the mangled |
| // state you left behind. |
| if(stop_) |
| throwDeadConnection_(); |
| if(wakeupSomeone_) |
| return; |
| if(recvPending_ && seqidPending_ == seqid) |
| return; |
| m->waitForever(); |
| } |
| } |
| |
| void TConcurrentClientSyncInfo::throwBadSeqId_() |
| { |
| throw apache::thrift::TApplicationException( |
| TApplicationException::BAD_SEQUENCE_ID, |
| "server sent a bad seqid"); |
| } |
| |
| void TConcurrentClientSyncInfo::throwDeadConnection_() |
| { |
| throw apache::thrift::transport::TTransportException( |
| apache::thrift::transport::TTransportException::NOT_OPEN, |
| "this client died on another thread, and is now in an unusable state"); |
| } |
| |
| void TConcurrentClientSyncInfo::wakeupAnyone_(const Guard &) |
| { |
| wakeupSomeone_ = true; |
| if(!seqidToMonitorMap_.empty()) |
| { |
| // The monitor map maps integers to monitors. Larger integers are more recent |
| // messages. Since this is ordered, it means that the last element is the most recent. |
| // We are trying to guess which thread will have its message complete next, so we are picking |
| // the most recent. The oldest message is likely to be some polling, long lived message. |
| // If we guess right, the thread we wake up will handle the message that comes in. |
| // If we guess wrong, the thread we wake up will hand off the work to the correct thread, |
| // costing us an extra context switch. |
| seqidToMonitorMap_.rbegin()->second->notify(); |
| } |
| } |
| |
| void TConcurrentClientSyncInfo::markBad_(const Guard &) |
| { |
| wakeupSomeone_ = true; |
| stop_ = true; |
| for(auto & i : seqidToMonitorMap_) |
| i.second->notify(); |
| } |
| |
| TConcurrentClientSyncInfo::MonitorPtr |
| TConcurrentClientSyncInfo::newMonitor_(const Guard &) |
| { |
| if(freeMonitors_.empty()) |
| return std::make_shared<Monitor>(&readMutex_); |
| MonitorPtr retval; |
| //swapping to avoid an atomic operation |
| retval.swap(freeMonitors_.back()); |
| freeMonitors_.pop_back(); |
| return retval; |
| } |
| |
| void TConcurrentClientSyncInfo::deleteMonitor_( |
| const Guard &, |
| TConcurrentClientSyncInfo::MonitorPtr &m) /*noexcept*/ |
| { |
| if(freeMonitors_.size() > MONITOR_CACHE_SIZE) |
| { |
| m.reset(); |
| return; |
| } |
| //freeMonitors_ was reserved up to MONITOR_CACHE_SIZE in the ctor, |
| //so this shouldn't throw |
| freeMonitors_.push_back(TConcurrentClientSyncInfo::MonitorPtr()); |
| //swapping to avoid an atomic operation |
| m.swap(freeMonitors_.back()); |
| } |
| |
| int32_t TConcurrentClientSyncInfo::generateSeqId() |
| { |
| Guard seqidGuard(seqidMutex_); |
| if(stop_) |
| throwDeadConnection_(); |
| |
| if(!seqidToMonitorMap_.empty()) |
| if(nextseqid_ == seqidToMonitorMap_.begin()->first) |
| throw apache::thrift::TApplicationException( |
| TApplicationException::BAD_SEQUENCE_ID, |
| "about to repeat a seqid"); |
| int32_t newSeqId = nextseqid_; |
| if (nextseqid_ == (std::numeric_limits<int32_t>::max)()) |
| nextseqid_ = (std::numeric_limits<int32_t>::min)(); |
| else |
| ++nextseqid_; |
| seqidToMonitorMap_[newSeqId] = newMonitor_(seqidGuard); |
| return newSeqId; |
| } |
| |
| TConcurrentRecvSentry::TConcurrentRecvSentry(TConcurrentClientSyncInfo *sync, int32_t seqid) : |
| sync_(*sync), |
| seqid_(seqid), |
| committed_(false) |
| { |
| sync_.getReadMutex().lock(); |
| } |
| |
| TConcurrentRecvSentry::~TConcurrentRecvSentry() |
| { |
| { |
| Guard seqidGuard(sync_.seqidMutex_); |
| sync_.deleteMonitor_(seqidGuard, sync_.seqidToMonitorMap_[seqid_]); |
| |
| sync_.seqidToMonitorMap_.erase(seqid_); |
| if(committed_) |
| sync_.wakeupAnyone_(seqidGuard); |
| else |
| sync_.markBad_(seqidGuard); |
| } |
| sync_.getReadMutex().unlock(); |
| } |
| |
| void TConcurrentRecvSentry::commit() |
| { |
| committed_ = true; |
| } |
| |
| TConcurrentSendSentry::TConcurrentSendSentry(TConcurrentClientSyncInfo *sync) : |
| sync_(*sync), |
| committed_(false) |
| { |
| sync_.getWriteMutex().lock(); |
| } |
| |
| TConcurrentSendSentry::~TConcurrentSendSentry() |
| { |
| if(!committed_) |
| { |
| Guard seqidGuard(sync_.seqidMutex_); |
| sync_.markBad_(seqidGuard); |
| } |
| sync_.getWriteMutex().unlock(); |
| } |
| |
| void TConcurrentSendSentry::commit() |
| { |
| committed_ = true; |
| } |
| |
| |
| }}} // apache::thrift::async |