IMPALA-9274: cyclic barrier implementation

This adds a synchronisation utility CyclicBarrier, which will be
used in IMPALA-9156 for orchestrating the hash join algorithm
with a shared build side and multiple hash join nodes.

Testing:
Add a unit test for the barier class.

Change-Id: Id5c8cc6b3023d2dc03c8116d462440c5e25c3bb0
Reviewed-on: http://gerrit.cloudera.org:8080/14973
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index a90bea8..20018e4 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -42,6 +42,7 @@
   common-metrics.cc
   compress.cc
   cpu-info.cc
+  cyclic-barrier.cc
   decimal-util.cc
   dynamic-util.cc
   debug-util.cc
@@ -109,6 +110,7 @@
   blocking-queue-test.cc
   bloom-filter-test.cc
   coding-util-test.cc
+  cyclic-barrier-test.cc
   debug-util-test.cc
   dict-test.cc
   error-util-test.cc
@@ -167,6 +169,7 @@
 ADD_UNIFIED_BE_LSAN_TEST(blocking-queue-test "BlockingQueueTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(bloom-filter-test "BloomFilter.*:BloomFilterTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(coding-util-test "UrlCodingTest.*:Base64Test.*:HtmlEscapingTest.*")
+ADD_UNIFIED_BE_LSAN_TEST(cyclic-barrier-test "CyclicBarrierTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(debug-util-test "DebugUtil.*")
 # Decompress-test fails in unified mode (possibly due to missing libs)
 ADD_BE_LSAN_TEST(decompress-test)
diff --git a/be/src/util/cyclic-barrier-test.cc b/be/src/util/cyclic-barrier-test.cc
new file mode 100644
index 0000000..6713705
--- /dev/null
+++ b/be/src/util/cyclic-barrier-test.cc
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <boost/thread.hpp>
+
+#include "common/atomic.h"
+#include "testutil/death-test-util.h"
+#include "testutil/gtest-util.h"
+#include "util/cyclic-barrier.h"
+#include "util/time.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+// Basic test implementation that has a group of 'num_threads' threads join the
+// same barrier 'num_iters' times. There is no overlap between the iterations i.e.
+// each group of threads terminates before the next group starts.
+void BasicTest(int num_threads, int num_iters) {
+  CyclicBarrier barrier(num_threads);
+  int counter = 0;
+  for (int i = 0; i < num_iters; ++i) {
+    thread_group threads;
+    for (int j = 0; j < num_threads; ++j) {
+      threads.add_thread(new thread([&]() {
+        // Add some randomness to test so that threads don't always join in predictable
+        // order.
+        SleepForMs(rand() % 5);
+        EXPECT_OK(barrier.Wait([&counter]() { ++counter; }));
+      }));
+    }
+    threads.join_all();
+    // Counter should have been incremented by last arriving threads.
+    EXPECT_EQ(i + 1, counter);
+  }
+}
+
+// Test one iteration of the barrier with varying thread counts.
+TEST(CyclicBarrierTest, BasicOneIter) {
+  BasicTest(1, 1);
+  BasicTest(2, 1);
+  BasicTest(4, 1);
+  BasicTest(50, 1);
+}
+
+// Test many iterations of the barrier.
+TEST(CyclicBarrierTest, BasicManyIters) {
+  BasicTest(8, 2);
+  BasicTest(8, 4);
+  BasicTest(8, 20);
+  BasicTest(8, 100);
+}
+
+// Test where each thread calls Wait() in a tight loop, to exercise cases where calls to
+// Wait() from different cycles can overlap. E.g. if there are two threads and a single
+// processor, the following timing is possible:
+// 1. Thread 1 runs, calls Wait() and blocks.
+// 2. Thread 2 runs, calls Wait(), and notifies thread 1.
+// 3. Thread 2 continues running, returns from Wait(), then calls Wait() again and blocks.
+// 4. Thread 2 wakes up, notices that all threads had joined the barrier, and returns
+//    from Wait().
+// At step 3, thread 1 and 2 are both in Wait() but are in different cycles, hence the
+// overlapping of the cycles.
+void OverlapTest(int num_threads, int num_iters) {
+  CyclicBarrier barrier(num_threads);
+  int counter = 0;
+  thread_group threads;
+  for (int i = 0; i < num_threads; ++i) {
+    threads.add_thread(new thread([&]() {
+      for (int j = 0; j < num_iters; ++j) {
+        EXPECT_OK(barrier.Wait([&counter]() { ++counter; }));
+      }
+    }));
+  }
+  threads.join_all();
+  // Counter should have been incremented by last arriving threads.
+  EXPECT_EQ(num_iters, counter);
+}
+
+// Test many iterations of the barrier.
+TEST(CyclicBarrierTest, Overlap) {
+  OverlapTest(2, 100);
+  OverlapTest(8, 100);
+  OverlapTest(100, 100);
+}
+
+// Test that cancellation functions as expected.
+TEST(CyclicBarrierTest, Cancellation) {
+  const int NUM_THREADS = 8;
+  int counter = 0;
+  AtomicInt32 waits_complete{0};
+  CyclicBarrier barrier(NUM_THREADS);
+  thread_group threads;
+  // All threads except one should join the barrier, then get cancelled.
+  for (int i = 0; i < NUM_THREADS - 1; ++i) {
+    threads.add_thread(new thread([&barrier, &waits_complete, &counter, i]() {
+      // Add some jitter so that not all threads will be waiting when cancelled.
+      if (i % 2 == 0) SleepForMs(rand() % 5);
+      Status status = barrier.Wait([&counter]() { ++counter; });
+      EXPECT_FALSE(status.ok());
+      EXPECT_EQ(status.code(), TErrorCode::CANCELLED);
+      waits_complete.Add(1);
+    }));
+  }
+  SleepForMs(1); // Give other threads a chance to start before cancelling.
+  barrier.Cancel(Status::CANCELLED);
+  threads.join_all();
+  EXPECT_EQ(0, counter) << "The callback should not have run.";
+  EXPECT_EQ(NUM_THREADS - 1, waits_complete.Load()) << "Threads should have returned.";
+
+  // Subsequent calls to Wait() return immediately.
+  for (int i = 0; i < NUM_THREADS; ++i) {
+    Status status = barrier.Wait([&counter]() { ++counter; });
+    EXPECT_FALSE(status.ok());
+    EXPECT_EQ(status.code(), TErrorCode::CANCELLED);
+    EXPECT_EQ(0, counter) << "The callback should not have run.";
+  }
+
+  // First status is not overwritten by later Cancel() calls.
+  barrier.Cancel(Status("Different status"));
+  Status status = barrier.Wait([&counter]() { ++counter; });
+  EXPECT_FALSE(status.ok());
+  EXPECT_EQ(status.code(), TErrorCode::CANCELLED);
+
+  // Invalid to cancel with Status::OK();
+  IMPALA_ASSERT_DEBUG_DEATH(barrier.Cancel(Status::OK()), "");
+}
+
+// Passing an empty/null function to Wait() is not supported.
+TEST(CyclicBarrierTest, NullFunction) {
+  CyclicBarrier barrier(1);
+  typedef void (*fn_ptr_t)();
+  IMPALA_ASSERT_DEBUG_DEATH(barrier.Wait(static_cast<fn_ptr_t>(nullptr)), "");
+}
+} // namespace impala
diff --git a/be/src/util/cyclic-barrier.cc b/be/src/util/cyclic-barrier.cc
new file mode 100644
index 0000000..2eb3640
--- /dev/null
+++ b/be/src/util/cyclic-barrier.cc
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/cyclic-barrier.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+CyclicBarrier::CyclicBarrier(int num_threads) : num_threads_(num_threads) {}
+
+void CyclicBarrier::Cancel(const Status& err) {
+  DCHECK(!err.ok());
+  {
+    lock_guard<mutex> l(lock_);
+    if (!cancel_status_.ok()) return; // Already cancelled.
+    cancel_status_ = err;
+  }
+  barrier_cv_.NotifyAll();
+}
+} // namespace impala
diff --git a/be/src/util/cyclic-barrier.h b/be/src/util/cyclic-barrier.h
new file mode 100644
index 0000000..83c0f89
--- /dev/null
+++ b/be/src/util/cyclic-barrier.h
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <boost/thread.hpp>
+
+#include "common/status.h"
+#include "util/condition-variable.h"
+
+namespace impala {
+
+/// Synchronization barrier that is used when a fixed number of threads need to repeatedly
+/// synchronize, e.g. to proceed to the next phase of an algorithm. At each phase, waits
+/// until all threads have called Wait() on the barrier, then unblocks all threads.
+/// The last thread to arrive at the barrier executes a function before waking the other
+/// threads.
+class CyclicBarrier {
+ public:
+  CyclicBarrier(int num_threads);
+
+  /// Waits until all threads have joined the barrier. Then the last thread executes 'fn'
+  /// and once that is completed, all threads return. Note that 'fn' executes serially,
+  /// so can be used to implement a serial phase of a parallel algorithm.
+  /// Returns OK if all threads joined the barrier or an error status if cancelled.
+  template <typename F>
+  Status Wait(const F& fn) {
+    {
+      boost::unique_lock<boost::mutex> l(lock_);
+      RETURN_IF_ERROR(cancel_status_);
+      ++num_waiting_threads_;
+      DCHECK_LE(num_waiting_threads_, num_threads_);
+      if (num_waiting_threads_ < num_threads_) {
+        // Wait for the last thread to wake us up.
+        int64_t start_cycle = cycle_num_;
+        while (cancel_status_.ok() && cycle_num_ == start_cycle) {
+          barrier_cv_.Wait(l);
+        }
+        return cancel_status_;
+      }
+      // This is the last thread and barrier isn't cancelled. We can proceed by
+      // resetting state for the next cycle.
+      fn();
+      num_waiting_threads_ = 0;
+      ++cycle_num_;
+    }
+    barrier_cv_.NotifyAll();
+    return Status::OK();
+  }
+
+  // Cancels the barrier. All blocked and future calls to cancel will return immediately
+  // with an error status. Cancel() can be called multiple times. In that case, the 'err'
+  // value from the first call is used.
+  // 'err' must be a non-OK status.
+  void Cancel(const Status& err);
+
+ private:
+  // The number of threads participating in synchronization.
+  const int num_threads_;
+
+  // Protects below members.
+  boost::mutex lock_;
+
+  // Condition variable that is signalled (with NotifyAll) when all threads join the
+  // barrier, or the barrier is cancelled.
+  ConditionVariable barrier_cv_;
+
+  // The number of barrier synchronizations that have occurred. Incremented each time that
+  // a barrier synchronization completes. The synchronisation algorithm uses this to
+  // determine whether the previous cycle was complete (calls to Wait() from different
+  // cycles can overlap if a thread calls Wait() for the next cycle before all threads
+  // have returned from Wait() for the previous cycle). The algorithm depends on the cycle
+  // number not being reused until all threads from previous cycles have returned from
+  // Wait().
+  //
+  // Use unsigned integer so that overflow is extremely unlikely, but also has defined
+  // behaviour.
+  uint64_t cycle_num_ = 0;
+
+  // Number of threads that are currently waiting for the barrier.
+  int num_waiting_threads_ = 0;
+
+  // Error status if the barrier was cancelled.
+  Status cancel_status_;
+};
+
+} // namespace impala