Entries must be acknowledged by bookies in multiple fault domains before being acknowledged to client

Descriptions of the changes in this PR:

Bookkeeper write logic makes sure that there are at least ackQuorumSize
number of successful writes before sending an ack back to the client. In
many cases these may fall into the same fault domain. This change adds a
mechanism to force bookkeeper to make sure that there are acks from at
least minNumRacksPerWriteQuorum number of fault domains, and a
configuration option to enforce this.

Signed-off-by: Ankit Jain <jain.a@salesforce.com>

Master Issue: #2095 

Reviewers: Charan Reddy Guttapalem <reddycharan18@gmail.com>, Enrico Olivelli <eolivelli@gmail.com>

This closes #2096 from ankit-j/ankit-j/enforceFragmentMultipleFaultDomainWrite and squashes the following commits:

c90fd5a3d [Ankit Jain] Addressing review comments
07deae673 [Ankit Jain] Addressing @reddycharan's review comments
14164e291 [Ankit Jain] Fixed spacing error in bk_server.yaml
22c8b3c03 [Ankit Jain] Updated testing.
917ed1c45 [Ankit Jain] Move readLock.unlock to finally block
78e0cd501 [Ankit Jain] Modify test to not use default rack for bookies
ca0bc3b8b [Ankit Jain] Entries must be acknowledged by bookies in multiple fault domains before being acknowledged to client
d35aa22ad [Charan Reddy Guttapalem] Move common placementpolicy components to TopologyAwareEnsemblePlacementPolicy.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index 36c304c..ed10b9c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -97,6 +97,10 @@
 
     // placementpolicy stats
     String NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK = "NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK";
+    String WRITE_DELAYED_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS = "WRITE_DELAYED_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS";
+    String WRITE_DELAYED_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS_LATENCY =
+            "WRITE_DELAYED_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS_LATENCY";
+    String WRITE_TIMED_OUT_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS = "WRITE_TIME_OUT_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS";
 
     OpStatsLogger getCreateOpLogger();
     OpStatsLogger getOpenOpLogger();
@@ -119,6 +123,9 @@
     Counter getLacUpdateHitsCounter();
     Counter getLacUpdateMissesCounter();
     OpStatsLogger getClientChannelWriteWaitLogger();
+    OpStatsLogger getWriteDelayedDueToNotEnoughFaultDomainsLatency();
+    Counter getWriteDelayedDueToNotEnoughFaultDomains();
+    Counter getWriteTimedOutDueToNotEnoughFaultDomains();
     void registerPendingAddsGauge(Gauge<Integer> gauge);
 
     static BookKeeperClientStats newInstance(StatsLogger stats) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
index da79108..85d42c6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
@@ -49,6 +49,7 @@
     final long timeoutMonitorIntervalSec;
     final boolean enableBookieFailureTracking;
     final boolean useV2WireProtocol;
+    final boolean enforceMinNumFaultDomainsForWrite;
 
     static ClientInternalConf defaultValues() {
         return fromConfig(new ClientConfiguration());
@@ -82,6 +83,7 @@
         this.enableBookieFailureTracking = conf.getEnableBookieFailureTracking();
         this.useV2WireProtocol = conf.getUseV2WireProtocol();
         this.enableStickyReads = conf.isStickyReadsEnabled();
+        this.enforceMinNumFaultDomainsForWrite = conf.getEnforceMinNumFaultDomainsForWrite();
 
         if (conf.getFirstSpeculativeReadTimeout() > 0) {
             this.readSpeculativeRequestPolicy =
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index 7dc8111..64a4b91 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -415,6 +415,25 @@
     }
 
     /**
+     * Returns true if the bookies that have acknowledged a write adhere to the minimum fault domains as defined in the
+     * placement policy in use. Ex: In the case of RackawareEnsemblePlacementPolicy, bookies belong to at least
+     * 'minNumRacksPerWriteQuorum' number of racks.
+     *
+     * @param ackedBookies
+     *            list of BookieSocketAddress of bookies that have acknowledged a write.
+     * @param writeQuorumSize
+     *            writeQuorumSize of the ensemble
+     * @param ackQuorumSize
+     *            ackQuorumSize of the ensemble
+     * @return
+     */
+    default boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieSocketAddress> ackedBookies,
+                                                             int writeQuorumSize,
+                                                             int ackQuorumSize) {
+        return true;
+    }
+
+    /**
      * Result of a placement calculation against a placement policy.
      */
     final class PlacementResult<T> {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index ad2f7ae..ff80615 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -30,8 +30,10 @@
 import io.netty.util.ReferenceCountUtil;
 import java.util.EnumSet;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallbackWithLatency;
@@ -71,6 +73,8 @@
     boolean isRecoveryAdd = false;
     long requestTimeNanos;
     long qwcLatency; // Quorum Write Completion Latency after response from quorum bookies.
+    Set<BookieSocketAddress> addEntrySuccessBookies;
+    long writeDelayedStartTime; // min fault domains completion latency after response from ack quorum bookies
 
     long currentLedgerLength;
     int pendingWriteRequests;
@@ -106,6 +110,13 @@
         op.qwcLatency = 0;
         op.writeFlags = writeFlags;
 
+        if (op.addEntrySuccessBookies == null) {
+            op.addEntrySuccessBookies = new HashSet<>();
+        } else {
+            op.addEntrySuccessBookies.clear();
+        }
+        op.writeDelayedStartTime = -1;
+
         return op;
     }
 
@@ -159,6 +170,11 @@
                 public void safeRun() {
                     if (completed) {
                         return;
+                    } else if (addEntrySuccessBookies.size() >= lh.getLedgerMetadata().getAckQuorumSize()) {
+                        // If ackQuorum number of bookies have acknowledged the write but still not complete, indicates
+                        // failures due to not having been written to enough fault domains. Increment corresponding
+                        // counter.
+                        clientCtx.getClientStats().getWriteTimedOutDueToNotEnoughFaultDomains().inc();
                     }
                     lh.handleUnrecoverableErrorDuringAdd(BKException.Code.AddEntryQuorumTimeoutException);
                 }
@@ -282,6 +298,7 @@
         boolean ackQuorum = false;
         if (BKException.Code.OK == rc) {
             ackQuorum = ackSet.completeBookieAndCheck(bookieIndex);
+            addEntrySuccessBookies.add(ensemble.get(bookieIndex));
         }
 
         if (completed) {
@@ -363,10 +380,33 @@
         }
 
         if (ackQuorum && !completed) {
-            completed = true;
-            this.qwcLatency = MathUtils.elapsedNanos(requestTimeNanos);
+            if (clientCtx.getConf().enforceMinNumFaultDomainsForWrite
+                && !(clientCtx.getPlacementPolicy()
+                              .areAckedBookiesAdheringToPlacementPolicy(addEntrySuccessBookies,
+                                                                        lh.getLedgerMetadata().getWriteQuorumSize(),
+                                                                        lh.getLedgerMetadata().getAckQuorumSize()))) {
+                LOG.warn("Write success for entry ID {} delayed, not acknowledged by bookies in enough fault domains",
+                         entryId);
+                // Increment to indicate write did not complete due to not enough fault domains
+                clientCtx.getClientStats().getWriteDelayedDueToNotEnoughFaultDomains().inc();
 
-            sendAddSuccessCallbacks();
+                // Only do this for the first time.
+                if (writeDelayedStartTime == -1) {
+                    writeDelayedStartTime = MathUtils.nowInNano();
+                }
+            } else {
+                completed = true;
+                this.qwcLatency = MathUtils.elapsedNanos(requestTimeNanos);
+
+                if (writeDelayedStartTime != -1) {
+                    clientCtx.getClientStats()
+                             .getWriteDelayedDueToNotEnoughFaultDomainsLatency()
+                             .registerSuccessfulEvent(MathUtils.elapsedNanos(writeDelayedStartTime),
+                                                      TimeUnit.NANOSECONDS);
+                }
+
+                sendAddSuccessCallbacks();
+            }
         }
     }
 
@@ -478,6 +518,8 @@
         hasRun = false;
         allowFailFast = false;
         writeFlags = null;
+        addEntrySuccessBookies.clear();
+        writeDelayedStartTime = -1;
 
         recyclerHandle.recycle(this);
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 2b1090f..86fc391 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -45,6 +45,7 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.client.WeightedRandomSelection.WeightedObject;
@@ -1060,4 +1061,32 @@
         }
         return true;
     }
+
+    @Override
+    public boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieSocketAddress> ackedBookies,
+                                                            int writeQuorumSize,
+                                                            int ackQuorumSize) {
+        HashSet<String> rackCounter = new HashSet<>();
+        int minWriteQuorumNumRacksPerWriteQuorum = Math.min(writeQuorumSize, minNumRacksPerWriteQuorum);
+
+        ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
+        readLock.lock();
+        try {
+            for (BookieSocketAddress bookie : ackedBookies) {
+                rackCounter.add(knownBookies.get(bookie).getNetworkLocation());
+            }
+
+            // Check to make sure that ensemble is writing to `minNumberOfRacks`'s number of racks at least.
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("areAckedBookiesAdheringToPlacementPolicy returning {} because number of racks = {} and "
+                          + "minNumRacksPerWriteQuorum = {}",
+                          rackCounter.size() >= minNumRacksPerWriteQuorum,
+                          rackCounter.size(),
+                          minNumRacksPerWriteQuorum);
+            }
+        } finally {
+            readLock.unlock();
+        }
+        return rackCounter.size() >= minWriteQuorumNumRacksPerWriteQuorum;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperClientStatsImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperClientStatsImpl.java
index a29621a..811a4b8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperClientStatsImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperClientStatsImpl.java
@@ -141,6 +141,24 @@
     )
     private final Counter speculativeReadCounter;
 
+    @StatsDoc(
+        name = WRITE_DELAYED_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS_LATENCY,
+        help = "The delay in write completion because min number of fault domains was not reached"
+    )
+    private final OpStatsLogger writeDelayedDueToNotEnoughFaultDomainsLatency;
+
+    @StatsDoc(
+        name = WRITE_DELAYED_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS,
+        help = "The number of times write completion was delayed because min number of fault domains was not reached"
+    )
+    private final Counter writeDelayedDueToNotEnoughFaultDomains;
+
+    @StatsDoc(
+        name = WRITE_TIMED_OUT_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS,
+        help = "The number of times write completion timed out because min number of fault domains was not reached"
+    )
+    private final Counter writeTimedOutDueToNotEnoughFaultDomains;
+
 
     public BookKeeperClientStatsImpl(StatsLogger stats) {
         this.stats = stats;
@@ -166,6 +184,12 @@
         this.clientChannelWriteWaitStats = stats.getOpStatsLogger(CLIENT_CHANNEL_WRITE_WAIT);
 
         speculativeReadCounter = stats.getCounter(SPECULATIVE_READ_COUNT);
+
+        this.writeDelayedDueToNotEnoughFaultDomainsLatency =
+                stats.getOpStatsLogger(WRITE_DELAYED_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS_LATENCY);
+        this.writeDelayedDueToNotEnoughFaultDomains = stats.getCounter(WRITE_DELAYED_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS);
+        this.writeTimedOutDueToNotEnoughFaultDomains =
+                stats.getCounter(WRITE_TIMED_OUT_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS);
     }
 
     @Override
@@ -253,6 +277,18 @@
         return stats.getCounter(LEDGER_ENSEMBLE_BOOKIE_DISTRIBUTION + "-" + bookie);
     }
     @Override
+    public OpStatsLogger getWriteDelayedDueToNotEnoughFaultDomainsLatency() {
+        return writeDelayedDueToNotEnoughFaultDomainsLatency;
+    }
+    @Override
+    public Counter getWriteDelayedDueToNotEnoughFaultDomains() {
+        return writeDelayedDueToNotEnoughFaultDomains;
+    }
+    @Override
+    public Counter getWriteTimedOutDueToNotEnoughFaultDomains() {
+        return writeTimedOutDueToNotEnoughFaultDomains;
+    }
+    @Override
     public void registerPendingAddsGauge(Gauge<Integer> gauge) {
         stats.registerGauge(PENDING_ADDS, gauge);
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index 87c1f41..b666dbc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -159,6 +159,9 @@
     // enforce minimum number of racks per write quorum
     public static final String ENFORCE_MIN_NUM_RACKS_PER_WRITE_QUORUM = "enforceMinNumRacksPerWriteQuorum";
 
+    // enforce minimum number of fault domains for write
+    public static final String ENFORCE_MIN_NUM_FAULT_DOMAINS_FOR_WRITE = "enforceMinNumFaultDomainsForWrite";
+
     // ignore usage of local node in the internal logic of placement policy
     public static final String IGNORE_LOCAL_NODE_IN_PLACEMENT_POLICY = "ignoreLocalNodeInPlacementPolicy";
 
@@ -847,6 +850,20 @@
     }
 
     /**
+     * Set the flag to enforce minimum number of fault domains for write.
+     */
+    public void setEnforceMinNumFaultDomainsForWrite(boolean enforceMinNumFaultDomainsForWrite) {
+        setProperty(ENFORCE_MIN_NUM_FAULT_DOMAINS_FOR_WRITE, enforceMinNumFaultDomainsForWrite);
+    }
+
+    /**
+     * Get the flag to enforce minimum number of fault domains for write.
+     */
+    public boolean getEnforceMinNumFaultDomainsForWrite() {
+        return getBoolean(ENFORCE_MIN_NUM_FAULT_DOMAINS_FOR_WRITE, false);
+    }
+
+    /**
      * Sets the flag to ignore usage of localnode in placement policy.
      */
     public void setIgnoreLocalNodeInPlacementPolicy(boolean ignoreLocalNodeInPlacementPolicy) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index 80d059d..3c842e6 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -20,6 +20,8 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.apache.bookkeeper.client.BookKeeperClientStats.WRITE_DELAYED_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS;
+import static org.apache.bookkeeper.client.BookKeeperClientStats.WRITE_TIMED_OUT_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -31,9 +33,11 @@
 import io.netty.util.IllegalReferenceCountException;
 
 import java.io.IOException;
+import java.net.UnknownHostException;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -48,8 +52,12 @@
 import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.client.api.WriteHandle;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.test.TestStatsProvider;
 import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
@@ -956,4 +964,152 @@
         bk.deleteLedger(ledgerId);
         bk.close();
     }
+
+    /**
+     * Mock of RackawareEnsemblePlacementPolicy. Overrides areAckedBookiesAdheringToPlacementPolicy to only return true
+     * when ackedBookies consists of writeQuorumSizeToUseForTesting bookies.
+     */
+    public static class MockRackawareEnsemblePlacementPolicy extends RackawareEnsemblePlacementPolicy {
+        private int writeQuorumSizeToUseForTesting;
+        private CountDownLatch conditionFirstInvocationLatch;
+
+        void setWriteQuorumSizeToUseForTesting(int writeQuorumSizeToUseForTesting) {
+            this.writeQuorumSizeToUseForTesting = writeQuorumSizeToUseForTesting;
+        }
+
+        void setConditionFirstInvocationLatch(CountDownLatch conditionFirstInvocationLatch) {
+            this.conditionFirstInvocationLatch = conditionFirstInvocationLatch;
+        }
+
+        @Override
+        public boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieSocketAddress> ackedBookies,
+                                                                int writeQuorumSize,
+                                                                int ackQuorumSize) {
+            conditionFirstInvocationLatch.countDown();
+            return ackedBookies.size() == writeQuorumSizeToUseForTesting;
+        }
+    }
+
+    /**
+     * Test to verify that PendingAddOp waits for success condition from areAckedBookiesAdheringToPlacementPolicy
+     * before returning success to client. Also tests working of WRITE_DELAYED_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS and
+     * WRITE_TIMED_OUT_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS counters.
+     */
+    @Test
+    public void testEnforceMinNumFaultDomainsForWrite() throws Exception {
+        byte[] data = "foobar".getBytes();
+        byte[] password = "testPasswd".getBytes();
+
+        startNewBookie();
+        startNewBookie();
+
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        conf.setEnsemblePlacementPolicy(MockRackawareEnsemblePlacementPolicy.class);
+
+        conf.setAddEntryTimeout(2);
+        conf.setAddEntryQuorumTimeout(4);
+        conf.setEnforceMinNumFaultDomainsForWrite(true);
+
+        TestStatsProvider statsProvider = new TestStatsProvider();
+
+        // Abnormal values for testing to prevent timeouts
+        BookKeeperTestClient bk = new BookKeeperTestClient(conf, statsProvider);
+        StatsLogger statsLogger = bk.getStatsLogger();
+
+        int ensembleSize = 3;
+        int writeQuorumSize = 3;
+        int ackQuorumSize = 2;
+
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        MockRackawareEnsemblePlacementPolicy currPlacementPolicy =
+                (MockRackawareEnsemblePlacementPolicy) bk.getPlacementPolicy();
+        currPlacementPolicy.setConditionFirstInvocationLatch(countDownLatch);
+        currPlacementPolicy.setWriteQuorumSizeToUseForTesting(writeQuorumSize);
+
+        BookieSocketAddress bookieToSleep;
+
+        try (LedgerHandle lh = bk.createLedger(ensembleSize, writeQuorumSize, ackQuorumSize, digestType, password)) {
+            CountDownLatch sleepLatchCase1 = new CountDownLatch(1);
+            CountDownLatch sleepLatchCase2 = new CountDownLatch(1);
+
+            // Put all non ensemble bookies to sleep
+            LOG.info("Putting all non ensemble bookies to sleep.");
+            for (BookieServer bookieServer : bs) {
+                try {
+                    if (!lh.getCurrentEnsemble().contains(bookieServer.getLocalAddress())) {
+                        sleepBookie(bookieServer.getLocalAddress(), sleepLatchCase2);
+                    }
+                } catch (UnknownHostException ignored) {}
+            }
+
+            Thread writeToLedger = new Thread(() -> {
+                try {
+                    LOG.info("Initiating write for entry");
+                    long entryId = lh.addEntry(data);
+                    LOG.info("Wrote entry with entryId = {}", entryId);
+                } catch (InterruptedException | BKException ignored) {
+                }
+            });
+
+            bookieToSleep = lh.getCurrentEnsemble().get(0);
+
+            LOG.info("Putting picked bookie to sleep");
+            sleepBookie(bookieToSleep, sleepLatchCase1);
+
+            assertEquals(statsLogger
+                           .getCounter(WRITE_DELAYED_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS)
+                           .get()
+                           .longValue(), 0);
+
+            // Trying to write entry
+            writeToLedger.start();
+
+            // Waiting and checking to make sure that write has not succeeded
+            countDownLatch.await(conf.getAddEntryTimeout(), TimeUnit.SECONDS);
+            assertEquals("Write succeeded but should not have", -1, lh.lastAddConfirmed);
+
+            // Wake the bookie
+            sleepLatchCase1.countDown();
+
+            // Waiting and checking to make sure that write has succeeded
+            writeToLedger.join(conf.getAddEntryTimeout() * 1000);
+            assertEquals("Write did not succeed but should have", 0, lh.lastAddConfirmed);
+
+            assertEquals(statsLogger
+                           .getCounter(WRITE_DELAYED_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS)
+                           .get()
+                           .longValue(), 1);
+
+            // AddEntry thread for second scenario
+            Thread writeToLedger2 = new Thread(() -> {
+                try {
+                    LOG.info("Initiating write for entry");
+                    long entryId = lh.addEntry(data);
+                    LOG.info("Wrote entry with entryId = {}", entryId);
+                } catch (InterruptedException | BKException ignored) {
+                }
+            });
+
+            bookieToSleep = lh.getCurrentEnsemble().get(1);
+
+            LOG.info("Putting picked bookie to sleep");
+            sleepBookie(bookieToSleep, sleepLatchCase2);
+
+            // Trying to write entry
+            writeToLedger2.start();
+
+            // Waiting and checking to make sure that write has failed
+            writeToLedger2.join((conf.getAddEntryQuorumTimeout() + 2) * 1000);
+            assertEquals("Write succeeded but should not have", 0, lh.lastAddConfirmed);
+
+            sleepLatchCase2.countDown();
+
+            assertEquals(statsLogger.getCounter(WRITE_DELAYED_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS).get().longValue(),
+                         2);
+
+            assertEquals(statsLogger.getCounter(WRITE_TIMED_OUT_DUE_TO_NOT_ENOUGH_FAULT_DOMAINS).get().longValue(),
+                         1);
+        }
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index 4a93d4f..3fc7483 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -2233,4 +2233,95 @@
                     + bookiesOfDefaultRackInEnsemble, bookiesOfDefaultRackInEnsemble.isEmpty());
         }
     }
+
+    private void testAreAckedBookiesAdheringToPlacementPolicyHelper(int minNumRacksPerWriteQuorumConfValue,
+                                                                    int ensembleSize,
+                                                                    int writeQuorumSize,
+                                                                    int ackQuorumSize,
+                                                                    int numOfBookiesInDefaultRack,
+                                                                    int numOfRacks,
+                                                                    int numOfBookiesPerRack) throws Exception {
+        String defaultRackForThisTest = NetworkTopology.DEFAULT_REGION_AND_RACK;
+        repp.uninitalize();
+        updateMyRack(defaultRackForThisTest);
+
+        ClientConfiguration conf = new ClientConfiguration(this.conf);
+        conf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorumConfValue);
+
+        TestStatsProvider statsProvider = new TestStatsProvider();
+        TestStatsLogger statsLogger = statsProvider.getStatsLogger("");
+
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.empty(), timer, DISABLE_ALL, statsLogger);
+        repp.withDefaultRack(defaultRackForThisTest);
+
+        List<BookieSocketAddress> bookieSocketAddressesDefaultRack = new ArrayList<>();
+        List<BookieSocketAddress> bookieSocketAddressesNonDefaultRack = new ArrayList<>();
+        Set<BookieSocketAddress> writableBookies;
+        Set<BookieSocketAddress> bookiesForEntry = new HashSet<>();
+
+        for (int i = 0; i < numOfRacks; i++) {
+            for (int j = 0; j < numOfBookiesPerRack; j++) {
+                int index = i * numOfBookiesPerRack + j;
+                bookieSocketAddressesNonDefaultRack.add(new BookieSocketAddress("128.0.0." + index, 3181));
+                StaticDNSResolver.addNodeToRack(bookieSocketAddressesNonDefaultRack.get(index).getHostName(),
+                                                "/default-region/r" + i);
+            }
+        }
+
+        for (int i = 0; i < numOfBookiesInDefaultRack; i++) {
+            bookieSocketAddressesDefaultRack.add(new BookieSocketAddress("127.0.0." + (i + 100), 3181));
+            StaticDNSResolver.addNodeToRack(bookieSocketAddressesDefaultRack.get(i).getHostName(),
+                                            defaultRackForThisTest);
+        }
+
+        writableBookies = new HashSet<>(bookieSocketAddressesNonDefaultRack);
+        writableBookies.addAll(bookieSocketAddressesDefaultRack);
+        repp.onClusterChanged(writableBookies, new HashSet<>());
+
+        // Case 1 : Bookies in the ensemble from the same rack.
+        // Manually crafting the ensemble here to create the error case when the check should return false
+
+        List<BookieSocketAddress> ensemble = new ArrayList<>(bookieSocketAddressesDefaultRack);
+        for (int entryId = 0; entryId < 10; entryId++) {
+            DistributionSchedule ds = new RoundRobinDistributionSchedule(writeQuorumSize, ackQuorumSize, ensembleSize);
+            DistributionSchedule.WriteSet ws = ds.getWriteSet(entryId);
+
+            for (int i = 0; i < ws.size(); i++) {
+                bookiesForEntry.add(ensemble.get(ws.get(i)));
+            }
+
+            assertFalse(repp.areAckedBookiesAdheringToPlacementPolicy(bookiesForEntry, writeQuorumSize, ackQuorumSize));
+        }
+
+        // Case 2 : Bookies in the ensemble from the different racks
+
+        EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>>
+                ensembleResponse = repp.newEnsemble(ensembleSize,
+                                                    writeQuorumSize,
+                                                    ackQuorumSize,
+                                                    null,
+                                                    new HashSet<>());
+        ensemble = ensembleResponse.getResult();
+        for (int entryId = 0; entryId < 10; entryId++) {
+            DistributionSchedule ds = new RoundRobinDistributionSchedule(writeQuorumSize, ackQuorumSize, ensembleSize);
+            DistributionSchedule.WriteSet ws = ds.getWriteSet(entryId);
+
+            for (int i = 0; i < ws.size(); i++) {
+                bookiesForEntry.add(ensemble.get(ws.get(i)));
+            }
+
+            assertTrue(repp.areAckedBookiesAdheringToPlacementPolicy(bookiesForEntry, writeQuorumSize, ackQuorumSize));
+        }
+    }
+
+    /**
+     * This tests areAckedBookiesAdheringToPlacementPolicy function in RackawareEnsemblePlacementPolicy.
+     */
+    @Test
+    public void testAreAckedBookiesAdheringToPlacementPolicy() throws Exception {
+        testAreAckedBookiesAdheringToPlacementPolicyHelper(2, 7, 3, 2, 7, 3, 3);
+        testAreAckedBookiesAdheringToPlacementPolicyHelper(4, 6, 3, 2, 6, 3, 3);
+        testAreAckedBookiesAdheringToPlacementPolicyHelper(5, 7, 5, 3, 7, 5, 2);
+    }
 }
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 41798a6..fa77b94 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -891,6 +891,11 @@
 # bookie then it would throw BKNotEnoughBookiesException instead of picking random one.
 # enforceMinNumRacksPerWriteQuorum=false
 
+# Enforce write being acknowledged by bookies belonging to at least the minimum
+# number of fault domains (depending on the placement policy) before being
+# acknowledged by bookkeeper.
+# enforceMinNumFaultDomainsForWrite=false
+
 #############################################################################
 ## Auditor settings
 #############################################################################
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index 172e0a9..7cccc6d 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -644,6 +644,9 @@
     description: |
       'ignoreLocalNodeInPlacementPolicy' specifies whether to ignore localnode in the internal logic of placement policy. If it is not possible or useful to use Bookkeeper client node's (or AutoReplicator) rack/region info. for placement policy then it is better to ignore localnode instead of false alarming with log lines and metrics.
     default: false
+  - param: enforceMinNumFaultDomainsForWrite
+    description: |
+      'enforceMinNumFaultDomainsForWrite' enforces EnsemblePlacementPolicy to check if a write has made it to bookies in 'minNumRacksPerWriteQuorum' number of fault domains, before acknowledging the write back.
 
 - name: AutoRecovery auditor settings
   params: