add BarrierEvent (#60)

diff --git a/pom.xml b/pom.xml
index 7f77ac0..91fb035 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
 
     <groupId>com.baidu.hugegraph</groupId>
     <artifactId>hugegraph-common</artifactId>
-    <version>1.8.2</version>
+    <version>1.8.3</version>
 
     <name>hugegraph-common</name>
     <url>https://github.com/hugegraph/hugegraph-common</url>
@@ -266,7 +266,7 @@
                         <manifestEntries>
                             <!-- Must be on one line, otherwise the automatic
                                  upgrade script cannot replace the version number -->
-                            <Implementation-Version>1.8.2.0</Implementation-Version>
+                            <Implementation-Version>1.8.3.0</Implementation-Version>
                         </manifestEntries>
                     </archive>
                 </configuration>
diff --git a/src/main/java/com/baidu/hugegraph/concurrent/BarrierEvent.java b/src/main/java/com/baidu/hugegraph/concurrent/BarrierEvent.java
new file mode 100644
index 0000000..ece4c7a
--- /dev/null
+++ b/src/main/java/com/baidu/hugegraph/concurrent/BarrierEvent.java
@@ -0,0 +1,107 @@
+/*
+ *
+ *  Copyright 2017 HugeGraph Authors
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with this
+ *  work for additional information regarding copyright ownership. The ASF
+ *  licenses this file to You under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ *  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ *  License for the specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package com.baidu.hugegraph.concurrent;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.baidu.hugegraph.util.E;
+
+public class BarrierEvent {
+
+    private final Lock lock = new ReentrantLock();
+    private final Condition cond = lock.newCondition();
+    private volatile boolean signaled = false;
+
+    /**
+     * Wait forever until the signal is received.
+     * @return true if signal is received
+     * @throws InterruptedException if interrupted.
+     */
+    public void await() throws InterruptedException {
+        this.lock.lock();
+        try {
+            while (!this.signaled) {
+                this.cond.await();
+            }
+        } finally {
+            this.lock.unlock();
+        }
+    }
+
+    /**
+     * Wait specified time in milliseconds.
+     * @param timeout: the time in millisecond to wait.
+     * @return true if signal is received, false if time out.
+     */
+    public boolean await(long timeout) throws InterruptedException {
+        E.checkArgument(timeout >= 0L,
+                        "The time must be >= 0, but got '%d'.",
+                        timeout);
+        long deadline = System.currentTimeMillis() + timeout;
+        this.lock.lock();
+        try {
+            while (!this.signaled) {
+                timeout = deadline - System.currentTimeMillis();
+                if (timeout > 0) {
+                    this.cond.await(timeout, TimeUnit.MILLISECONDS);
+                }
+                if (System.currentTimeMillis() >= deadline) {
+                    return this.signaled;
+                }
+            }
+        } finally {
+            this.lock.unlock();
+        }
+        return true;
+    }
+
+    public void reset() {
+        this.lock.lock();
+        try {
+            this.signaled = false;
+        } finally {
+            this.lock.unlock();
+        }
+    }
+
+    public void signal() {
+        this.lock.lock();
+        try {
+            this.signaled = true;
+            this.cond.signal();
+        } finally {
+            this.lock.unlock();
+        }
+    }
+
+    public void signalAll() {
+        this.lock.lock();
+        try {
+            this.signaled = true;
+            this.cond.signalAll();
+        } finally {
+            this.lock.unlock();
+        }
+    }
+}
diff --git a/src/main/java/com/baidu/hugegraph/version/CommonVersion.java b/src/main/java/com/baidu/hugegraph/version/CommonVersion.java
index ec438d3..4b590a1 100644
--- a/src/main/java/com/baidu/hugegraph/version/CommonVersion.java
+++ b/src/main/java/com/baidu/hugegraph/version/CommonVersion.java
@@ -27,5 +27,5 @@
 
     // The second parameter of Version.of() is for all-in-one JAR
     public static final Version VERSION = Version.of(CommonVersion.class,
-                                                     "1.8.2");
+                                                     "1.8.3");
 }
diff --git a/src/test/java/com/baidu/hugegraph/unit/UnitTestSuite.java b/src/test/java/com/baidu/hugegraph/unit/UnitTestSuite.java
index c6dfa62..211806d 100644
--- a/src/test/java/com/baidu/hugegraph/unit/UnitTestSuite.java
+++ b/src/test/java/com/baidu/hugegraph/unit/UnitTestSuite.java
@@ -32,6 +32,7 @@
 import com.baidu.hugegraph.unit.config.HugeConfigTest;
 import com.baidu.hugegraph.unit.config.OptionSpaceTest;
 import com.baidu.hugegraph.unit.date.SafeDateFormatTest;
+import com.baidu.hugegraph.unit.concurrent.BarrierEventTest;
 import com.baidu.hugegraph.unit.event.EventHubTest;
 import com.baidu.hugegraph.unit.iterator.BatchMapperIteratorTest;
 import com.baidu.hugegraph.unit.iterator.ExtendableIteratorTest;
@@ -73,6 +74,7 @@
     HugeConfigTest.class,
     OptionSpaceTest.class,
     SafeDateFormatTest.class,
+    BarrierEventTest.class,
     EventHubTest.class,
     PerfUtilTest.class,
     RestClientTest.class,
diff --git a/src/test/java/com/baidu/hugegraph/unit/concurrent/BarrierEventTest.java b/src/test/java/com/baidu/hugegraph/unit/concurrent/BarrierEventTest.java
new file mode 100644
index 0000000..c81aa3d
--- /dev/null
+++ b/src/test/java/com/baidu/hugegraph/unit/concurrent/BarrierEventTest.java
@@ -0,0 +1,271 @@
+package com.baidu.hugegraph.unit.concurrent;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import com.baidu.hugegraph.concurrent.BarrierEvent;
+import com.baidu.hugegraph.testutil.Assert;
+
+public class BarrierEventTest {
+    
+    private static int WAIT_THREADS_COUNT = 10;
+
+    @Test(timeout = 5000)
+    public void testAWait() throws InterruptedException {
+        BarrierEvent barrierEvent = new BarrierEvent();
+        AtomicInteger result = new AtomicInteger(0);
+        CountDownLatch latch = new CountDownLatch(2);
+        Thread awaitThread = new Thread(() -> {
+            try {
+                barrierEvent.await();
+                result.incrementAndGet();
+            } catch (InterruptedException e) {
+                // Do nothing.
+            } finally {
+                latch.countDown();
+            }
+        });
+        awaitThread.start();
+        Thread signalThread = new Thread(() -> {
+            barrierEvent.signalAll();
+            latch.countDown();
+        });
+        signalThread.start();
+        latch.await();
+        Assert.assertEquals(1, result.get());
+    }
+
+    @Test
+    public void testAWaitWithTimeout() throws InterruptedException {
+        BarrierEvent barrierEvent = new BarrierEvent();
+        boolean signaled = barrierEvent.await(1L);
+        Assert.assertFalse(signaled);
+    }
+
+    @Test
+    public void testReset() throws InterruptedException {
+        BarrierEvent barrierEvent = new BarrierEvent();
+        boolean signaled = barrierEvent.await(1L);
+        Assert.assertFalse(signaled);
+        barrierEvent.signal();
+        signaled = barrierEvent.await(1L);
+        Assert.assertTrue(signaled);
+        barrierEvent.reset();
+        signaled = barrierEvent.await(1L);
+        Assert.assertFalse(signaled);
+    }
+
+    @Test
+    public void testSignal() throws InterruptedException {
+        BarrierEvent barrierEvent = new BarrierEvent();
+        boolean signaled = barrierEvent.await(1L);
+        Assert.assertFalse(signaled);
+        barrierEvent.signal();
+        signaled = barrierEvent.await(1L);
+        Assert.assertTrue(signaled);
+    }
+
+    @Test
+    public void testSignalByMultiThreadWithSignalFirst()
+                throws InterruptedException {
+        BarrierEvent barrierEvent = new BarrierEvent();
+        AtomicInteger eventCount = new AtomicInteger(0);
+        AtomicInteger waitThreadInterruptedCount = new AtomicInteger(0);
+        ExecutorService executorService =
+                        Executors.newFixedThreadPool(WAIT_THREADS_COUNT + 1);
+        CountDownLatch waitLatch = new CountDownLatch(WAIT_THREADS_COUNT);
+        CountDownLatch signalLatch = new CountDownLatch(1);
+        for (int i = 0; i < WAIT_THREADS_COUNT; i++) {
+            executorService.submit(() -> {
+                try {
+                    signalLatch.await();
+                    barrierEvent.await();
+                    eventCount.incrementAndGet();
+                } catch (InterruptedException e) {
+                    waitThreadInterruptedCount.incrementAndGet();
+                } finally {
+                    waitLatch.countDown();
+                }
+            });
+        }
+
+        executorService.submit(() -> {
+            barrierEvent.signal();
+            signalLatch.countDown();
+        });
+
+        executorService.shutdown();
+        executorService.awaitTermination(2L, TimeUnit.SECONDS);
+        waitLatch.await();
+        Assert.assertEquals(10, eventCount.get());
+        Assert.assertEquals(0, waitThreadInterruptedCount.get());
+    }
+
+    @Test
+    public void testSignalByMultiThreadWithSignalLast()
+                throws InterruptedException {
+        BarrierEvent barrierEvent = new BarrierEvent();
+        AtomicInteger eventCount = new AtomicInteger(0);
+        AtomicInteger waitThreadInterruptedCount = new AtomicInteger(0);
+        AtomicInteger signalThreadInterruptedCount = new AtomicInteger(0);
+        ExecutorService executorService =
+                        Executors.newFixedThreadPool(WAIT_THREADS_COUNT + 1);
+        CountDownLatch waitLatch = new CountDownLatch(WAIT_THREADS_COUNT);
+        CountDownLatch signalLatch = new CountDownLatch(1);
+        for (int i = 0; i < WAIT_THREADS_COUNT; i++) {
+            executorService.submit(() -> {
+                try {
+                    waitLatch.countDown();
+                    barrierEvent.await();
+                    eventCount.incrementAndGet();
+                } catch (InterruptedException e) {
+                    waitThreadInterruptedCount.incrementAndGet();
+                }
+            });
+        }
+
+        executorService.submit(() -> {
+            try {
+                waitLatch.await();
+            } catch (InterruptedException e) {
+                signalThreadInterruptedCount.incrementAndGet();
+            }
+            barrierEvent.signal();
+            signalLatch.countDown();
+        });
+        signalLatch.await();
+        executorService.shutdownNow();
+        executorService.awaitTermination(1L, TimeUnit.SECONDS);
+        Assert.assertEquals(1, eventCount.get());
+        Assert.assertEquals(WAIT_THREADS_COUNT - 1,
+                            waitThreadInterruptedCount.get());
+        Assert.assertEquals(0, signalThreadInterruptedCount.get());
+    }
+
+    @Test
+    public void testSignalAll() throws InterruptedException {
+        BarrierEvent barrierEvent = new BarrierEvent();
+        boolean signaled = barrierEvent.await(1L);
+        Assert.assertFalse(signaled);
+        barrierEvent.signalAll();
+        signaled = barrierEvent.await(1L);
+        Assert.assertTrue(signaled);
+    }
+
+    @Test
+    public void testSignalAllByMultiThreadWithSignalFirst()
+                throws InterruptedException {
+        BarrierEvent barrierEvent = new BarrierEvent();
+        AtomicInteger eventCount = new AtomicInteger(0);
+        AtomicInteger waitThreadInterruptedCount = new AtomicInteger(0);
+        ExecutorService executorService =
+                        Executors.newFixedThreadPool(WAIT_THREADS_COUNT + 1);
+        CountDownLatch waitLatch = new CountDownLatch(WAIT_THREADS_COUNT);
+        CountDownLatch signalLatch = new CountDownLatch(1);
+        for (int i = 0; i < WAIT_THREADS_COUNT; i++) {
+            executorService.submit(() -> {
+                try {
+                    signalLatch.await();
+                    waitLatch.countDown();
+                    barrierEvent.await();
+                    eventCount.incrementAndGet();
+                } catch (InterruptedException e) {
+                    waitThreadInterruptedCount.incrementAndGet();
+                }
+            });
+        }
+
+        executorService.submit(() -> {
+            barrierEvent.signalAll();
+            signalLatch.countDown();
+        });
+
+        executorService.shutdown();
+        executorService.awaitTermination(1L, TimeUnit.SECONDS);
+        Assert.assertEquals(10, eventCount.get());
+        Assert.assertEquals(0, waitThreadInterruptedCount.get());
+    }
+
+    @Test
+    public void testSignalAllByMultiThreadWithSignalLast()
+                throws InterruptedException {
+        BarrierEvent barrierEvent = new BarrierEvent();
+        AtomicInteger eventCount = new AtomicInteger(0);
+        AtomicInteger waitThreadInterruptedCount = new AtomicInteger(0);
+        AtomicInteger signalThreadInterruptedCount = new AtomicInteger(0);
+        ExecutorService executorService =
+                        Executors.newFixedThreadPool(WAIT_THREADS_COUNT + 1);
+        CountDownLatch waitLatch = new CountDownLatch(WAIT_THREADS_COUNT);
+        CountDownLatch signalLatch = new CountDownLatch(1);
+        for (int i = 0; i < WAIT_THREADS_COUNT; i++) {
+            executorService.submit(() -> {
+                try {
+                    waitLatch.countDown();
+                    barrierEvent.await();
+                    eventCount.incrementAndGet();
+                } catch (InterruptedException e) {
+                    waitThreadInterruptedCount.incrementAndGet();
+                }
+            });
+        }
+
+        executorService.submit(() -> {
+            try {
+                waitLatch.await();
+            } catch (InterruptedException e) {
+                signalThreadInterruptedCount.incrementAndGet();
+            }
+            barrierEvent.signalAll();
+            signalLatch.countDown();
+        });
+        signalLatch.await();
+        executorService.shutdown();
+        executorService.awaitTermination(1L, TimeUnit.SECONDS);
+        Assert.assertEquals(WAIT_THREADS_COUNT, eventCount.get());
+        Assert.assertEquals(0, waitThreadInterruptedCount.get());
+        Assert.assertEquals(0, signalThreadInterruptedCount.get());
+    }
+
+    @Test
+    public void testSignalAllByMultiThreadWithSignalAwaitConcurrent()
+                throws InterruptedException {
+        BarrierEvent barrierEvent = new BarrierEvent();
+        AtomicInteger eventCount = new AtomicInteger(0);
+        AtomicInteger waitThreadInterruptedCount = new AtomicInteger(0);
+        AtomicInteger signalThreadInterruptedCount = new AtomicInteger(0);
+        ExecutorService executorService =
+                        Executors.newFixedThreadPool(WAIT_THREADS_COUNT + 1);
+        CountDownLatch syncLatch = new CountDownLatch(1);
+        for (int i = 0; i < WAIT_THREADS_COUNT; i++) {
+            executorService.submit(() -> {
+                try {
+                    syncLatch.await();
+                    barrierEvent.await();
+                    eventCount.incrementAndGet();
+                } catch (InterruptedException e) {
+                    waitThreadInterruptedCount.incrementAndGet();
+                }
+            });
+        }
+
+        executorService.submit(() -> {
+            try {
+                syncLatch.await();
+            } catch (InterruptedException e) {
+                signalThreadInterruptedCount.incrementAndGet();
+            }
+            barrierEvent.signalAll();
+        });
+        syncLatch.countDown();
+        executorService.shutdown();
+        executorService.awaitTermination(1L, TimeUnit.SECONDS);
+        Assert.assertEquals(WAIT_THREADS_COUNT, eventCount.get());
+        Assert.assertEquals(0, waitThreadInterruptedCount.get());
+        Assert.assertEquals(0, signalThreadInterruptedCount.get());
+    }
+}