diff --git a/CHANGES.txt b/CHANGES.txt
index 9dc12c3..e7419bb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,8 @@
 
       BOOKKEEPER-799: Distribution schedule coverage sets don't take gaps in response lists into account when writequorum > ackquorum (ivank)
 
+      BOOKKEEPER-795: Race condition causes writes to hang if ledger is fenced (sijie via ivank)
+
     IMPROVEMENTS:
 
       BOOKKEEPER-800: Expose whether a ledger is closed or not (ivank)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index dc0704b..130ce12 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -29,6 +29,7 @@
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
 import org.apache.bookkeeper.client.BKException;
@@ -238,7 +239,7 @@
     /**
      * Has the ledger been closed?
      */
-    public boolean isClosed() {
+    public synchronized boolean isClosed() {
         return metadata.isClosed();
     }
 
@@ -260,13 +261,32 @@
                 final State prevState;
                 List<PendingAddOp> pendingAdds;
 
-                synchronized(LedgerHandle.this) {
-                    // if the metadata is already closed, we don't need to proceed the process
-                    // otherwise, it might end up encountering bad version error log messages when updating metadata
-                    if (metadata.isClosed()) {
-                        cb.closeComplete(BKException.Code.OK, LedgerHandle.this, ctx);
-                        return;
+                if (isClosed()) {
+                    // TODO: make ledger metadata immutable
+                    // Although the metadata is already closed, we don't need to proceed zookeeper metadata update, but
+                    // we still need to error out the pending add ops.
+                    //
+                    // There is a race condition a pending add op is enqueued, after a close op reset ledger metadata state
+                    // to unclosed to resolve metadata conflicts. If we don't error out these pending add ops, they would be
+                    // leak and never callback.
+                    //
+                    // The race condition happen in following sequence:
+                    // a) ledger L is fenced
+                    // b) write entry E encountered LedgerFencedException, trigger ledger close procedure
+                    // c) ledger close encountered metadata version exception and set ledger metadata back to open
+                    // d) writer tries to write entry E+1, since ledger metadata is still open (reset by c))
+                    // e) the close procedure in c) resolved the metadata conflicts and set ledger metadata to closed
+                    // f) writing entry E+1 encountered LedgerFencedException which will enter ledger close procedure
+                    // g) it would find that ledger metadata is closed, then it callbacks immediately without erroring out any pendings
+                    synchronized (LedgerHandle.this) {
+                        pendingAdds = drainPendingAddsToErrorOut();
                     }
+                    errorOutPendingAdds(rc, pendingAdds);
+                    cb.closeComplete(BKException.Code.OK, LedgerHandle.this, ctx);
+                    return;
+                }
+
+                synchronized(LedgerHandle.this) {
                     prevState = metadata.getState();
                     prevLastEntryId = metadata.getLastEntryId();
                     prevLength = metadata.getLength();
@@ -523,9 +543,24 @@
         }
 
         if (wasClosed) {
-            LOG.warn("Attempt to add to closed ledger: {}", ledgerId);
-            cb.addComplete(BKException.Code.LedgerClosedException,
-                           LedgerHandle.this, INVALID_ENTRY_ID, ctx);
+            // make sure the callback is triggered in main worker pool
+            try {
+                bk.mainWorkerPool.submit(new SafeRunnable() {
+                    @Override
+                    public void safeRun() {
+                        LOG.warn("Attempt to add to closed ledger: {}", ledgerId);
+                        cb.addComplete(BKException.Code.LedgerClosedException,
+                                LedgerHandle.this, INVALID_ENTRY_ID, ctx);
+                    }
+                    @Override
+                    public String toString() {
+                        return String.format("AsyncAddEntryToClosedLedger(lid=%d)", ledgerId);
+                    }
+                });
+            } catch (RejectedExecutionException e) {
+                cb.addComplete(BKException.Code.InterruptedException,
+                        LedgerHandle.this, INVALID_ENTRY_ID, ctx);
+            }
             return;
         }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
index 85cd271..4b1d7f5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
@@ -20,9 +20,13 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.bookie.Bookie;
@@ -34,10 +38,14 @@
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.test.TestCallbacks.AddCallbackFuture;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.Assert.*;
+import static com.google.common.base.Charsets.UTF_8;
+
 /**
  * This class tests the ledger close logic.
  */
@@ -223,4 +231,35 @@
         bsConfs.add(conf);
         bs.add(startBookie(conf, dBookie));
     }
+
+    @Test(timeout = 60000)
+    public void testAllWritesAreCompletedOnClosedLedger() throws Exception {
+        for (int i = 0; i < 100; i++) {
+            LOG.info("Iteration {}", i);
+
+            List<AddCallbackFuture> futures = new ArrayList<AddCallbackFuture>();
+            LedgerHandle w = bkc.createLedger(DigestType.CRC32, new byte[0]);
+            AddCallbackFuture f = new AddCallbackFuture(0L);
+            w.asyncAddEntry("foobar".getBytes(UTF_8), f, null);
+            f.get();
+
+            LedgerHandle r = bkc.openLedger(w.getId(), DigestType.CRC32, new byte[0]);
+            for (int j = 0; j < 100; j++) {
+                AddCallbackFuture f1 = new AddCallbackFuture(1L + j);
+                w.asyncAddEntry("foobar".getBytes(), f1, null);
+                futures.add(f1);
+            }
+
+            for (AddCallbackFuture f2: futures) {
+                try {
+                    f2.get(10, TimeUnit.SECONDS);
+                } catch (ExecutionException ee) {
+                    // we don't care about errors
+                } catch (TimeoutException te) {
+                    LOG.error("Error on waiting completing entry {} : ", f2.getExpectedEntryId(), te);
+                    fail("Should succeed on waiting completing entry " + f2.getExpectedEntryId());
+                }
+            }
+        }
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
index a34283e..afc7ba3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
@@ -53,7 +53,7 @@
 
         LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes());
         for (int i = 0; i < 10; i++) {
-            lh.asyncAddEntry("foobar".getBytes(), new TestCallbacks.AddCallbackFuture(), null);
+            lh.asyncAddEntry("foobar".getBytes(), new TestCallbacks.AddCallbackFuture(i), null);
         }
         lh.addEntry("foobar".getBytes());
         lh.close();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java
index 61217f0..5d9f99b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java
@@ -25,11 +25,16 @@
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import com.google.common.util.concurrent.AbstractFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Callbacks implemented with SettableFuture, to be used in tests
  */
 public class TestCallbacks {
+
+    private static final Logger logger = LoggerFactory.getLogger(TestCallbacks.class);
+
     public static class GenericCallbackFuture<T>
         extends AbstractFuture<T> implements GenericCallback<T> {
         @Override
@@ -44,8 +49,21 @@
 
     public static class AddCallbackFuture
         extends AbstractFuture<Long> implements AddCallback {
+
+        private final long expectedEntryId;
+
+        public AddCallbackFuture(long entryId) {
+            this.expectedEntryId = entryId;
+        }
+
+        public long getExpectedEntryId() {
+            return expectedEntryId;
+        }
+
         @Override
         public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+            logger.info("Add entry {} completed : entryId = {}, rc = {}",
+                    new Object[] { expectedEntryId, entryId, rc });
             if (rc != BKException.Code.OK) {
                 setException(BKException.create(rc));
             } else {
