blob: 35930576e3611d73714d7b54b586a1c554652f7f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef LIB_FUTURE_H_
#define LIB_FUTURE_H_
#include <condition_variable>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
// Shorthand for the mutex guard used by Future and Promise below. A
// std::unique_lock is used because the code in this header releases the lock
// early via unlock() and passes the guard to condition_variable waits.
// NOTE(review): this alias is declared at global scope, so every file that
// includes this header sees ::Lock - confirm no includer depends on that
// before moving it into namespace pulsar.
using Lock = std::unique_lock<std::mutex>;
namespace pulsar {
template <typename Result, typename Type>
struct InternalState {
std::mutex mutex;
std::condition_variable condition;
Result result;
Type value;
bool complete;
std::list<typename std::function<void(Result, const Type&)> > listeners;
};
template <typename Result, typename Type>
class Future {
public:
typedef std::function<void(Result, const Type&)> ListenerCallback;
Future& addListener(ListenerCallback callback) {
InternalState<Result, Type>* state = state_.get();
Lock lock(state->mutex);
if (state->complete) {
lock.unlock();
callback(state->result, state->value);
} else {
state->listeners.push_back(callback);
}
return *this;
}
Result get(Type& result) {
InternalState<Result, Type>* state = state_.get();
Lock lock(state->mutex);
if (!state->complete) {
// Wait for result
while (!state->complete) {
state->condition.wait(lock);
}
}
result = state->value;
return state->result;
}
template <typename Duration>
bool get(Result& res, Type& value, Duration d) {
InternalState<Result, Type>* state = state_.get();
Lock lock(state->mutex);
if (!state->complete) {
// Wait for result
while (!state->complete) {
if (!state->condition.wait_for(lock, d, [&state] { return state->complete; })) {
// Timeout while waiting for the future to complete
return false;
}
}
}
value = state->value;
res = state->result;
return true;
}
private:
typedef std::shared_ptr<InternalState<Result, Type> > InternalStatePtr;
Future(InternalStatePtr state) : state_(state) {}
std::shared_ptr<InternalState<Result, Type> > state_;
template <typename U, typename V>
friend class Promise;
};
/**
 * Producer-side handle used to publish the outcome of an asynchronous
 * operation to the Futures obtained from getFuture().
 *
 * A Promise owns its InternalState through a shared_ptr, so copies of a
 * Promise refer to the same state. The outcome can be set once: the first
 * successful setValue()/setFailed() wins and later calls return false.
 *
 * @tparam Result status code type reported with the outcome
 * @tparam Type   value type reported with the outcome
 */
template <typename Result, typename Type>
class Promise {
   public:
    /// Creates a promise with a fresh, not-yet-complete shared state.
    Promise() : state_(std::make_shared<InternalState<Result, Type> >()) {}

    /**
     * Completes the promise with a value.
     *
     * Stores a copy of the value together with a default-constructed Result,
     * wakes every thread blocked in Future::get(), then invokes the listeners
     * registered so far on the calling thread, outside the state mutex.
     *
     * @param value value to publish; listeners receive this reference
     * @return true if this call completed the promise, false if it was
     *         already complete (nothing is changed in that case)
     */
    bool setValue(const Type& value) const {
        // Function-local static: zero-initialized for enum/arithmetic Result
        // types. NOTE(review): presumably this is the "success" code - confirm
        // against the Result type used by callers.
        static Result DEFAULT_RESULT;
        InternalState<Result, Type>* state = state_.get();
        Lock lock(state->mutex);
        if (state->complete) {
            return false;
        }
        state->value = value;
        state->result = DEFAULT_RESULT;
        state->complete = true;
        // Take the listeners out under the lock so they can run without it.
        decltype(state->listeners) listeners;
        listeners.swap(state->listeners);
        lock.unlock();
        // Notify before running listeners. The shared state is not touched
        // after the first listener starts, so a listener may release the last
        // owner of the state, throw, or take a long time without leaving
        // waiters blocked or this function using a dangling pointer.
        state->condition.notify_all();
        for (auto& callback : listeners) {
            callback(DEFAULT_RESULT, value);
        }
        return true;
    }

    /**
     * Completes the promise with a failure code.
     *
     * Stores the result code (the stored value is left as it was), wakes every
     * thread blocked in Future::get(), then invokes the listeners registered
     * so far with a default-constructed Type, outside the state mutex.
     *
     * @param result result code to publish
     * @return true if this call completed the promise, false if it was
     *         already complete (nothing is changed in that case)
     */
    bool setFailed(Result result) const {
        // Placeholder value handed to listeners; static so the reference stays
        // valid for as long as a listener may hold on to it.
        static Type DEFAULT_VALUE;
        InternalState<Result, Type>* state = state_.get();
        Lock lock(state->mutex);
        if (state->complete) {
            return false;
        }
        state->result = result;
        state->complete = true;
        // Take the listeners out under the lock so they can run without it.
        decltype(state->listeners) listeners;
        listeners.swap(state->listeners);
        lock.unlock();
        // Notify before running listeners; see setValue() for the rationale.
        state->condition.notify_all();
        for (auto& callback : listeners) {
            callback(result, DEFAULT_VALUE);
        }
        return true;
    }

    /// @return true once setValue() or setFailed() has succeeded.
    bool isComplete() const {
        InternalState<Result, Type>* state = state_.get();
        Lock lock(state->mutex);
        return state->complete;
    }

    /// @return a Future sharing this promise's state.
    Future<Result, Type> getFuture() const { return Future<Result, Type>(state_); }

   private:
    typedef std::function<void(Result, const Type&)> ListenerCallback;

    // State shared with every Future handed out by getFuture().
    std::shared_ptr<InternalState<Result, Type> > state_;
};
// Empty type with no members.
// NOTE(review): presumably used as the Type argument of Promise/Future for
// operations that report only a Result and no value - confirm against callers.
class Void {};
} /* namespace pulsar */
#endif /* LIB_FUTURE_H_ */