// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <string>
#include <boost/thread.hpp>

#include "common/atomic.h"
#include "testutil/gtest-util.h"

#include "common/names.h"

namespace impala {

using namespace internal; // Testing AtomicInt<> directly.

// Simple test to make sure there is no obvious error in the operations.  This is not
// intended to test the thread safety.
template<typename T>
static void TestBasic() {
  AtomicInt<T> i1;
  EXPECT_EQ(i1.Load(), 0);
  i1.Store(10);
  EXPECT_EQ(i1.Load(), 10);
  i1.Add(5);
  EXPECT_EQ(i1.Load(), 15);
  i1.Add(-25);
  EXPECT_EQ(i1.Load(), -10);
  i1.Store(100);
  EXPECT_EQ(i1.Load(), 100);

  bool success = i1.CompareAndSwap(100, 50);
  EXPECT_EQ(i1.Load(), 50);
  EXPECT_EQ(success, true);
  success = i1.CompareAndSwap(50, 100);
  EXPECT_EQ(i1.Load(), 100);
  EXPECT_EQ(success, true);

  success = i1.CompareAndSwap(-200, 50);
  EXPECT_EQ(i1.Load(), 100);
  EXPECT_EQ(success, false);
  success = i1.CompareAndSwap(50, 200);
  EXPECT_EQ(i1.Load(), 100);
  EXPECT_EQ(success, false);
}

// Runs the single-threaded TestBasic() checks with a 32-bit signed value type.
TEST(AtomicTest, Basic) {
  TestBasic<int32_t>();
}

// Runs the single-threaded TestBasic() checks with a 64-bit signed value type.
TEST(AtomicTest, Basic64) {
  TestBasic<int64_t>();
}

// Basic multi-threaded testing

template<typename T>
static void IncrementThread(int64_t id, int64_t n, AtomicInt<T>* ai) {
  for (int64_t i = 0; i < n * id; ++i) {
    ai->Add(1);
  }
}

template<typename T>
static void DecrementThread(int64_t id, int64_t n, AtomicInt<T>* ai) {
  for (int64_t i = 0; i < n * id; ++i) {
    ai->Add(-1);
  }
}

// Races incrementing and decrementing threads against one shared AtomicInt<T> and
// verifies that the counter is back at zero once all of them have finished.
//
// One round is run for every combination of thread-pair count {4, 8, 16} and operation
// multiplier {1000, 10000}. In a round, pair i starts one IncrementThread and one
// DecrementThread that each perform i * multiplier Add() calls, so pair 0 does no work
// and the increments and decrements always cancel out.
template<typename T>
static void TestMultipleThreadsIncDec() {
  const int ops[] = {1000, 10000};
  const int num_threads[] = {4, 8, 16};
  const int num_ops_cases = sizeof(ops) / sizeof(ops[0]);
  const int num_thread_cases = sizeof(num_threads) / sizeof(num_threads[0]);

  for (int t = 0; t < num_thread_cases; ++t) {
    for (int o = 0; o < num_ops_cases; ++o) {
      AtomicInt<T> ai(0);
      // The groups are scoped to a single round. thread_group owns the threads added
      // to it and deletes them when it is destroyed, so finished threads are released
      // at the end of each round instead of piling up until the function returns.
      // Declared after 'ai' so they are destroyed before the counter is.
      thread_group increments, decrements;
      for (int i = 0; i < num_threads[t]; ++i) {
        increments.add_thread(new thread(IncrementThread<T>, i, ops[o], &ai));
        decrements.add_thread(new thread(DecrementThread<T>, i, ops[o], &ai));
      }
      increments.join_all();
      decrements.join_all();
      EXPECT_EQ(ai.Load(), 0);
    }
  }
}

// Runs the concurrent Add(1)/Add(-1) test with a 32-bit signed value type.
TEST(AtomicTest, MultipleThreadsIncDec) {
  TestMultipleThreadsIncDec<int32_t>();
}

// Runs the concurrent Add(1)/Add(-1) test with a 64-bit signed value type.
TEST(AtomicTest, MultipleThreadsIncDec64) {
  TestMultipleThreadsIncDec<int64_t>();
}

template<typename T>
static void CASIncrementThread(int64_t id, int64_t n, AtomicInt<T>* ai) {
  T oldval = 0;
  T newval = 0;
  bool success = false;
  for (int64_t i = 0; i < n * id; ++i) {
    success = false;
    while (!success) {
      oldval = ai->Load();
      newval = oldval + 1;
      success = ai->CompareAndSwap(oldval, newval);
    }
  }
}

template<typename T>
static void CASDecrementThread(int64_t id, int64_t n, AtomicInt<T>* ai) {
  T oldval = 0;
  T newval = 0;
  bool success = false;
  for (int64_t i = 0; i < n * id; ++i) {
    success = false;
    while (!success) {
      oldval = ai->Load();
      newval = oldval - 1;
      success = ai->CompareAndSwap(oldval, newval);
    }
  }
}

// Same shape as the Add()-based multi-threaded test, but every update goes through a
// Load() + CompareAndSwap() retry loop. Verifies that the shared AtomicInt<T> is back
// at zero once all threads have finished.
//
// One round is run for every combination of thread-pair count {4, 8, 16} and operation
// multiplier {10, 10000}. In a round, pair i starts one CASIncrementThread and one
// CASDecrementThread that each perform i * multiplier successful swaps, so pair 0 does
// no work and the increments and decrements always cancel out.
template<typename T>
static void TestMultipleThreadsCASIncDec() {
  const int ops[] = {10, 10000};
  const int num_threads[] = {4, 8, 16};
  const int num_ops_cases = sizeof(ops) / sizeof(ops[0]);
  const int num_thread_cases = sizeof(num_threads) / sizeof(num_threads[0]);

  for (int t = 0; t < num_thread_cases; ++t) {
    for (int o = 0; o < num_ops_cases; ++o) {
      AtomicInt<T> ai(0);
      // The groups are scoped to a single round. thread_group owns the threads added
      // to it and deletes them when it is destroyed, so finished threads are released
      // at the end of each round instead of piling up until the function returns.
      // Declared after 'ai' so they are destroyed before the counter is.
      thread_group increments, decrements;
      for (int i = 0; i < num_threads[t]; ++i) {
        increments.add_thread(new thread(CASIncrementThread<T>, i, ops[o], &ai));
        decrements.add_thread(new thread(CASDecrementThread<T>, i, ops[o], &ai));
      }
      increments.join_all();
      decrements.join_all();
      EXPECT_EQ(ai.Load(), 0);
    }
  }
}

// Runs the concurrent CompareAndSwap()-based inc/dec test with a 32-bit signed type.
TEST(AtomicTest, MultipleThreadsCASIncDec) {
  TestMultipleThreadsCASIncDec<int32_t>();
}

// Runs the concurrent CompareAndSwap()-based inc/dec test with a 64-bit signed type.
TEST(AtomicTest, MultipleThreadsCASIncDec64) {
  TestMultipleThreadsCASIncDec<int64_t>();
}

// Writer side ("thread A") of the acquire/release ordering test.
//
// 'control' is the turn token shared with thread B: A hands the turn over by storing
// 'other_id' and treats the turn as returned once Load() yields 'id'. 'payload' is a
// plain (non-atomic) T; the only ordering between A's writes and B's reads of it comes
// from the Store()/Load() calls on 'control'.
//
// Each of the 'iters' iterations has two phases:
//   1. Write 'it' to the payload, then hand the turn to B.
//   2. Spin until B hands the turn back, then overwrite the payload with '-it'.
template<typename T>
static void AcquireReleaseThreadA(T id, T other_id, int iters, AtomicInt<T>* control,
    T* payload) {
  // Number of CompilerBarrier() calls made at the end of every iteration.
  const int DELAY = 100;
  for (int it = 0; it < iters; ++it) {
    // Phase 1
    // (A-1) This store is not allowed to reorder with the below "store release".
    *payload = it;
    control->Store(other_id);

    // Phase 2
    // Busy-wait (empty loop body) until thread B stores our id into 'control'.
    while (control->Load() != id) ;
    // (A-2) This store is not allowed to reorder with the above "load acquire".
    // Note: for it == 0 the value written here equals the phase 1 value, so a
    // misordered store cannot be observed by thread B in the first iteration.
    *payload = -it;

    // Give thread B a chance to get started on Phase 1.
    for (int i = 0; i < DELAY; ++i) {
      AtomicUtil::CompilerBarrier();
    }
  }
}

// Reader side ("thread B") of the acquire/release ordering test.
//
// 'control' is the turn token shared with thread A: B treats the turn as its own once
// Load() yields 'id' and hands it back by storing 'other_id'. 'payload' is a plain
// (non-atomic) T written by thread A; B only reads it.
//
// Each of the 'iters' iterations has two phases:
//   1. Spin until A hands over the turn, then check that the payload equals 'it'.
//   2. Re-read the payload DELAY times while still holding the turn, hand the turn
//      back, and check that the last value read is still 'it'.
template<typename T>
static void AcquireReleaseThreadB(T id, T other_id, int iters, AtomicInt<T>* control,
    T* payload) {
  // Number of payload re-reads performed in phase 2 of every iteration.
  const int DELAY = 100;
  for (int it = 0; it < iters; ++it) {
    // Local copy of the payload; holds the most recent value read.
    T p;
    // Phase 1
    // Wait until ThreadA signaled that payload was updated.
    while (control->Load() != id);
    // (B-1) This load is not allowed to reorder with the above "load acquire".
    p = *payload;
    // Verify ordering for both (A-1) and (B-1).
    EXPECT_EQ(it, p);

    // Phase 2
    // Payload should not change until we give ThreadA the go-ahead.
    for (int i = 0; i < DELAY; ++i) {
      AtomicUtil::CompilerBarrier();
      // (B-2) This load is not allowed to reorder with the below "store release".
      p = *payload;
    }
    control->Store(other_id);
    // Verify ordering for both (A-2) and (B-2).
    // 'p' was last assigned before the Store() above, so it must still equal 'it'.
    EXPECT_EQ(it, p);
  }
}

// Test "acquire" and "release" memory ordering semantics. There are two threads, A and
// B, and each execute phase 1 and phase 2 in lockstep to verify the various
// combinations of memory accesses.
template<typename T>
static void TestAcquireReleaseLoadStore() {
  const int ITERS = 1000000;
  AtomicInt<T> control(-1);
  T payload = -1;
  thread t_a(AcquireReleaseThreadA<T>, 0, 1, ITERS, &control, &payload);
  thread t_b(AcquireReleaseThreadB<T>, 1, 0, ITERS, &control, &payload);
  t_a.join();
  t_b.join();
}

// Runs the two-thread acquire/release ordering test with a 32-bit signed value type.
// NOTE(review): "Treads" in the test name looks like a typo for "Threads"; it is left
// unchanged here because renaming would change the name used in gtest filters.
TEST(AtomicTest, MultipleTreadsAcquireReleaseLoadStoreInt) {
  TestAcquireReleaseLoadStore<int32_t>();
}

// Runs the two-thread acquire/release ordering test with a 64-bit signed value type.
// NOTE(review): "Treads" in the test name looks like a typo for "Threads"; it is left
// unchanged here because renaming would change the name used in gtest filters.
TEST(AtomicTest, MultipleTreadsAcquireReleaseLoadStoreInt64) {
  TestAcquireReleaseLoadStore<int64_t>();
}

}
