Merge pull request #1204 from apache/issue/OAK-10006

Block repository writes if repo lock is lost
diff --git a/oak-parent/pom.xml b/oak-parent/pom.xml
index 490cd08..5b68842 100644
--- a/oak-parent/pom.xml
+++ b/oak-parent/pom.xml
@@ -783,6 +783,12 @@
         <version>1</version>
         <scope>test</scope>
       </dependency>
+      <dependency>
+        <groupId>com.github.stefanbirkner</groupId>
+        <artifactId>system-rules</artifactId>
+        <version>1.19.0</version>
+        <scope>test</scope>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 
diff --git a/oak-segment-azure/pom.xml b/oak-segment-azure/pom.xml
index f150efe..02fc807 100644
--- a/oak-segment-azure/pom.xml
+++ b/oak-segment-azure/pom.xml
@@ -110,6 +110,11 @@
             <artifactId>org.osgi.service.metatype.annotations</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.annotation.versioning</artifactId>
+            <scope>provided</scope>
+        </dependency>
 
         <!-- Nullability annotations -->
         <dependency>
@@ -260,6 +265,11 @@
             <artifactId>org.osgi.service.cm</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.github.stefanbirkner</groupId>
+            <artifactId>system-rules</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java
index daa042d..10c5ce4 100644
--- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java
@@ -22,6 +22,7 @@
 import com.microsoft.azure.storage.blob.CloudBlobDirectory;
 import com.microsoft.azure.storage.blob.CloudBlockBlob;
 import com.microsoft.azure.storage.blob.CopyStatus;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
 import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
 import org.apache.jackrabbit.oak.segment.remote.RemoteUtilities;
 import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
@@ -61,11 +62,13 @@
     protected final IOMonitor ioMonitor;
 
     protected final FileStoreMonitor monitor;
+    private final WriteAccessController writeAccessController; // set once in the constructor; passed to each AzureSegmentArchiveWriter created by create()
 
-    public AzureArchiveManager(CloudBlobDirectory cloudBlobDirectory, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) {
-        this.cloudBlobDirectory = cloudBlobDirectory;
+    public AzureArchiveManager(CloudBlobDirectory segmentstoreDirectory, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor, WriteAccessController writeAccessController) {
+        this.cloudBlobDirectory = segmentstoreDirectory;
         this.ioMonitor = ioMonitor;
         this.monitor = fileStoreMonitor;
+        this.writeAccessController = writeAccessController;
     }
 
     @Override
@@ -127,7 +130,7 @@
 
     @Override
     public SegmentArchiveWriter create(String archiveName) throws IOException {
-        return new AzureSegmentArchiveWriter(getDirectory(archiveName), ioMonitor, monitor);
+        return new AzureSegmentArchiveWriter(getDirectory(archiveName), ioMonitor, monitor, writeAccessController);
     }
 
     @Override
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java
index 9650334..b468e42 100644
--- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java
@@ -23,6 +23,7 @@
 import com.microsoft.azure.storage.blob.CloudBlobDirectory;
 import com.microsoft.azure.storage.blob.ListBlobItem;
 import org.apache.jackrabbit.oak.segment.azure.util.CaseInsensitiveKeysMapAccess;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
 import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile;
 import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader;
 import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;
@@ -53,14 +54,17 @@
 
     private final int lineLimit;
 
-    AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix, int lineLimit) {
+    private final WriteAccessController writeAccessController;
+
+    AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix, WriteAccessController writeAccessController, int lineLimit) {
         this.directory = directory;
         this.journalNamePrefix = journalNamePrefix;
         this.lineLimit = lineLimit;
+        this.writeAccessController = writeAccessController;
     }
 
-    public AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix) {
-        this(directory, journalNamePrefix, JOURNAL_LINE_LIMIT);
+    public AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix, WriteAccessController writeAccessController) {
+        this(directory, journalNamePrefix, writeAccessController, JOURNAL_LINE_LIMIT);
     }
 
     @Override
@@ -183,6 +187,8 @@
         @Override
         public void truncate() throws IOException {
             try {
+                writeAccessController.checkWritingAllowed();
+
                 for (CloudAppendBlob cloudAppendBlob : getJournalBlobs()) {
                     cloudAppendBlob.delete();
                 }
@@ -200,6 +206,8 @@
 
         @Override
         public void batchWriteLines(List<String> lines) throws IOException {
+            writeAccessController.checkWritingAllowed();
+
             if (lines.isEmpty()) {
                 return;
             }
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java
index b5434e4..eed0700 100644
--- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java
@@ -34,6 +34,7 @@
 import com.microsoft.azure.storage.blob.CloudBlobDirectory;
 import com.microsoft.azure.storage.blob.CloudBlockBlob;
 import com.microsoft.azure.storage.blob.ListBlobItem;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
 import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
 import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
 import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitor;
@@ -64,6 +65,8 @@
 
     protected final CloudBlobDirectory segmentstoreDirectory;
 
+    protected WriteAccessController writeAccessController = new WriteAccessController();
+
     public AzurePersistence(CloudBlobDirectory segmentStoreDirectory) {
         this.segmentstoreDirectory = segmentStoreDirectory;
 
@@ -92,7 +95,7 @@
     @Override
     public SegmentArchiveManager createArchiveManager(boolean mmap, boolean offHeapAccess, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor, RemoteStoreMonitor remoteStoreMonitor) {
         attachRemoteStoreMonitor(remoteStoreMonitor);
-        return new AzureArchiveManager(segmentstoreDirectory, ioMonitor, fileStoreMonitor);
+        return new AzureArchiveManager(segmentstoreDirectory, ioMonitor, fileStoreMonitor, writeAccessController);
     }
 
     @Override
@@ -116,7 +119,7 @@
 
     @Override
     public JournalFile getJournalFile() {
-        return new AzureJournalFile(segmentstoreDirectory, "journal.log");
+        return new AzureJournalFile(segmentstoreDirectory, "journal.log", writeAccessController);
     }
 
     @Override
@@ -134,7 +137,7 @@
         return new AzureRepositoryLock(getBlockBlob("repo.lock"), () -> {
             log.warn("Lost connection to the Azure. The client will be closed.");
             // TODO close the connection
-        }).lock();
+        }, writeAccessController).lock();
     }
 
     private CloudBlockBlob getBlockBlob(String path) throws IOException {
@@ -178,8 +181,11 @@
         });
     }
 
-        public CloudBlobDirectory getSegmentstoreDirectory() {
-            return segmentstoreDirectory;
-        }
+    public CloudBlobDirectory getSegmentstoreDirectory() {
+        return segmentstoreDirectory;
+    }
 
+    public void setWriteAccessController(WriteAccessController writeAccessController) {
+        this.writeAccessController = writeAccessController;
+    }
 }
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java
index 60d7d0f..046fa26 100644
--- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java
@@ -17,9 +17,12 @@
 package org.apache.jackrabbit.oak.segment.azure;
 
 import com.microsoft.azure.storage.AccessCondition;
+import com.microsoft.azure.storage.RetryNoRetry;
 import com.microsoft.azure.storage.StorageErrorCodeStrings;
 import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobRequestOptions;
 import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
 import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,8 +37,16 @@
     private static final Logger log = LoggerFactory.getLogger(AzureRepositoryLock.class);
 
     private static final int TIMEOUT_SEC = Integer.getInteger("oak.segment.azure.lock.timeout", 0);
+    private static final Integer LEASE_RENEWAL_TIMEOUT_MS = 5000;
 
-    private static int INTERVAL = 60;
+    public static final String LEASE_DURATION_PROP = "oak.segment.azure.lock.leaseDurationInSec";
+    private final int leaseDuration = Integer.getInteger(LEASE_DURATION_PROP, 60);
+
+    public static final String RENEWAL_INTERVAL_PROP = "oak.segment.azure.lock.leaseRenewalIntervalInSec";
+    private final int renewalInterval = Integer.getInteger(RENEWAL_INTERVAL_PROP, 5);
+
+    public static final String TIME_TO_WAIT_BEFORE_WRITE_BLOCK_PROP = "oak.segment.azure.lock.blockWritesAfterInSec";
+    private final int timeToWaitBeforeWriteBlock = Integer.getInteger(TIME_TO_WAIT_BEFORE_WRITE_BLOCK_PROP, 20);
 
     private final Runnable shutdownHook;
 
@@ -45,19 +56,27 @@
 
     private final int timeoutSec;
 
+    private final WriteAccessController writeAccessController; // set once in the constructor; toggled by lock() and the lease-renewal loop
+
     private String leaseId;
 
     private volatile boolean doUpdate;
 
-    public AzureRepositoryLock(CloudBlockBlob blob, Runnable shutdownHook) {
-        this(blob, shutdownHook, TIMEOUT_SEC);
+    public AzureRepositoryLock(CloudBlockBlob blob, Runnable shutdownHook, WriteAccessController writeAccessController) {
+        this(blob, shutdownHook, writeAccessController, TIMEOUT_SEC);
     }
 
-    public AzureRepositoryLock(CloudBlockBlob blob, Runnable shutdownHook, int timeoutSec) {
+    public AzureRepositoryLock(CloudBlockBlob blob, Runnable shutdownHook, WriteAccessController writeAccessController, int timeoutSec) {
         this.shutdownHook = shutdownHook;
         this.blob = blob;
         this.executor = Executors.newSingleThreadExecutor();
         this.timeoutSec = timeoutSec;
+        this.writeAccessController = writeAccessController;
+        // Strict ordering required: lease duration > write-block delay > renewal interval (equal values are rejected, as the message states).
+        if (leaseDuration <= timeToWaitBeforeWriteBlock || timeToWaitBeforeWriteBlock <= renewalInterval) {
+            throw new IllegalStateException(String.format("The value of %s must be greater than %s and the value of %s must be greater than %s",
+                    LEASE_DURATION_PROP, TIME_TO_WAIT_BEFORE_WRITE_BLOCK_PROP, TIME_TO_WAIT_BEFORE_WRITE_BLOCK_PROP, RENEWAL_INTERVAL_PROP));
+        }
     }
 
     public AzureRepositoryLock lock() throws IOException {
@@ -66,7 +85,13 @@
         do {
             try {
                 blob.openOutputStream().close();
-                leaseId = blob.acquireLease(INTERVAL, null);
+
+                log.info("{} = {}", LEASE_DURATION_PROP, leaseDuration);
+                log.info("{} = {}", RENEWAL_INTERVAL_PROP, renewalInterval);
+                log.info("{} = {}", TIME_TO_WAIT_BEFORE_WRITE_BLOCK_PROP, timeToWaitBeforeWriteBlock);
+
+                leaseId = blob.acquireLease(leaseDuration, null);
+                writeAccessController.enableWriting();
                 log.info("Acquired lease {}", leaseId);
             } catch (StorageException | IOException e) {
                 if (ex == null) {
@@ -97,14 +122,25 @@
         doUpdate = true;
         long lastUpdate = 0;
         while (doUpdate) {
+            long timeSinceLastUpdate = (System.currentTimeMillis() - lastUpdate) / 1000;
             try {
-                long timeSinceLastUpdate = (System.currentTimeMillis() - lastUpdate) / 1000;
-                if (timeSinceLastUpdate > INTERVAL / 2) {
-                    blob.renewLease(AccessCondition.generateLeaseCondition(leaseId));
+                if (timeSinceLastUpdate > renewalInterval) {
+
+                    BlobRequestOptions requestOptions = new BlobRequestOptions();
+                    requestOptions.setMaximumExecutionTimeInMs(LEASE_RENEWAL_TIMEOUT_MS);
+                    requestOptions.setRetryPolicyFactory(new RetryNoRetry());
+                    blob.renewLease(AccessCondition.generateLeaseCondition(leaseId), requestOptions, null);
+
+                    writeAccessController.enableWriting();
                     lastUpdate = System.currentTimeMillis();
                 }
             } catch (StorageException e) {
+                timeSinceLastUpdate = (System.currentTimeMillis() - lastUpdate) / 1000;
+
                 if (e.getErrorCode().equals(StorageErrorCodeStrings.OPERATION_TIMED_OUT)) {
+                    if (timeSinceLastUpdate > timeToWaitBeforeWriteBlock) {
+                        writeAccessController.disableWriting();
+                    }
                     log.warn("Could not renew the lease due to the operation timeout. Retry in progress ...", e);
                 } else {
                     log.error("Can't renew the lease", e);
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java
index f7181bb..3df0eaa 100644
--- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java
@@ -31,6 +31,7 @@
 import com.microsoft.azure.storage.blob.CloudBlockBlob;
 
 import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
 import org.apache.jackrabbit.oak.segment.azure.util.Retrier;
 import org.apache.jackrabbit.oak.segment.remote.AbstractRemoteSegmentArchiveWriter;
 import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
@@ -46,9 +47,10 @@
             Integer.getInteger("azure.segment.archive.writer.retries.intervalMs", 5000)
     );
 
-    public AzureSegmentArchiveWriter(CloudBlobDirectory archiveDirectory, IOMonitor ioMonitor, FileStoreMonitor monitor) {
+    public AzureSegmentArchiveWriter(CloudBlobDirectory archiveDirectory, IOMonitor ioMonitor, FileStoreMonitor monitor, WriteAccessController writeAccessController) {
         super(ioMonitor, monitor);
         this.archiveDirectory = archiveDirectory;
+        this.writeAccessController = writeAccessController;
     }
 
     @Override
@@ -58,6 +60,9 @@
 
     @Override
     protected void doWriteArchiveEntry(RemoteSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException {
+
+        writeAccessController.checkWritingAllowed();
+
         long msb = indexEntry.getMsb();
         long lsb = indexEntry.getLsb();
         String segmentName = getSegmentFileName(indexEntry);
@@ -90,6 +95,8 @@
     protected void doWriteDataFile(byte[] data, String extension) throws IOException {
         retrier.execute(() -> {
             try {
+                writeAccessController.checkWritingAllowed();
+
                 getBlob(getName() + extension).uploadFromByteArray(data, 0, data.length);
             } catch (StorageException e) {
                 throw new IOException(e);
@@ -101,6 +108,8 @@
     protected void afterQueueClosed() throws IOException {
         retrier.execute(() -> {
             try {
+                writeAccessController.checkWritingAllowed();
+
                 getBlob("closed").uploadFromByteArray(new byte[0], 0, 0);
             } catch (StorageException e) {
                 throw new IOException(e);
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/package-info.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/package-info.java
new file mode 100644
index 0000000..30d378d
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@Internal(since = "1.0.0")
+@Version("2.0.0")
+package org.apache.jackrabbit.oak.segment.azure;
+
+import org.apache.jackrabbit.oak.commons.annotations.Internal;
+import org.osgi.annotation.versioning.Version;
diff --git a/oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyTestBase.java b/oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyTestBase.java
index 97b9afe..0113734 100644
--- a/oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyTestBase.java
+++ b/oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyTestBase.java
@@ -58,6 +58,7 @@
 import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
 import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
 import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
@@ -92,6 +93,8 @@
         SegmentNodeStorePersistence srcPersistence = getSrcPersistence();
         SegmentNodeStorePersistence destPersistence = getDestPersistence();
 
+        RepositoryLock destRepositoryLock = destPersistence.lockRepository();
+
         String srcPathOrUri = getSrcPathOrUri();
         String destPathOrUri = getDestPathOrUri();
 
@@ -111,6 +114,8 @@
         checkJournal(srcPersistence, destPersistence);
         checkGCJournal(srcPersistence, destPersistence);
         checkManifest(srcPersistence, destPersistence);
+
+        destRepositoryLock.unlock();
     }
 
     private int runSegmentCopy(SegmentNodeStorePersistence srcPersistence, SegmentNodeStorePersistence destPersistence,
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java
index cfcf133..0f98f9b 100644
--- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java
@@ -16,7 +16,10 @@
  */
 package org.apache.jackrabbit.oak.segment.azure;
 
+import com.microsoft.azure.storage.StorageErrorCodeStrings;
 import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
 import com.microsoft.azure.storage.blob.CloudBlob;
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
 import com.microsoft.azure.storage.blob.ListBlobItem;
@@ -34,6 +37,7 @@
 import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
 import org.apache.jackrabbit.oak.segment.file.ReadOnlyFileStore;
 import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
 import org.apache.jackrabbit.oak.segment.spi.RepositoryNotReachableException;
 import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
 import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
@@ -49,12 +53,10 @@
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.*;
+import org.junit.contrib.java.lang.system.ProvideSystemProperty;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -66,6 +68,7 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.jackrabbit.guava.common.collect.Lists.newArrayList;
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -89,14 +92,26 @@
 
     private CloudBlobContainer container;
 
+    private AzurePersistence azurePersistence;
+
     @Before
     public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
         container = azurite.getContainer("oak-test");
+
+        WriteAccessController writeAccessController = new WriteAccessController();
+        writeAccessController.enableWriting();
+        azurePersistence = new AzurePersistence(container.getDirectoryReference("oak"));
+        azurePersistence.setWriteAccessController(writeAccessController);
     }
 
+    @Rule
+    public final ProvideSystemProperty systemPropertyRule = new ProvideSystemProperty(AzureRepositoryLock.LEASE_DURATION_PROP, "15")
+            .and(AzureRepositoryLock.RENEWAL_INTERVAL_PROP, "3")
+            .and(AzureRepositoryLock.TIME_TO_WAIT_BEFORE_WRITE_BLOCK_PROP, "9");
+
     @Test
     public void testRecovery() throws StorageException, URISyntaxException, IOException {
-        SegmentArchiveManager manager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(false, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
+        SegmentArchiveManager manager = azurePersistence.createArchiveManager(false, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
         SegmentArchiveWriter writer = manager.create("data00000a.tar");
 
         List<UUID> uuids = new ArrayList<>();
@@ -118,7 +133,7 @@
 
     @Test
     public void testBackupWithRecoveredEntries() throws StorageException, URISyntaxException, IOException {
-        SegmentArchiveManager manager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(false, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
+        SegmentArchiveManager manager = azurePersistence.createArchiveManager(false, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
         SegmentArchiveWriter writer = manager.create("data00000a.tar");
 
         List<UUID> uuids = new ArrayList<>();
@@ -257,7 +272,7 @@
 
     @Test
     public void testExists() throws IOException, URISyntaxException {
-        SegmentArchiveManager manager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(false, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
+        SegmentArchiveManager manager = azurePersistence.createArchiveManager(false, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
         SegmentArchiveWriter writer = manager.create("data00000a.tar");
 
         List<UUID> uuids = new ArrayList<>();
@@ -276,7 +291,7 @@
 
     @Test
     public void testArchiveExistsAfterFlush() throws URISyntaxException, IOException {
-        SegmentArchiveManager manager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(false, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
+        SegmentArchiveManager manager = azurePersistence.createArchiveManager(false, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
         SegmentArchiveWriter writer = manager.create("data00000a.tar");
 
         Assert.assertFalse(manager.exists("data00000a.tar"));
@@ -287,10 +302,7 @@
     }
 
     @Test(expected = FileNotFoundException.class)
-    public void testSegmentDeletedAfterCreatingReader() throws IOException, URISyntaxException, StorageException, InvalidFileStoreVersionException {
-
-        AzurePersistence azurePersistence = new AzurePersistence(container.getDirectoryReference("oak"));
-
+    public void testSegmentDeletedAfterCreatingReader() throws IOException, URISyntaxException, StorageException {
         SegmentArchiveManager manager = azurePersistence.createArchiveManager(false, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
         SegmentArchiveWriter writer = manager.create("data00000a.tar");
 
@@ -465,6 +477,80 @@
         }
     }
 
+    @Test
+    public void testWriteAfterLosingRepoLock() throws Exception {
+        CloudBlobDirectory oakDirectory = container.getDirectoryReference("oak");
+        AzurePersistence rwPersistence = new AzurePersistence(oakDirectory);
+
+        CloudBlockBlob blob = container.getBlockBlobReference("oak/repo.lock");
+
+        CloudBlockBlob blobMocked = Mockito.spy(blob);
+
+        Mockito
+                .doCallRealMethod()
+                .when(blobMocked).renewLease(Mockito.any(), Mockito.any(), Mockito.any());
+
+        AzurePersistence mockedRwPersistence = Mockito.spy(rwPersistence);
+        WriteAccessController writeAccessController = new WriteAccessController();
+        AzureRepositoryLock azureRepositoryLock = new AzureRepositoryLock(blobMocked, () -> {}, writeAccessController);
+        AzureArchiveManager azureArchiveManager = new AzureArchiveManager(oakDirectory, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), writeAccessController);
+
+
+        Mockito
+                .doAnswer(invocation -> azureRepositoryLock.lock())
+                .when(mockedRwPersistence).lockRepository();
+
+        Mockito
+                .doReturn(azureArchiveManager)
+                .when(mockedRwPersistence).createArchiveManager(Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.any());
+        Mockito
+                .doReturn(new AzureJournalFile(oakDirectory, "journal.log", writeAccessController))
+                .when(mockedRwPersistence).getJournalFile();
+
+        FileStore rwFileStore = FileStoreBuilder.fileStoreBuilder(folder.newFolder()).withCustomPersistence(mockedRwPersistence).build();
+        SegmentNodeStore segmentNodeStore = SegmentNodeStoreBuilders.builder(rwFileStore).build();
+        NodeBuilder builder = segmentNodeStore.getRoot().builder();
+
+
+        // simulate operation timeout when trying to renew lease
+        Mockito.reset(blobMocked);
+
+        StorageException storageException =
+                new StorageException(StorageErrorCodeStrings.OPERATION_TIMED_OUT, "operation timeout", new TimeoutException());
+
+        Mockito.doThrow(storageException).when(blobMocked).renewLease(Mockito.any(), Mockito.any(), Mockito.any());
+
+
+        // wait till lease expires
+        Thread.sleep(17000);
+
+        // try updating repository
+        Thread thread = new Thread(() -> {
+            try {
+                builder.setProperty("foo", "bar");
+                segmentNodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+                rwFileStore.flush();
+            } catch (Exception e) {
+                fail("No Exception expected, but got: " + e.getMessage());
+            }
+        });
+        thread.start();
+
+        Thread.sleep(2000);
+
+        // It should be possible to start another RW file store.
+        FileStore rwFileStore2 = FileStoreBuilder.fileStoreBuilder(folder.newFolder()).withCustomPersistence(new AzurePersistence(oakDirectory)).build();
+        SegmentNodeStore segmentNodeStore2 = SegmentNodeStoreBuilders.builder(rwFileStore2).build();
+        NodeBuilder builder2 = segmentNodeStore2.getRoot().builder();
+
+        //repository hasn't been updated
+        assertNull(builder2.getProperty("foo"));
+
+        rwFileStore2.close();
+
+        Mockito.doCallRealMethod().when(blobMocked).renewLease(Mockito.any(), Mockito.any(), Mockito.any());
+    }
+
     private PersistentCache createPersistenceCache() {
         return new AbstractPersistentCache() {
             @Override
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java
index 1443325..f5da9f1 100644
--- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java
@@ -23,6 +23,7 @@
 import java.util.stream.IntStream;
 import org.apache.commons.lang3.time.StopWatch;
 import org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage.AzuriteDockerRule;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
 import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader;
 import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;
 import org.jetbrains.annotations.NotNull;
@@ -54,7 +55,9 @@
     @Before
     public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
         container = azurite.getContainer("oak-test");
-        journal = new AzureJournalFile(container.getDirectoryReference("journal"), "journal.log", 50);
+        WriteAccessController writeAccessController = new WriteAccessController();
+        writeAccessController.enableWriting();
+        journal = new AzureJournalFile(container.getDirectoryReference("journal"), "journal.log", writeAccessController, 50);
     }
 
     @Test
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureReadSegmentTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureReadSegmentTest.java
index f98e2da..63a2da2 100644
--- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureReadSegmentTest.java
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureReadSegmentTest.java
@@ -94,7 +94,7 @@
         @Override
         public SegmentArchiveManager createArchiveManager(boolean mmap, boolean offHeapAccess, IOMonitor ioMonitor,
                 FileStoreMonitor fileStoreMonitor, RemoteStoreMonitor remoteStoreMonitor) {
-            return new AzureArchiveManager(segmentstoreDirectory, ioMonitor, fileStoreMonitor) {
+            return new AzureArchiveManager(segmentstoreDirectory, ioMonitor, fileStoreMonitor, writeAccessController) {
                 @Override
                 public SegmentArchiveReader open(String archiveName) throws IOException {
                     CloudBlobDirectory archiveDirectory = getDirectory(archiveName);
@@ -110,7 +110,7 @@
                 @Override
                 public SegmentArchiveWriter create(String archiveName) throws IOException {
                     CloudBlobDirectory archiveDirectory = getDirectory(archiveName);
-                    return new AzureSegmentArchiveWriter(archiveDirectory, ioMonitor, fileStoreMonitor) {
+                    return new AzureSegmentArchiveWriter(archiveDirectory, ioMonitor, fileStoreMonitor, writeAccessController) {
                         @Override
                         public Buffer readSegment(long msb, long lsb) throws IOException {
                             throw new RepositoryNotReachableException(
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLockTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLockTest.java
index 4a5676b..03c878c 100644
--- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLockTest.java
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLockTest.java
@@ -24,27 +24,31 @@
 import com.microsoft.azure.storage.blob.CloudBlockBlob;
 
 import org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage.AzuriteDockerRule;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
 import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
 import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.contrib.java.lang.system.ProvideSystemProperty;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
-import java.rmi.server.ExportException;
 import java.security.InvalidKeyException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeoutException;
 
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 public class AzureRepositoryLockTest {
 
     private static final Logger log = LoggerFactory.getLogger(AzureRepositoryLockTest.class);
+    public static final String LEASE_DURATION = "15";
+    public static final String RENEWAL_INTERVAL = "3";
+    public static final String TIME_TO_WAIT_BEFORE_BLOCK = "9";
 
     @ClassRule
     public static AzuriteDockerRule azurite = new AzuriteDockerRule();
@@ -56,12 +60,17 @@
         container = azurite.getContainer("oak-test");
     }
 
+    @Rule
+    public final ProvideSystemProperty systemPropertyRule = new ProvideSystemProperty(AzureRepositoryLock.LEASE_DURATION_PROP, LEASE_DURATION)
+            .and(AzureRepositoryLock.RENEWAL_INTERVAL_PROP, RENEWAL_INTERVAL)
+            .and(AzureRepositoryLock.TIME_TO_WAIT_BEFORE_WRITE_BLOCK_PROP, TIME_TO_WAIT_BEFORE_BLOCK);
+
     @Test
     public void testFailingLock() throws URISyntaxException, IOException, StorageException {
         CloudBlockBlob blob = container.getBlockBlobReference("oak/repo.lock");
-        new AzureRepositoryLock(blob, () -> {}, 0).lock();
+        new AzureRepositoryLock(blob, () -> {}, new WriteAccessController()).lock();
         try {
-            new AzureRepositoryLock(blob, () -> {}, 0).lock();
+            new AzureRepositoryLock(blob, () -> {}, new WriteAccessController()).lock();
             fail("The second lock should fail.");
         } catch (IOException e) {
             // it's fine
@@ -74,7 +83,7 @@
         Semaphore s = new Semaphore(0);
         new Thread(() -> {
             try {
-                RepositoryLock lock = new AzureRepositoryLock(blob, () -> {}, 0).lock();
+                RepositoryLock lock = new AzureRepositoryLock(blob, () -> {}, new WriteAccessController()).lock();
                 s.release();
                 Thread.sleep(1000);
                 lock.unlock();
@@ -84,7 +93,7 @@
         }).start();
 
         s.acquire();
-        new AzureRepositoryLock(blob, () -> {}, 10).lock();
+        new AzureRepositoryLock(blob, () -> {}, new WriteAccessController(), 10).lock();
     }
 
     @Test
@@ -99,21 +108,64 @@
         Mockito.doThrow(storageException)
                 .doThrow(storageException)
                 .doCallRealMethod()
-                .when(blobMocked).renewLease(Mockito.any());
+                .when(blobMocked).renewLease(Mockito.any(), Mockito.any(), Mockito.any());
 
-        new AzureRepositoryLock(blobMocked, () -> {}, 0).lock();
+        new AzureRepositoryLock(blobMocked, () -> {}, new WriteAccessController()).lock();
 
         // wait till lease expires
-        Thread.sleep(70000);
+        Thread.sleep(16000);
 
         // reset the mock to default behaviour
-        Mockito.doCallRealMethod().when(blobMocked).renewLease(Mockito.any());
+        Mockito.doCallRealMethod().when(blobMocked).renewLease(Mockito.any(), Mockito.any(), Mockito.any());
 
         try {
-            new AzureRepositoryLock(blobMocked, () -> {}, 0).lock();
+            new AzureRepositoryLock(blobMocked, () -> {}, new WriteAccessController()).lock();
             fail("The second lock should fail.");
         } catch (IOException e) {
             // it's fine
         }
     }
+
+    /**
+     * Checks that a writer calling {@link WriteAccessController#checkWritingAllowed()} keeps
+     * running for the first seconds of failed lease renewals and is parked only later on.
+     */
+    @Test
+    public void testWritesBlockedOnlyAfterFewUnsuccessfulAttempts() throws Exception {
+        CloudBlockBlob blob = container.getBlockBlobReference("oak/repo.lock");
+        CloudBlockBlob blobMocked = Mockito.spy(blob);
+
+        // the first renewLease call goes to the real blob, every later call throws a timeout
+        StorageException storageException =
+                new StorageException(StorageErrorCodeStrings.OPERATION_TIMED_OUT, "operation timeout", new TimeoutException());
+        Mockito
+                .doCallRealMethod()
+                .doThrow(storageException)
+                .when(blobMocked).renewLease(Mockito.any(), Mockito.any(), Mockito.any());
+
+        WriteAccessController writeAccessController = new WriteAccessController();
+        new AzureRepositoryLock(blobMocked, () -> {}, writeAccessController).lock();
+
+        // simulated writer: daemon and interruptible, so it can never outlive the test
+        Thread thread = new Thread(() -> {
+            try {
+                while (!Thread.currentThread().isInterrupted()) {
+                    writeAccessController.checkWritingAllowed();
+                }
+            } catch (RuntimeException ignored) { /* interrupted while parked; exit quietly */ }
+        });
+        thread.setDaemon(true);
+        thread.start();
+        try {
+            Thread.sleep(3000);
+            assertNotEquals("after 3 seconds thread should not be in a waiting state", Thread.State.WAITING, thread.getState());
+            Thread.sleep(3000);
+            assertNotEquals("after 6 seconds thread should not be in a waiting state", Thread.State.WAITING, thread.getState());
+            Thread.sleep(5000);
+            assertEquals("after more than 9 seconds thread should be in a waiting state", Thread.State.WAITING, thread.getState());
+        } finally {
+            thread.interrupt();
+            Mockito.doCallRealMethod().when(blobMocked).renewLease(Mockito.any(), Mockito.any(), Mockito.any());
+        }
+    }
 }
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriterTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriterTest.java
index 4b02c00..ff45bf6 100644
--- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriterTest.java
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriterTest.java
@@ -20,6 +20,7 @@
 
 import com.microsoft.azure.storage.StorageException;
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
 import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
 import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
 import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitorAdapter;
@@ -167,7 +168,11 @@
 
     @NotNull
     private SegmentArchiveWriter createSegmentArchiveWriter() throws URISyntaxException, IOException {
-        SegmentArchiveManager manager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(false, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
+        WriteAccessController writeAccessController = new WriteAccessController();
+        writeAccessController.enableWriting(); // a new controller starts with writing disabled
+        AzurePersistence azurePersistence = new AzurePersistence(container.getDirectoryReference("oak"));
+        azurePersistence.setWriteAccessController(writeAccessController);
+        SegmentArchiveManager manager = azurePersistence.createArchiveManager(false, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
         SegmentArchiveWriter writer = manager.create("data00000a.tar");
         return writer;
     }
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java
index b26cb7d..009ec43 100644
--- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java
@@ -20,6 +20,7 @@
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
 
 import org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage.AzuriteDockerRule;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
 import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
 import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
 import org.apache.jackrabbit.oak.segment.file.tar.TarFileTest;
@@ -45,7 +46,11 @@
     public void setUp() throws IOException {
         try {
             container = azurite.getContainer("oak-test");
-            archiveManager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(true, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
+            AzurePersistence azurePersistence = new AzurePersistence(container.getDirectoryReference("oak"));
+            WriteAccessController writeAccessController = new WriteAccessController();
+            writeAccessController.enableWriting();
+            azurePersistence.setWriteAccessController(writeAccessController);
+            archiveManager = azurePersistence.createArchiveManager(true, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
         } catch (StorageException | InvalidKeyException | URISyntaxException e) {
             throw new IOException(e);
         }
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java
index 065f4f7..3be1367 100644
--- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java
@@ -19,6 +19,7 @@
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
 
 import org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage.AzuriteDockerRule;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
 import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
 import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
 import org.apache.jackrabbit.oak.segment.file.tar.TarFiles;
@@ -38,6 +39,10 @@
     @Override
     public void setUp() throws Exception {
         container = azurite.getContainer("oak-test");
+        AzurePersistence azurePersistence = new AzurePersistence(container.getDirectoryReference("oak"));
+        WriteAccessController writeAccessController = new WriteAccessController();
+        writeAccessController.enableWriting();
+        azurePersistence.setWriteAccessController(writeAccessController);
         tarFiles = TarFiles.builder()
                 .withDirectory(folder.newFolder())
                 .withTarRecovery((id, data, recovery) -> {
@@ -47,7 +52,7 @@
                 .withFileStoreMonitor(new FileStoreMonitorAdapter())
                 .withRemoteStoreMonitor(new RemoteStoreMonitorAdapter())
                 .withMaxFileSize(MAX_FILE_SIZE)
-                .withPersistence(new AzurePersistence(container.getDirectoryReference("oak")))
+                .withPersistence(azurePersistence)
                 .build();
     }
 }
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java
index 1ca8b20..c27e3a7 100644
--- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java
@@ -19,6 +19,7 @@
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
 
 import org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage.AzuriteDockerRule;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
 import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
 import org.apache.jackrabbit.oak.segment.file.tar.TarWriterTest;
 import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
@@ -44,16 +45,21 @@
     @NotNull
     @Override
     protected SegmentArchiveManager getSegmentArchiveManager() throws Exception {
-        return new AzureArchiveManager(container.getDirectoryReference("oak"), new IOMonitorAdapter(), monitor);
+        WriteAccessController writeAccessController = new WriteAccessController();
+        writeAccessController.enableWriting(); // a new controller starts with writing disabled
+        AzureArchiveManager azureArchiveManager = new AzureArchiveManager(container.getDirectoryReference("oak"), new IOMonitorAdapter(), monitor, writeAccessController);
+        return azureArchiveManager;
     }
 
     @NotNull
     @Override
     protected SegmentArchiveManager getFailingSegmentArchiveManager() throws Exception {
-        return new AzureArchiveManager(container.getDirectoryReference("oak"), new IOMonitorAdapter(), monitor) {
+        final WriteAccessController writeAccessController = new WriteAccessController();
+        writeAccessController.enableWriting();
+        return new AzureArchiveManager(container.getDirectoryReference("oak"), new IOMonitorAdapter(), monitor, writeAccessController) {
             @Override
             public SegmentArchiveWriter create(String archiveName) throws IOException {
-                return new AzureSegmentArchiveWriter(getDirectory(archiveName), ioMonitor, monitor) {
+                return new AzureSegmentArchiveWriter(getDirectory(archiveName), ioMonitor, monitor, writeAccessController) {
                     @Override
                     public void writeGraph(@NotNull byte[] data) throws IOException {
                         throw new IOException("test");
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java
index 544d168..dcd5915 100644
--- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java
@@ -24,6 +24,7 @@
 import org.apache.jackrabbit.oak.segment.file.JournalReader;
 import org.apache.jackrabbit.oak.segment.file.JournalReaderTest;
 import org.apache.jackrabbit.oak.segment.azure.AzureJournalFile;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
 import org.junit.Before;
 import org.junit.ClassRule;
 
@@ -48,7 +49,7 @@
             CloudAppendBlob blob = container.getAppendBlobReference("journal/journal.log.001");
             blob.createOrReplace();
             blob.appendText(s);
-            return new JournalReader(new AzureJournalFile(container.getDirectoryReference("journal"), "journal.log"));
+            return new JournalReader(new AzureJournalFile(container.getDirectoryReference("journal"), "journal.log", new WriteAccessController()));
         } catch (StorageException | URISyntaxException e) {
             throw new IOException(e);
         }
diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveWriter.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveWriter.java
index a681a49..a0b158e 100644
--- a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveWriter.java
+++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveWriter.java
@@ -46,6 +46,8 @@
 
     protected volatile boolean created = false;
 
+    protected WriteAccessController writeAccessController = null;
+
     public AbstractRemoteSegmentArchiveWriter(IOMonitor ioMonitor, FileStoreMonitor monitor) {
         this.ioMonitor = ioMonitor;
         this.monitor = monitor;
diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/WriteAccessController.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/WriteAccessController.java
new file mode 100644
index 0000000..deb8f7e
--- /dev/null
+++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/WriteAccessController.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.remote;
+
+/**
+ * Gate that blocks calling threads while writing is disallowed and releases them once it is
+ * allowed again.
+ * <p>
+ * A new controller starts with writing disabled. Threads calling
+ * {@link #checkWritingAllowed()} while writing is disabled wait on an internal monitor until
+ * {@link #enableWriting()} is called.
+ */
+public class WriteAccessController {
+    private volatile boolean isWritingAllowed = false;
+
+    private final Object lock = new Object();
+
+    /**
+     * Disallows writing. Subsequent calls to {@link #checkWritingAllowed()} block until
+     * {@link #enableWriting()} is called.
+     */
+    public void disableWriting() {
+        this.isWritingAllowed = false;
+    }
+
+    /**
+     * Allows writing and wakes up every thread blocked in {@link #checkWritingAllowed()}.
+     */
+    public void enableWriting() {
+        // Flip the flag while holding the monitor so that a waiter which has just observed
+        // "disabled" cannot miss this notification before it reaches wait().
+        synchronized (lock) {
+            this.isWritingAllowed = true;
+            lock.notifyAll();
+        }
+    }
+
+    /**
+     * Returns immediately if writing is allowed; otherwise blocks the calling thread until
+     * writing is enabled.
+     *
+     * @throws RuntimeException if the thread is interrupted while waiting; the interrupt
+     *         status of the thread is restored before the exception is thrown
+     */
+    public void checkWritingAllowed() {
+        if (isWritingAllowed) {
+            return; // common case: no need to take the monitor
+        }
+        synchronized (lock) {
+            // Re-check under the monitor: testing the flag outside of it would let
+            // enableWriting() notify between the test and wait(), parking this thread forever.
+            while (!isWritingAllowed) {
+                try {
+                    lock.wait();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException("Interrupted while waiting for writing to be allowed", e);
+                }
+            }
+        }
+    }
+}
diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/package-info.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/package-info.java
index ce6b89b..2e20db2 100644
--- a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/package-info.java
+++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/package-info.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 @Internal(since = "1.0.0")
-@Version("1.1.0")
+@Version("1.2.0")
 package org.apache.jackrabbit.oak.segment.remote;
 
 import org.apache.jackrabbit.oak.commons.annotations.Internal;
diff --git a/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/WriteAccessControllerTest.java b/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/WriteAccessControllerTest.java
new file mode 100644
index 0000000..4302d81
--- /dev/null
+++ b/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/WriteAccessControllerTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.remote;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link WriteAccessController}.
+ */
+public class WriteAccessControllerTest {
+
+    /** Upper bound, in milliseconds, for each wait performed by the test. */
+    private static final long TIMEOUT_MILLIS = 5000;
+
+    /**
+     * Threads calling {@code checkWritingAllowed()} while writing is disabled must wait, and
+     * must all be released by {@code enableWriting()}.
+     */
+    @Test
+    public void testThreadBlocking() throws InterruptedException {
+        WriteAccessController controller = new WriteAccessController();
+
+        Thread t1 = new Thread(controller::checkWritingAllowed);
+        Thread t2 = new Thread(controller::checkWritingAllowed);
+        // daemon threads: a failed assertion must not leave parked threads keeping the JVM alive
+        t1.setDaemon(true);
+        t2.setDaemon(true);
+
+        controller.disableWriting();
+
+        t1.start();
+        t2.start();
+
+        assertThreadWaiting(t1);
+        assertThreadWaiting(t2);
+
+        controller.enableWriting();
+
+        // join with a timeout instead of sleeping a fixed amount and hoping it was enough
+        t1.join(TIMEOUT_MILLIS);
+        t2.join(TIMEOUT_MILLIS);
+
+        assertFalse(t1.isAlive());
+        assertFalse(t2.isAlive());
+    }
+
+    /**
+     * Polls until the given thread is waiting, failing the test if that does not happen within
+     * {@link #TIMEOUT_MILLIS}.
+     */
+    private static void assertThreadWaiting(Thread thread) throws InterruptedException {
+        long start = System.nanoTime();
+        while (!isWaiting(thread) && System.nanoTime() - start < TIMEOUT_MILLIS * 1_000_000L) {
+            Thread.sleep(10);
+        }
+        // JUnit assertion rather than the Java assert keyword, which is skipped without -ea
+        assertTrue("thread should be waiting but is " + thread.getState(), isWaiting(thread));
+    }
+
+    private static boolean isWaiting(Thread thread) {
+        Thread.State state = thread.getState();
+        return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
+    }
+}