blob: 352c9932d1dc250596d16688d8ac76ceb133c77a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Copyright (c) 2011, François Saint-Jacques
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of the disruptor-- nor the
// names of its contributors may be used to endorse or promote products
// derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#ifndef DISRUPTOR_WAITSTRATEGY_H_ // NOLINT
#define DISRUPTOR_WAITSTRATEGY_H_ // NOLINT
#include <boost/chrono.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread.hpp>
#include <vector>
#include "exceptions.h"
#include "interface.h"
#include "sequence.h"
namespace rocketmq {
// Strategy options which are available to those waiting on a
// {@link RingBuffer}
enum WaitStrategyOption {
// This strategy uses a condition variable inside a lock to block the
// event procesor which saves CPU resource at the expense of lock
// contention.
kBlockingStrategy,
// This strategy uses a progressive back off strategy by first spinning,
// then yielding, then sleeping for 1ms period. This is a good strategy
// for burst traffic then quiet periods when latency is not critical.
kSleepingStrategy,
// This strategy calls Thread.yield() in a loop as a waiting strategy
// which reduces contention at the expense of CPU resource.
kYieldingStrategy,
// This strategy call spins in a loop as a waiting strategy which is
// lowest and most consistent latency but ties up a CPU.
kBusySpinStrategy
};
// Blocking strategy that uses a lock and condition variable for
// {@link Consumer}s waiting on a barrier.
// This strategy should be used when performance and low-latency are not as
// important as CPU resource.
class BlockingStrategy : public WaitStrategyInterface {
public:
BlockingStrategy() {}
virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
const Sequence& cursor,
const SequenceBarrierInterface& barrier,
const int64_t& sequence) {
int64_t available_sequence = 0;
// We need to wait.
if ((available_sequence = cursor.sequence()) < sequence) {
// acquire lock
boost::unique_lock<boost::recursive_mutex> ulock(mutex_);
while ((available_sequence = cursor.sequence()) < sequence) {
barrier.CheckAlert();
consumer_notify_condition_.wait(ulock);
}
} // unlock happens here, on ulock destruction.
if (0 != dependents.size()) {
while ((available_sequence = GetMinimumSequence(dependents)) < sequence) {
barrier.CheckAlert();
}
}
return available_sequence;
}
virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
const Sequence& cursor,
const SequenceBarrierInterface& barrier,
const int64_t& sequence,
const int64_t& timeout_micros) {
int64_t available_sequence = 0;
// We have to wait
if ((available_sequence = cursor.sequence()) < sequence) {
boost::unique_lock<boost::recursive_mutex> ulock(mutex_);
while ((available_sequence = cursor.sequence()) < sequence) {
barrier.CheckAlert();
if (boost::cv_status::timeout ==
consumer_notify_condition_.wait_for(
ulock, boost::chrono::microseconds(timeout_micros)))
break;
}
} // unlock happens here, on ulock destruction
if (0 != dependents.size()) {
while ((available_sequence = GetMinimumSequence(dependents)) < sequence) {
barrier.CheckAlert();
}
}
return available_sequence;
}
virtual void SignalAllWhenBlocking() {
boost::unique_lock<boost::recursive_mutex> ulock(mutex_);
consumer_notify_condition_.notify_all();
}
private:
boost::recursive_mutex mutex_;
boost::condition_variable_any consumer_notify_condition_;
};
// Sleeping strategy
class SleepingStrategy : public WaitStrategyInterface {
public:
SleepingStrategy() {}
virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
const Sequence& cursor,
const SequenceBarrierInterface& barrier,
const int64_t& sequence) {
int64_t available_sequence = 0;
int counter = kRetries;
if (0 == dependents.size()) {
while ((available_sequence = cursor.sequence()) < sequence) {
counter = ApplyWaitMethod(barrier, counter);
}
} else {
while ((available_sequence = GetMinimumSequence(dependents)) < sequence) {
counter = ApplyWaitMethod(barrier, counter);
}
}
return available_sequence;
}
virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
const Sequence& cursor,
const SequenceBarrierInterface& barrier,
const int64_t& sequence,
const int64_t& timeout_micros) {
// timing
boost::posix_time::ptime current_date_microseconds =
boost::posix_time::microsec_clock::local_time();
int64_t start_micro =
current_date_microseconds.time_of_day().total_milliseconds();
int64_t available_sequence = 0;
int counter = kRetries;
if (0 == dependents.size()) {
while ((available_sequence = cursor.sequence()) < sequence) {
counter = ApplyWaitMethod(barrier, counter);
boost::posix_time::ptime current_date_microseconds =
boost::posix_time::microsec_clock::local_time();
int64_t end_micro =
current_date_microseconds.time_of_day().total_milliseconds();
if (timeout_micros < (end_micro - start_micro)) break;
}
} else {
while ((available_sequence = GetMinimumSequence(dependents)) < sequence) {
counter = ApplyWaitMethod(barrier, counter);
boost::posix_time::ptime current_date_microseconds =
boost::posix_time::microsec_clock::local_time();
int64_t end_micro =
current_date_microseconds.time_of_day().total_milliseconds();
if (timeout_micros < (end_micro - start_micro)) break;
}
}
return available_sequence;
}
virtual void SignalAllWhenBlocking() {}
static const int kRetries = 200;
private:
int ApplyWaitMethod(const SequenceBarrierInterface& barrier, int counter) {
barrier.CheckAlert();
if (counter > 100) {
counter--;
} else if (counter > 0) {
counter--;
boost::this_thread::yield();
} else {
boost::this_thread::sleep_for(boost::chrono::milliseconds(1));
}
return counter;
}
};
// Yielding strategy that uses a sleep(0) for {@link EventProcessor}s waiting
// on a barrier. This strategy is a good compromise between performance and
// CPU resource.
class YieldingStrategy : public WaitStrategyInterface {
public:
YieldingStrategy() {}
virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
const Sequence& cursor,
const SequenceBarrierInterface& barrier,
const int64_t& sequence) {
int64_t available_sequence = 0;
int counter = kSpinTries;
if (0 == dependents.size()) {
while ((available_sequence = cursor.sequence()) < sequence) {
counter = ApplyWaitMethod(barrier, counter);
}
} else {
while ((available_sequence = GetMinimumSequence(dependents)) < sequence) {
counter = ApplyWaitMethod(barrier, counter);
}
}
return available_sequence;
}
virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
const Sequence& cursor,
const SequenceBarrierInterface& barrier,
const int64_t& sequence,
const int64_t& timeout_micros) {
boost::posix_time::ptime current_date_microseconds =
boost::posix_time::microsec_clock::local_time();
int64_t start_micro =
current_date_microseconds.time_of_day().total_milliseconds();
int64_t available_sequence = 0;
int counter = kSpinTries;
if (0 == dependents.size()) {
while ((available_sequence = cursor.sequence()) < sequence) {
counter = ApplyWaitMethod(barrier, counter);
boost::posix_time::ptime current_date_microseconds =
boost::posix_time::microsec_clock::local_time();
int64_t end_micro =
current_date_microseconds.time_of_day().total_milliseconds();
if (timeout_micros < (end_micro - start_micro)) break;
}
} else {
while ((available_sequence = GetMinimumSequence(dependents)) < sequence) {
counter = ApplyWaitMethod(barrier, counter);
boost::posix_time::ptime current_date_microseconds =
boost::posix_time::microsec_clock::local_time();
int64_t end_micro =
current_date_microseconds.time_of_day().total_milliseconds();
if (timeout_micros < (end_micro - start_micro)) break;
}
}
return available_sequence;
}
virtual void SignalAllWhenBlocking() {}
static const int kSpinTries = 100;
private:
int ApplyWaitMethod(const SequenceBarrierInterface& barrier, int counter) {
barrier.CheckAlert();
if (counter == 0) {
boost::this_thread::yield();
} else {
counter--;
}
return counter;
}
};
// Busy Spin strategy that uses a busy spin loop for {@link EventProcessor}s
// waiting on a barrier.
// This strategy will use CPU resource to avoid syscalls which can introduce
// latency jitter. It is best used when threads can be bound to specific
// CPU cores.
class BusySpinStrategy : public WaitStrategyInterface {
public:
BusySpinStrategy() {}
virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
const Sequence& cursor,
const SequenceBarrierInterface& barrier,
const int64_t& sequence) {
int64_t available_sequence = 0;
if (0 == dependents.size()) {
while ((available_sequence = cursor.sequence()) < sequence) {
barrier.CheckAlert();
}
} else {
while ((available_sequence = GetMinimumSequence(dependents)) < sequence) {
barrier.CheckAlert();
}
}
return available_sequence;
}
virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
const Sequence& cursor,
const SequenceBarrierInterface& barrier,
const int64_t& sequence,
const int64_t& timeout_micros) {
boost::posix_time::ptime current_date_microseconds =
boost::posix_time::microsec_clock::local_time();
int64_t start_micro =
current_date_microseconds.time_of_day().total_milliseconds();
int64_t available_sequence = 0;
if (0 == dependents.size()) {
while ((available_sequence = cursor.sequence()) < sequence) {
barrier.CheckAlert();
boost::posix_time::ptime current_date_microseconds =
boost::posix_time::microsec_clock::local_time();
int64_t end_micro =
current_date_microseconds.time_of_day().total_milliseconds();
if (timeout_micros < (end_micro - start_micro)) break;
}
} else {
while ((available_sequence = GetMinimumSequence(dependents)) < sequence) {
barrier.CheckAlert();
boost::posix_time::ptime current_date_microseconds =
boost::posix_time::microsec_clock::local_time();
int64_t end_micro =
current_date_microseconds.time_of_day().total_milliseconds();
if (timeout_micros < (end_micro - start_micro)) break;
}
}
return available_sequence;
}
virtual void SignalAllWhenBlocking() {}
};
WaitStrategyInterface* CreateWaitStrategy(WaitStrategyOption wait_option) {
switch (wait_option) {
case kBlockingStrategy:
return new BlockingStrategy();
case kSleepingStrategy:
return new SleepingStrategy();
case kYieldingStrategy:
return new YieldingStrategy();
case kBusySpinStrategy:
return new BusySpinStrategy();
default:
return NULL;
}
}
}; // namespace rocketmq
#endif // DISRUPTOR_WAITSTRATEGY_H_ NOLINT