blob: 6713705a959a5effa686808626e2df46deaae7f6 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <boost/thread.hpp>
#include "common/atomic.h"
#include "testutil/death-test-util.h"
#include "testutil/gtest-util.h"
#include "util/cyclic-barrier.h"
#include "util/time.h"
#include "common/names.h"
namespace impala {
// Basic test implementation that has a group of 'num_threads' threads join the
// same barrier 'num_iters' times. There is no overlap between the iterations i.e.
// each group of threads terminates before the next group starts.
void BasicTest(int num_threads, int num_iters) {
CyclicBarrier barrier(num_threads);
int counter = 0;
for (int i = 0; i < num_iters; ++i) {
thread_group threads;
for (int j = 0; j < num_threads; ++j) {
threads.add_thread(new thread([&]() {
// Add some randomness to test so that threads don't always join in predictable
// order.
SleepForMs(rand() % 5);
EXPECT_OK(barrier.Wait([&counter]() { ++counter; }));
}));
}
threads.join_all();
// Counter should have been incremented by last arriving threads.
EXPECT_EQ(i + 1, counter);
}
}
// Test one iteration of the barrier with varying thread counts.
TEST(CyclicBarrierTest, BasicOneIter) {
BasicTest(1, 1);
BasicTest(2, 1);
BasicTest(4, 1);
BasicTest(50, 1);
}
// Test many iterations of the barrier.
TEST(CyclicBarrierTest, BasicManyIters) {
BasicTest(8, 2);
BasicTest(8, 4);
BasicTest(8, 20);
BasicTest(8, 100);
}
// Test where each thread calls Wait() in a tight loop, to exercise cases where calls to
// Wait() from different cycles can overlap. E.g. if there are two threads and a single
// processor, the following timing is possible:
// 1. Thread 1 runs, calls Wait() and blocks.
// 2. Thread 2 runs, calls Wait(), and notifies thread 1.
// 3. Thread 2 continues running, returns from Wait(), then calls Wait() again and blocks.
// 4. Thread 2 wakes up, notices that all threads had joined the barrier, and returns
// from Wait().
// At step 3, thread 1 and 2 are both in Wait() but are in different cycles, hence the
// overlapping of the cycles.
void OverlapTest(int num_threads, int num_iters) {
CyclicBarrier barrier(num_threads);
int counter = 0;
thread_group threads;
for (int i = 0; i < num_threads; ++i) {
threads.add_thread(new thread([&]() {
for (int j = 0; j < num_iters; ++j) {
EXPECT_OK(barrier.Wait([&counter]() { ++counter; }));
}
}));
}
threads.join_all();
// Counter should have been incremented by last arriving threads.
EXPECT_EQ(num_iters, counter);
}
// Test many iterations of the barrier.
TEST(CyclicBarrierTest, Overlap) {
OverlapTest(2, 100);
OverlapTest(8, 100);
OverlapTest(100, 100);
}
// Test that cancellation functions as expected.
TEST(CyclicBarrierTest, Cancellation) {
const int NUM_THREADS = 8;
int counter = 0;
AtomicInt32 waits_complete{0};
CyclicBarrier barrier(NUM_THREADS);
thread_group threads;
// All threads except one should join the barrier, then get cancelled.
for (int i = 0; i < NUM_THREADS - 1; ++i) {
threads.add_thread(new thread([&barrier, &waits_complete, &counter, i]() {
// Add some jitter so that not all threads will be waiting when cancelled.
if (i % 2 == 0) SleepForMs(rand() % 5);
Status status = barrier.Wait([&counter]() { ++counter; });
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.code(), TErrorCode::CANCELLED);
waits_complete.Add(1);
}));
}
SleepForMs(1); // Give other threads a chance to start before cancelling.
barrier.Cancel(Status::CANCELLED);
threads.join_all();
EXPECT_EQ(0, counter) << "The callback should not have run.";
EXPECT_EQ(NUM_THREADS - 1, waits_complete.Load()) << "Threads should have returned.";
// Subsequent calls to Wait() return immediately.
for (int i = 0; i < NUM_THREADS; ++i) {
Status status = barrier.Wait([&counter]() { ++counter; });
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.code(), TErrorCode::CANCELLED);
EXPECT_EQ(0, counter) << "The callback should not have run.";
}
// First status is not overwritten by later Cancel() calls.
barrier.Cancel(Status("Different status"));
Status status = barrier.Wait([&counter]() { ++counter; });
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.code(), TErrorCode::CANCELLED);
// Invalid to cancel with Status::OK();
IMPALA_ASSERT_DEBUG_DEATH(barrier.Cancel(Status::OK()), "");
}
// Passing an empty/null function to Wait() is not supported.
TEST(CyclicBarrierTest, NullFunction) {
CyclicBarrier barrier(1);
typedef void (*fn_ptr_t)();
IMPALA_ASSERT_DEBUG_DEATH(barrier.Wait(static_cast<fn_ptr_t>(nullptr)), "");
}
} // namespace impala