blob: 4e2836f5de9159d1b8f9de173a3413ba9d488aa9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <mutex>
#include <unordered_map>
#include "common/status.h"
#include "gen-cpp/Types_types.h"
#include "util/aligned-new.h"
#include "util/spinlock.h"
#include "util/unique-id-hash.h"
namespace impala {
/// This is a template that can be used for any map that maps from a query ID (TUniqueId
/// or UniqueIdPB) to some object, and that needs to be sharded. It provides a SpinLock
/// per shard to synchronize access to each shard of the map. The underlying shard is
/// locked and accessed by instantiating a GenericScopedShardedMapRef.
//
/// Usage pattern:
//
/// typedef ShardedQueryMap<QueryState*> QueryStateMap;
/// QueryStateMap qs_map_;
//
template <typename K, typename V>
class GenericShardedQueryMap {
 public:
  /// Invokes 'call' once for every value stored in the map. The shards are visited one
  /// after another; a shard's SpinLock is held while its entries are handed to 'call'
  /// and is released before the next shard is visited. 'call' receives a const
  /// reference to the mapped value (of type 'V') and returns nothing.
  /// TODO: If necessary, refactor the lambda signature to allow returning Status
  /// objects.
  void DoFuncForAllEntries(const std::function<void(const V&)>& call) {
    for (MapShard& shard : shards_) {
      std::lock_guard<SpinLock> shard_guard(shard.map_lock_);
      for (const auto& entry : shard.map_) call(entry.second);
    }
  }

  /// Adds ('key', 'value') to the map, returning an error if 'key' already exists.
  Status Add(const K& key, const V& value);

  /// Returns the value associated with 'key' in 'value', returning an error if 'key'
  /// doesn't exist.
  Status Get(const K& key, V* value);

  /// Removes 'key' from the map, returning an error if 'key' doesn't exist.
  Status Delete(const K& key);

 private:
  /// The scoped reference class reaches into 'shards_' directly to lock a shard.
  template <typename K2, typename V2>
  friend class GenericScopedShardedMapRef;

  /// Number of buckets to split the containers of query IDs into.
  static constexpr uint32_t NUM_QUERY_BUCKETS = 4;

  /// A map and the lock that guards it are kept in one struct to avoid false sharing:
  /// they are always accessed together, so it is better if they can be allocated on
  /// the same cache line.
  struct MapShard : public CacheLineAligned {
    std::unordered_map<K, V> map_;
    SpinLock map_lock_;
  };

  /// One independently locked shard per bucket.
  MapShard shards_[NUM_QUERY_BUCKETS];
};
/// Sharded map from TUniqueId to 'T'. Adds nothing of its own to
/// GenericShardedQueryMap<TUniqueId, T>.
template <typename T>
class ShardedQueryMap : public GenericShardedQueryMap<TUniqueId, T> {};
/// Sharded map from UniqueIdPB to 'T'. Adds nothing of its own to
/// GenericShardedQueryMap<UniqueIdPB, T>.
template <typename T>
class ShardedQueryPBMap : public GenericShardedQueryMap<UniqueIdPB, T> {};
/// Use this class to obtain a locked reference to the underlying map shard
/// of a GenericShardedQueryMap, corresponding to the 'query_id'.
//
/// Pattern:
/// {
/// ScopedShardedMapRef map_ref(qid, sharded_map);
/// DCHECK(map_ref.get() != nullptr); <get() should never return nullptr>
/// ...
/// }
//
/// The caller must ensure that the ShardedQueryMap outlives this scoped object.
template <typename K, typename V>
class GenericScopedShardedMapRef {
public:
// Finds the appropriate map that could/should contain 'query_id' and locks it.
GenericScopedShardedMapRef(
const K& query_id, class GenericShardedQueryMap<K, V>* sharded_map) {
DCHECK(sharded_map != nullptr);
int qs_map_bucket = QueryIdToBucket(query_id);
shard_ = &sharded_map->shards_[qs_map_bucket];
// Lock the corresponding shard.
shard_->map_lock_.lock();
}
~GenericScopedShardedMapRef() {
shard_->map_lock_.DCheckLocked();
shard_->map_lock_.unlock();
}
// Returns the shard (map) for the 'query_id' passed to the constructor.
// Should never return nullptr.
std::unordered_map<K, V>* get() {
shard_->map_lock_.DCheckLocked();
return &shard_->map_;
}
std::unordered_map<K, V>* operator->() {
shard_->map_lock_.DCheckLocked();
return get();
}
private:
// Return the correct bucket that a query ID would belong to.
inline int QueryIdToBucket(const TUniqueId& query_id) {
int bucket = static_cast<int>(query_id.hi) % ShardedQueryMap<V>::NUM_QUERY_BUCKETS;
DCHECK(bucket < ShardedQueryMap<V>::NUM_QUERY_BUCKETS && bucket >= 0);
return bucket;
}
inline int QueryIdToBucket(const UniqueIdPB& query_id) {
int bucket = static_cast<int>(query_id.hi()) % ShardedQueryMap<V>::NUM_QUERY_BUCKETS;
DCHECK(bucket < ShardedQueryMap<V>::NUM_QUERY_BUCKETS && bucket >= 0);
return bucket;
}
typename GenericShardedQueryMap<K, V>::MapShard* shard_;
DISALLOW_COPY_AND_ASSIGN(GenericScopedShardedMapRef);
};
/// Locked reference to the shard of a ShardedQueryMap<T> that a TUniqueId falls into.
/// All behaviour comes from GenericScopedShardedMapRef<TUniqueId, T>.
template <typename T>
class ScopedShardedMapRef : public GenericScopedShardedMapRef<TUniqueId, T> {
  using Base = GenericScopedShardedMapRef<TUniqueId, T>;

 public:
  /// Locks the shard of 'sharded_map' that 'query_id' belongs to.
  ScopedShardedMapRef(const TUniqueId& query_id, ShardedQueryMap<T>* sharded_map)
    : Base(query_id, sharded_map) {}
};
/// Locked reference to the shard of a ShardedQueryPBMap<T> that a UniqueIdPB falls
/// into. All behaviour comes from GenericScopedShardedMapRef<UniqueIdPB, T>.
template <typename T>
class ScopedShardedMapPBRef : public GenericScopedShardedMapRef<UniqueIdPB, T> {
 public:
  /// Locks the shard of 'sharded_map' that 'query_id' belongs to. The key and map
  /// types match this class's base; a mem-initializer may only name a direct base or
  /// a member, so it has to initialise GenericScopedShardedMapRef<UniqueIdPB, T>.
  ScopedShardedMapPBRef(const UniqueIdPB& query_id, ShardedQueryPBMap<T>* sharded_map)
    : GenericScopedShardedMapRef<UniqueIdPB, T>(query_id, sharded_map) {}
};
} // namespace impala