Avoid deadlocks when loading a block or blob.

Previously, callers threaded a "locked_block_id" parameter through
allocateSlots() and makeRoomForBlock() so the eviction path could detect
when the block chosen for eviction hashes to the same ShardedLockManager
shard as a lock the caller already holds, which would self-deadlock.
This change moves that bookkeeping into the ShardedLockManager itself:
get() now records which shards are held and reports a collision through
an optional out-parameter, and every get() is paired with a release()
once the corresponding lock guard has been destroyed.
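
For reference, the pairing protocol the patched code follows (a condensed
sketch of the code in this patch, not an additional change):

    // Caller side (condensed from getBlockInternal): every get() that
    // returns a real shard is followed by exactly one release() after the
    // lock guard has been destroyed.
    {
      SpinSharedMutexExclusiveLock<false> io_lock(*lock_manager_.get(block));
      // ... load the block; may recurse into makeRoomForBlock() ...
    }
    lock_manager_.release(block);

    // Eviction side (condensed from makeRoomForBlock): get() reports
    // whether the victim's shard is already held. On a collision it hands
    // back a dummy mutex and records nothing, so the loop backs off
    // without calling release().
    bool has_collision = false;
    SpinSharedMutexExclusiveLock<false> eviction_lock(
        *lock_manager_.get(block_index, &has_collision));
    if (has_collision) {
      break;  // Temporarily exceed the memory limit instead of deadlocking.
    }
    // ... save and evict the victim, then ...
    lock_manager_.release(block_index);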
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 8932e56..410eeb1 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -355,7 +355,7 @@
   DEBUG_ASSERT(num_slots > 0);
   DEBUG_ASSERT(handle != nullptr);
 
-  handle->block_memory = allocateSlots(num_slots, numa_node, kInvalidBlockId);
+  handle->block_memory = allocateSlots(num_slots, numa_node);
   handle->block_memory_size = num_slots;
 
   return ++block_index_;
@@ -368,7 +368,7 @@
   // already loaded before this function gets called.
   size_t num_slots = file_manager_->numSlots(block);
   DEBUG_ASSERT(num_slots != 0);
-  void* block_buffer = allocateSlots(num_slots, numa_node, block);
+  void* block_buffer = allocateSlots(num_slots, numa_node);
 
   const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots);
   CHECK(status) << "Failed to read block from persistent storage: " << block;
@@ -388,8 +388,7 @@
 }
 
 void* StorageManager::allocateSlots(const std::size_t num_slots,
-                                    const int numa_node,
-                                    const block_id locked_block_id) {
+                                    const int numa_node) {
 #if defined(QUICKSTEP_HAVE_MMAP_LINUX_HUGETLB)
   static constexpr int kLargePageMmapFlags
       = MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB;
@@ -398,7 +397,7 @@
       = MAP_PRIVATE | MAP_ANONYMOUS | MAP_ALIGNED_SUPER;
 #endif
 
-  makeRoomForBlock(num_slots, locked_block_id);
+  makeRoomForBlock(num_slots);
   void *slots = nullptr;
 
 #if defined(QUICKSTEP_HAVE_MMAP_LINUX_HUGETLB) || defined(QUICKSTEP_HAVE_MMAP_BSD_SUPERPAGE)
@@ -492,6 +491,7 @@
       ret = MutableBlockReference(static_cast<StorageBlock*>(it->second.block), eviction_policy_.get());
     }
   }
+  lock_manager_.release(block);
 
   // Note that there is no way for the block to be evicted between the call to
   // loadBlock and the call to EvictionPolicy::blockReferenced from
@@ -511,21 +511,9 @@
       }
     }
     // No other thread loaded the block before us.
-    // But going forward be careful as there is a potential self-deadlock
-    // situation here -- we are holding an Exclusive lock (io_lock)
-    //   and getting ready to go into the call chain
-    //   "MutableBlockReference"/"loadBlock" -> "loadBlockOrBlob"
-    //       -> "allocateSlots" -> "makeRoomForBlock"
-    //   In "makeRoomForBlock," we will acquire an exclusive lock via the call
-    //   "eviction_lock(*lock_manager_.get(block_index))"
-    //   This situation could lead to a self-deadlock as block_index could
-    //   hash to the same position in the "ShardedLockManager" as "block."
-    //   To deal with this case, we pass the block information for "block"
-    //   though the call chain, and check for a collision in the the
-    //   "ShardedLockManager" in the function "makeRoomForBlock."
-    //   If a collision is detected we avoid a self-deadlock.
     ret = MutableBlockReference(loadBlock(block, relation, numa_node), eviction_policy_.get());
   }
+  lock_manager_.release(block);
 
   return ret;
 }
@@ -542,6 +530,7 @@
       ret = MutableBlobReference(static_cast<StorageBlob*>(it->second.block), eviction_policy_.get());
     }
   }
+  lock_manager_.release(blob);
 
   if (!ret.valid()) {
     SpinSharedMutexExclusiveLock<false> io_lock(*lock_manager_.get(blob));
@@ -562,47 +551,51 @@
     // No other thread loaded the blob before us.
     ret = MutableBlobReference(loadBlob(blob, numa_node), eviction_policy_.get());
   }
+  lock_manager_.release(blob);
 
   return ret;
 }
 
-void StorageManager::makeRoomForBlock(const size_t slots,
-                                      const block_id locked_block_id) {
+void StorageManager::makeRoomForBlock(const size_t slots) {
   while (total_memory_usage_ + slots > max_memory_usage_) {
     block_id block_index;
     EvictionPolicy::Status status = eviction_policy_->chooseBlockToEvict(&block_index);
 
-    if ((status == EvictionPolicy::Status::kOk) &&  // Have a valid "block_index."
-        (locked_block_id != kInvalidBlockId) &&
-        (lock_manager_.get(locked_block_id) == lock_manager_.get(block_index))) {
-      // We have a collision in the shared lock manager, where the caller of
-      // this function has acquired a lock, and we are trying to evict a block
-      // that hashes to the same location. This will cause a self-deadlock.
-
-      // For now simply treat this situation as the case where there is not
-      // enough memory and we temporarily go over the memory limit.
-      // TODO(jmp): find another block to evict, if possible.
-      break;
-    }
-
     if (status == EvictionPolicy::Status::kOk) {
-      SpinSharedMutexExclusiveLock<false> eviction_lock(*lock_manager_.get(block_index));
+      bool has_collision = false;
+      SpinSharedMutexExclusiveLock<false> eviction_lock(*lock_manager_.get(block_index, &has_collision));
+      if (has_collision) {
+        // Collision in the sharded lock manager: the caller already holds a
+        // lock whose block id hashes to the same shard as the chosen victim,
+        // so acquiring the victim's lock here would self-deadlock. get()
+        // returned the dummy mutex and recorded nothing, so this path must
+        // not call release().
+
+        // For now, simply treat this as the case where there is not enough
+        // memory and temporarily go over the memory limit.
+        break;
+      }
+
       StorageBlockBase* block;
       {
         SpinSharedMutexSharedLock<false> read_lock(blocks_shared_mutex_);
         if (blocks_.find(block_index) == blocks_.end()) {
           // another thread must have jumped in and evicted it before us
+          lock_manager_.release(block_index);
           continue;
         }
         block = blocks_[block_index].block;
       }
       if (eviction_policy_->getRefCount(block->getID()) > 0) {
         // Someone sneaked in and referenced the block before we could evict it.
+        lock_manager_.release(block_index);
         continue;
       }
       if (saveBlockOrBlob(block->getID())) {
         evictBlockOrBlob(block->getID());
       }  // else : Someone sneaked in and evicted the block before we could.
+
+      lock_manager_.release(block_index);
     } else {
       // If status was not ok, then we must not have been able to evict enough
       // blocks; therefore, we return anyway, temporarily going over the memory
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index 88004b8..805e885 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -341,20 +341,8 @@
   // Allocate a buffer to hold the specified number of slots. The memory
   // returned will be zeroed-out, and mapped using large pages if the system
   // supports it.
-  // Note if the last parameter "locked_block_id" is set to something other than
-  // "kInvalidBlockId," then it means that the caller has acquired
-  // a lock in the sharded lock manager for that block. Thus, if a block needs
-  // to be evicted by the EvictionPolicy in the "makeRoomForBlock" call, and
-  // if the block to be evicted happens to hash to the same entry in the
-  // sharded lock manager, then the Eviction policy needs to pick a different
-  // block for eviction.
-  // The key point is that if "locked_block_id" is not "kInvalidBlockId," then
-  // the caller of allocateSlots, e.g. loadBlock, will have acquired a lock
-  // in the sharded lock manager for the block "locked_block_id."
   void* allocateSlots(const std::size_t num_slots,
-                      const int numa_node,
-                      // const block_id locked_block_id = kInvalidBlockId);
-                      const block_id locked_block_id);
+                      const int numa_node);
 
   // Deallocate a buffer allocated by allocateSlots(). This must be used
   // instead of free(), because the underlying implementation of
@@ -402,14 +390,8 @@
    *        requested size.
    *
    * @param slots Number of slots to make room for.
-   * @param locked_block_id Reference to the block id for which room is being made.
-   *                        The parent has a lock in the sharded lock manager for the
-   *                        "locked_block_id,"  so need to pass this through to the
-   *                        EvictionPolicy to avoid a deadlock if the block that is
-   *                        being cleared out hashes to the same hash entry.
    */
-  void makeRoomForBlock(const size_t slots,
-                        const block_id locked_block_id);
+  void makeRoomForBlock(const size_t slots);
 
   /**
    * @brief Load a block from the persistent storage into memory.
diff --git a/utility/ShardedLockManager.hpp b/utility/ShardedLockManager.hpp
index 0248882..550a7ba 100644
--- a/utility/ShardedLockManager.hpp
+++ b/utility/ShardedLockManager.hpp
@@ -1,6 +1,6 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
- *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2015-2016 Pivotal Software, Inc.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -22,6 +22,9 @@
 #include <cstddef>
 #include <functional>
+#include <unordered_map>
 
+#include "storage/StorageConstants.hpp"
 #include "threading/SharedMutex.hpp"
+#include "threading/SpinSharedMutex.hpp"
 #include "utility/Macros.hpp"
 
@@ -54,14 +55,61 @@
    * @param  key The key to map to a SharedMutex.
-   * @return     The corresponding SharedMutex.
+   * @param  has_collision If non-null, set to true when the shard for "key"
+   *         is already held by some caller; the dummy mutex is then returned
+   *         and the call must NOT be paired with release().
+   * @return     The corresponding SharedMutex, or the dummy on collision.
    */
-  SharedMutexT *get(const T key) {
-    return &shards[hash_(key) % N];
+  SharedMutexT* get(const T key, bool *has_collision = nullptr) {
+    const std::size_t shard = hash_(key) % N;
+
+    if (has_collision != nullptr) {
+      // The caller (the eviction path in StorageManager::makeRoomForBlock)
+      // already holds a lock in this manager. If "key" hashes to a shard
+      // that is currently held, report the collision and hand back the
+      // dummy mutex so the caller can back off instead of self-deadlocking.
+      SpinSharedMutexSharedLock<false> read_lock(shard_count_mutex_);
+      *has_collision = shard_count_.find(shard) != shard_count_.end();
+      if (*has_collision) {
+        return &dummy_mutex_;
+      }
+    }
+
+    {
+      // Count holders per shard: several keys held concurrently may hash to
+      // the same shard, and the shard must stay marked as held until the
+      // last holder calls release().
+      SpinSharedMutexExclusiveLock<false> write_lock(shard_count_mutex_);
+      ++shard_count_[shard];
+    }
+    return &shards[shard];
+  }
+
+  /**
+   * @brief Mark one lock returned by get() as no longer held.
+   *
+   * @param key The key that was passed to the matching get() call.
+   **/
+  void release(const T key) {
+    SpinSharedMutexExclusiveLock<false> write_lock(shard_count_mutex_);
+    std::unordered_map<std::size_t, std::size_t>::iterator
+        it = shard_count_.find(hash_(key) % N);
+    if (it != shard_count_.end() && --it->second == 0) {
+      shard_count_.erase(it);
+    }
   }
 
  private:
   std::hash<T> hash_;
   std::array<SharedMutexT, N> shards;
 
+  // Handed back by get() when a collision is detected; never a real shard.
+  SharedMutexT dummy_mutex_;
+
+  // Guards shard_count_, which maps a shard index to the number of locks
+  // currently held whose keys hash to that shard.
+  alignas(kCacheLineBytes) mutable SpinSharedMutex<false> shard_count_mutex_;
+  std::unordered_map<std::size_t, std::size_t> shard_count_;
+
   DISALLOW_COPY_AND_ASSIGN(ShardedLockManager);
 };
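
Note on the shard bookkeeping: get()/release() count holders per shard
rather than recording shard membership in a set, because two concurrently
held keys can hash to the same shard. A toy walk-through (hypothetical
instantiation, not part of the patch; assumes std::hash<int> is the
identity, so keys 0, 4, and 8 all map to shard 0 of 4):

    ShardedLockManager<int, 4, SharedMutex> manager;
    SharedMutex *a = manager.get(0);   // shard 0: holder count 1
    SharedMutex *b = manager.get(4);   // shard 0 again: holder count 2
    manager.release(0);                // count 1 -- shard 0 still held by b
    bool collision = false;
    manager.get(8, &collision);        // collision == true, dummy returned
    manager.release(4);                // count 0 -- entry erased

With a set instead of a count, the first release() would have erased the
shard's entry while b still held it, and the later check would miss the
collision, reintroducing the self-deadlock.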