blob: 59dc2d53b8a9f43f3501be7f355fc20ae606ee41 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef SHARDED_QUERY_MAP_UTIL_H
#define SHARDED_QUERY_MAP_UTIL_H
#include <boost/thread/lock_guard.hpp>
#include <unordered_map>
#include "gen-cpp/Types_types.h"
#include "util/aligned-new.h"
#include "util/spinlock.h"
#include "util/uid-util.h"
namespace impala {
/// This is a template that can be used for any map that maps from a query ID (TUniqueId)
/// to some object, and that needs to be sharded. It provides a SpinLock per shard to
/// synchronize access to each shard of the map. The underlying shard is locked and
/// accessed by instantiating a ScopedShardedMapRef.
//
/// Usage pattern:
//
/// typedef ShardedQueryMap<QueryState*> QueryStateMap;
/// QueryStateMap qs_map_;
//
template<typename T>
class ShardedQueryMap {
public:
// This function takes a lambda which should take a parameter of object 'T' and
// runs the lambda for all the entries in the map. The lambda should have a return
// type of 'void'..
// TODO: If necessary, refactor the lambda signature to allow returning Status objects.
void DoFuncForAllEntries(const std::function<void(const T&)>& call) {
for (int i = 0; i < NUM_QUERY_BUCKETS; ++i) {
boost::lock_guard<SpinLock> l(shards_[i].map_lock_);
for (const auto& map_value_ref: shards_[i].map_) {
call(map_value_ref.second);
}
}
}
private:
template <typename T2>
friend class ScopedShardedMapRef;
// Number of buckets to split the containers of query IDs into.
static constexpr uint32_t NUM_QUERY_BUCKETS = 4;
// We group the map and its corresponding lock together to avoid false sharing. Since
// we will always access a map and its corresponding lock together, it's better if
// they can be allocated on the same cache line.
struct MapShard : public CacheLineAligned {
std::unordered_map<TUniqueId, T> map_;
SpinLock map_lock_;
};
struct MapShard shards_[NUM_QUERY_BUCKETS];
};
/// Use this class to obtain a locked reference to the underlying map shard
/// of a ShardedQueryMap, corresponding to the 'query_id'.
//
/// Pattern:
/// {
/// ScopedShardedMapRef map_ref(qid, sharded_map);
/// DCHECK(map_ref != nullptr); <nullptr should never be returned>
/// ...
/// }
//
/// The caller should ensure that the lifetime of the ShardedQueryMap should be longer
/// than the lifetime of this scoped class.
template <typename T>
class ScopedShardedMapRef {
public:
// Finds the appropriate map that could/should contain 'query_id' and locks it.
ScopedShardedMapRef(
const TUniqueId& query_id, class ShardedQueryMap<T>* sharded_map) {
DCHECK(sharded_map != nullptr);
int qs_map_bucket = QueryIdToBucket(query_id);
shard_ = &sharded_map->shards_[qs_map_bucket];
// Lock the corresponding shard.
shard_->map_lock_.lock();
}
~ScopedShardedMapRef() {
shard_->map_lock_.DCheckLocked();
shard_->map_lock_.unlock();
}
// Returns the shard (map) for the 'query_id' passed to the constructor.
// Should never return nullptr.
std::unordered_map<TUniqueId, T>* get() {
shard_->map_lock_.DCheckLocked();
return &shard_->map_;
}
std::unordered_map<TUniqueId, T>* operator->() {
shard_->map_lock_.DCheckLocked();
return get();
}
private:
// Return the correct bucket that a query ID would belong to.
inline int QueryIdToBucket(const TUniqueId& query_id) {
int bucket =
static_cast<int>(query_id.hi) % ShardedQueryMap<T>::NUM_QUERY_BUCKETS;
DCHECK(bucket < ShardedQueryMap<T>::NUM_QUERY_BUCKETS && bucket >= 0);
return bucket;
}
typename ShardedQueryMap<T>::MapShard* shard_;
DISALLOW_COPY_AND_ASSIGN(ScopedShardedMapRef);
};
} // namespace impala
#endif /* SHARDED_QUERY_MAP_UTIL_H */