Fix for resize concurrency bug

- Introduced a new shared variable that tracks the number of active
  writer threads, i.e. threads currently writing to the hash table.
- A resize request will be processed only when there are no active
  writer threads.
- Used a condition variable to monitor the number of active writer
  threads. When a writer finishes and the count drops to zero, it
  signals all the waiting threads.
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index f05cc46..76daa8e 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -725,6 +725,7 @@
                       quickstep_storage_TupleReference
                       quickstep_storage_ValueAccessor
                       quickstep_storage_ValueAccessorUtil
+                      quickstep_threading_ConditionVariable
                       quickstep_threading_SpinSharedMutex
                       quickstep_types_Type
                       quickstep_types_TypedValue
diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp
index f2dcb03..daa036e 100644
--- a/storage/HashTable.hpp
+++ b/storage/HashTable.hpp
@@ -35,6 +35,7 @@
 #include "storage/TupleReference.hpp"
 #include "storage/ValueAccessor.hpp"
 #include "storage/ValueAccessorUtil.hpp"
+#include "threading/ConditionVariable.hpp"
 #include "threading/SpinSharedMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
@@ -1316,6 +1317,19 @@
                                    const attribute_id key_attr_id,
                                    FunctorT *functor) const;
 
+  void incrementNumActiveWriters() {  // Registers one more active writer.
+    SpinSharedMutexExclusiveLock<true> writer_guard(resize_shared_mutex_);
+    num_active_writers_ += 1;
+  }
+
+  // Drops the writer count; the writer that brings it to zero wakes waiters.
+  void decrementNumActiveWriters() {
+    SpinSharedMutexExclusiveLock<true> writer_guard(resize_shared_mutex_);
+    if (--num_active_writers_ == 0) {
+      no_active_writer_condition_->signalAll();
+    }
+  }
+
   // Data structures used for bloom filter optimized semi-joins.
   bool has_build_side_bloom_filter_ = false;
   bool has_probe_side_bloom_filter_ = false;
@@ -1323,6 +1337,13 @@
   std::vector<const BloomFilter*> probe_bloom_filters_;
   std::vector<std::vector<attribute_id>> probe_attribute_ids_;
 
+  // Number of active writers to the hash table; starts at zero.
+  // TODO(harshad) - Can we make this an atomic?
+  std::size_t num_active_writers_ = 0;
+
+  alignas(kCacheLineBytes) SpinSharedMutex<true> active_writer_mutex_;
+  ConditionVariable *no_active_writer_condition_ = nullptr;  // Signalled at zero.
+
   DISALLOW_COPY_AND_ASSIGN(HashTable);
 };
 
@@ -1588,7 +1609,12 @@
                                                                 &prealloc_state);
           }
           if (!prealloc_succeeded) {
+            while (num_active_writers_ != 0) {
+              no_active_writer_condition_->await();
+            }
             this->resize(total_entries, total_variable_key_size);
+          } else {
+            incrementNumActiveWriters();
           }
         }
       } else {
@@ -1628,6 +1654,7 @@
           accessor->previous();
         }
       }
+      decrementNumActiveWriters();
     } else {
       while (accessor->next()) {
         if (this->GetCompositeKeyFromValueAccessor(*accessor,