Merge pull request #1204 from apache/issue/OAK-10006
Block repository writes if repo lock is lost
diff --git a/oak-parent/pom.xml b/oak-parent/pom.xml
index 490cd08..5b68842 100644
--- a/oak-parent/pom.xml
+++ b/oak-parent/pom.xml
@@ -783,6 +783,12 @@
<version>1</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.github.stefanbirkner</groupId>
+ <artifactId>system-rules</artifactId>
+ <version>1.19.0</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/oak-segment-azure/pom.xml b/oak-segment-azure/pom.xml
index f150efe..02fc807 100644
--- a/oak-segment-azure/pom.xml
+++ b/oak-segment-azure/pom.xml
@@ -110,6 +110,11 @@
<artifactId>org.osgi.service.metatype.annotations</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.annotation.versioning</artifactId>
+ <scope>provided</scope>
+ </dependency>
<!-- Nullability annotations -->
<dependency>
@@ -260,6 +265,11 @@
<artifactId>org.osgi.service.cm</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.github.stefanbirkner</groupId>
+ <artifactId>system-rules</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java
index daa042d..10c5ce4 100644
--- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java
@@ -22,6 +22,7 @@
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.CopyStatus;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
import org.apache.jackrabbit.oak.segment.remote.RemoteUtilities;
import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
@@ -61,11 +62,13 @@
protected final IOMonitor ioMonitor;
protected final FileStoreMonitor monitor;
+ private final WriteAccessController writeAccessController; // passed on to every AzureSegmentArchiveWriter that create() returns
- public AzureArchiveManager(CloudBlobDirectory cloudBlobDirectory, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) {
- this.cloudBlobDirectory = cloudBlobDirectory;
+ public AzureArchiveManager(CloudBlobDirectory segmentstoreDirectory, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor, WriteAccessController writeAccessController) {
+ this.cloudBlobDirectory = segmentstoreDirectory;
this.ioMonitor = ioMonitor;
this.monitor = fileStoreMonitor;
+ this.writeAccessController = writeAccessController;
}
@Override
@@ -127,7 +130,7 @@
@Override
public SegmentArchiveWriter create(String archiveName) throws IOException {
- return new AzureSegmentArchiveWriter(getDirectory(archiveName), ioMonitor, monitor);
+ return new AzureSegmentArchiveWriter(getDirectory(archiveName), ioMonitor, monitor, writeAccessController);
}
@Override
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java
index 9650334..b468e42 100644
--- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java
@@ -23,6 +23,7 @@
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.jackrabbit.oak.segment.azure.util.CaseInsensitiveKeysMapAccess;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile;
import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader;
import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;
@@ -53,14 +54,17 @@
private final int lineLimit;
- AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix, int lineLimit) {
+ private final WriteAccessController writeAccessController;
+
+ AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix, WriteAccessController writeAccessController, int lineLimit) {
this.directory = directory;
this.journalNamePrefix = journalNamePrefix;
this.lineLimit = lineLimit;
+ this.writeAccessController = writeAccessController;
}
- public AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix) {
- this(directory, journalNamePrefix, JOURNAL_LINE_LIMIT);
+ public AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix, WriteAccessController writeAccessController) {
+ this(directory, journalNamePrefix, writeAccessController, JOURNAL_LINE_LIMIT);
}
@Override
@@ -183,6 +187,8 @@
@Override
public void truncate() throws IOException {
try {
+ writeAccessController.checkWritingAllowed();
+
for (CloudAppendBlob cloudAppendBlob : getJournalBlobs()) {
cloudAppendBlob.delete();
}
@@ -200,6 +206,8 @@
@Override
public void batchWriteLines(List<String> lines) throws IOException {
+ writeAccessController.checkWritingAllowed();
+
if (lines.isEmpty()) {
return;
}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java
index b5434e4..eed0700 100644
--- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java
@@ -34,6 +34,7 @@
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.ListBlobItem;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitor;
@@ -64,6 +65,8 @@
protected final CloudBlobDirectory segmentstoreDirectory;
+ protected WriteAccessController writeAccessController = new WriteAccessController(); // handed to the archive manager, journal file and repository lock created by this persistence
+
public AzurePersistence(CloudBlobDirectory segmentStoreDirectory) {
this.segmentstoreDirectory = segmentStoreDirectory;
@@ -92,7 +95,7 @@
@Override
public SegmentArchiveManager createArchiveManager(boolean mmap, boolean offHeapAccess, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor, RemoteStoreMonitor remoteStoreMonitor) {
attachRemoteStoreMonitor(remoteStoreMonitor);
- return new AzureArchiveManager(segmentstoreDirectory, ioMonitor, fileStoreMonitor);
+ return new AzureArchiveManager(segmentstoreDirectory, ioMonitor, fileStoreMonitor, writeAccessController);
}
@Override
@@ -116,7 +119,7 @@
@Override
public JournalFile getJournalFile() {
- return new AzureJournalFile(segmentstoreDirectory, "journal.log");
+ return new AzureJournalFile(segmentstoreDirectory, "journal.log", writeAccessController);
}
@Override
@@ -134,7 +137,7 @@
return new AzureRepositoryLock(getBlockBlob("repo.lock"), () -> {
log.warn("Lost connection to the Azure. The client will be closed.");
// TODO close the connection
- }).lock();
+ }, writeAccessController).lock();
}
private CloudBlockBlob getBlockBlob(String path) throws IOException {
@@ -178,8 +181,11 @@
});
}
- public CloudBlobDirectory getSegmentstoreDirectory() {
- return segmentstoreDirectory;
- }
+ public CloudBlobDirectory getSegmentstoreDirectory() {
+ return segmentstoreDirectory;
+ }
+ public void setWriteAccessController(WriteAccessController writeAccessController) {
+ this.writeAccessController = writeAccessController; // only objects created after this call receive the new controller; earlier ones keep the instance they were constructed with
+ }
}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java
index 60d7d0f..046fa26 100644
--- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java
@@ -17,9 +17,12 @@
package org.apache.jackrabbit.oak.segment.azure;
import com.microsoft.azure.storage.AccessCondition;
+import com.microsoft.azure.storage.RetryNoRetry;
import com.microsoft.azure.storage.StorageErrorCodeStrings;
import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,8 +37,16 @@
private static final Logger log = LoggerFactory.getLogger(AzureRepositoryLock.class);
private static final int TIMEOUT_SEC = Integer.getInteger("oak.segment.azure.lock.timeout", 0);
+ private static final Integer LEASE_RENEWAL_TIMEOUT_MS = 5000;
- private static int INTERVAL = 60;
+ public static final String LEASE_DURATION_PROP = "oak.segment.azure.lock.leaseDurationInSec";
+ private final int leaseDuration = Integer.getInteger(LEASE_DURATION_PROP, 60);
+
+ public static final String RENEWAL_INTERVAL_PROP = "oak.segment.azure.lock.leaseRenewalIntervalInSec";
+ private final int renewalInterval = Integer.getInteger(RENEWAL_INTERVAL_PROP, 5);
+
+ public static final String TIME_TO_WAIT_BEFORE_WRITE_BLOCK_PROP = "oak.segment.azure.lock.blockWritesAfterInSec";
+ private final int timeToWaitBeforeWriteBlock = Integer.getInteger(TIME_TO_WAIT_BEFORE_WRITE_BLOCK_PROP, 20);
private final Runnable shutdownHook;
@@ -45,19 +56,27 @@
private final int timeoutSec;
+ private final WriteAccessController writeAccessController; // enabled when the lease is acquired or renewed, disabled when renewal keeps timing out
+
private String leaseId;
private volatile boolean doUpdate;
- public AzureRepositoryLock(CloudBlockBlob blob, Runnable shutdownHook) {
- this(blob, shutdownHook, TIMEOUT_SEC);
+ public AzureRepositoryLock(CloudBlockBlob blob, Runnable shutdownHook, WriteAccessController writeAccessController) {
+ this(blob, shutdownHook, writeAccessController, TIMEOUT_SEC);
}
- public AzureRepositoryLock(CloudBlockBlob blob, Runnable shutdownHook, int timeoutSec) {
+ public AzureRepositoryLock(CloudBlockBlob blob, Runnable shutdownHook, WriteAccessController writeAccessController, int timeoutSec) {
this.shutdownHook = shutdownHook;
this.blob = blob;
this.executor = Executors.newSingleThreadExecutor();
this.timeoutSec = timeoutSec;
+ this.writeAccessController = writeAccessController;
+ // The write-block threshold must lie between the renewal interval and the lease duration (bounds inclusive).
+ if (leaseDuration < timeToWaitBeforeWriteBlock || timeToWaitBeforeWriteBlock < renewalInterval) {
+ throw new IllegalStateException(String.format("The value of %s must be greater than or equal to %s and the value of %s must be greater than or equal to %s",
+ LEASE_DURATION_PROP, TIME_TO_WAIT_BEFORE_WRITE_BLOCK_PROP, TIME_TO_WAIT_BEFORE_WRITE_BLOCK_PROP, RENEWAL_INTERVAL_PROP));
+ }
}
public AzureRepositoryLock lock() throws IOException {
@@ -66,7 +85,13 @@
do {
try {
blob.openOutputStream().close();
- leaseId = blob.acquireLease(INTERVAL, null);
+
+ log.info("{} = {}", LEASE_DURATION_PROP, leaseDuration);
+ log.info("{} = {}", RENEWAL_INTERVAL_PROP, renewalInterval);
+ log.info("{} = {}", TIME_TO_WAIT_BEFORE_WRITE_BLOCK_PROP, timeToWaitBeforeWriteBlock);
+
+ leaseId = blob.acquireLease(leaseDuration, null);
+ writeAccessController.enableWriting();
log.info("Acquired lease {}", leaseId);
} catch (StorageException | IOException e) {
if (ex == null) {
@@ -97,14 +122,25 @@
doUpdate = true;
long lastUpdate = 0;
while (doUpdate) {
+ long timeSinceLastUpdate = (System.currentTimeMillis() - lastUpdate) / 1000;
try {
- long timeSinceLastUpdate = (System.currentTimeMillis() - lastUpdate) / 1000;
- if (timeSinceLastUpdate > INTERVAL / 2) {
- blob.renewLease(AccessCondition.generateLeaseCondition(leaseId));
+ if (timeSinceLastUpdate > renewalInterval) {
+
+ BlobRequestOptions requestOptions = new BlobRequestOptions();
+ requestOptions.setMaximumExecutionTimeInMs(LEASE_RENEWAL_TIMEOUT_MS);
+ requestOptions.setRetryPolicyFactory(new RetryNoRetry());
+ blob.renewLease(AccessCondition.generateLeaseCondition(leaseId), requestOptions, null);
+
+ writeAccessController.enableWriting();
lastUpdate = System.currentTimeMillis();
}
} catch (StorageException e) {
+ timeSinceLastUpdate = (System.currentTimeMillis() - lastUpdate) / 1000;
+
if (e.getErrorCode().equals(StorageErrorCodeStrings.OPERATION_TIMED_OUT)) {
+ if (timeSinceLastUpdate > timeToWaitBeforeWriteBlock) {
+ writeAccessController.disableWriting();
+ }
log.warn("Could not renew the lease due to the operation timeout. Retry in progress ...", e);
} else {
log.error("Can't renew the lease", e);
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java
index f7181bb..3df0eaa 100644
--- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java
@@ -31,6 +31,7 @@
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
import org.apache.jackrabbit.oak.segment.azure.util.Retrier;
import org.apache.jackrabbit.oak.segment.remote.AbstractRemoteSegmentArchiveWriter;
import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
@@ -46,9 +47,10 @@
Integer.getInteger("azure.segment.archive.writer.retries.intervalMs", 5000)
);
- public AzureSegmentArchiveWriter(CloudBlobDirectory archiveDirectory, IOMonitor ioMonitor, FileStoreMonitor monitor) {
+ public AzureSegmentArchiveWriter(CloudBlobDirectory archiveDirectory, IOMonitor ioMonitor, FileStoreMonitor monitor, WriteAccessController writeAccessController) {
super(ioMonitor, monitor);
this.archiveDirectory = archiveDirectory;
+ this.writeAccessController = writeAccessController;
}
@Override
@@ -58,6 +60,9 @@
@Override
protected void doWriteArchiveEntry(RemoteSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException {
+
+ writeAccessController.checkWritingAllowed();
+
long msb = indexEntry.getMsb();
long lsb = indexEntry.getLsb();
String segmentName = getSegmentFileName(indexEntry);
@@ -90,6 +95,8 @@
protected void doWriteDataFile(byte[] data, String extension) throws IOException {
retrier.execute(() -> {
try {
+ writeAccessController.checkWritingAllowed();
+
getBlob(getName() + extension).uploadFromByteArray(data, 0, data.length);
} catch (StorageException e) {
throw new IOException(e);
@@ -101,6 +108,8 @@
protected void afterQueueClosed() throws IOException {
retrier.execute(() -> {
try {
+ writeAccessController.checkWritingAllowed();
+
getBlob("closed").uploadFromByteArray(new byte[0], 0, 0);
} catch (StorageException e) {
throw new IOException(e);
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/package-info.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/package-info.java
new file mode 100644
index 0000000..30d378d
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@Internal(since = "1.0.0")
+@Version("2.0.0")
+package org.apache.jackrabbit.oak.segment.azure;
+
+import org.apache.jackrabbit.oak.commons.annotations.Internal;
+import org.osgi.annotation.versioning.Version;
diff --git a/oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyTestBase.java b/oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyTestBase.java
index 97b9afe..0113734 100644
--- a/oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyTestBase.java
+++ b/oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyTestBase.java
@@ -58,6 +58,7 @@
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
@@ -92,6 +93,8 @@
SegmentNodeStorePersistence srcPersistence = getSrcPersistence();
SegmentNodeStorePersistence destPersistence = getDestPersistence();
+ RepositoryLock destRepositoryLock = destPersistence.lockRepository();
+
String srcPathOrUri = getSrcPathOrUri();
String destPathOrUri = getDestPathOrUri();
@@ -111,6 +114,8 @@
checkJournal(srcPersistence, destPersistence);
checkGCJournal(srcPersistence, destPersistence);
checkManifest(srcPersistence, destPersistence);
+
+ destRepositoryLock.unlock();
}
private int runSegmentCopy(SegmentNodeStorePersistence srcPersistence, SegmentNodeStorePersistence destPersistence,
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java
index cfcf133..0f98f9b 100644
--- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java
@@ -16,7 +16,10 @@
*/
package org.apache.jackrabbit.oak.segment.azure;
+import com.microsoft.azure.storage.StorageErrorCodeStrings;
import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.ListBlobItem;
@@ -34,6 +37,7 @@
import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
import org.apache.jackrabbit.oak.segment.file.ReadOnlyFileStore;
import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
import org.apache.jackrabbit.oak.segment.spi.RepositoryNotReachableException;
import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
@@ -49,12 +53,10 @@
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.*;
+import org.junit.contrib.java.lang.system.ProvideSystemProperty;
import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
import java.io.File;
import java.io.FileNotFoundException;
@@ -66,6 +68,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.TimeoutException;
import static org.apache.jackrabbit.guava.common.collect.Lists.newArrayList;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -89,14 +92,26 @@
private CloudBlobContainer container;
+ private AzurePersistence azurePersistence;
+
@Before
public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
container = azurite.getContainer("oak-test");
+
+ WriteAccessController writeAccessController = new WriteAccessController();
+ writeAccessController.enableWriting(); // setup takes no repository lock (which would enable writing on acquisition), so it is enabled by hand
+ azurePersistence = new AzurePersistence(container.getDirectoryReference("oak"));
+ azurePersistence.setWriteAccessController(writeAccessController);
}
+ @Rule
+ public final ProvideSystemProperty systemPropertyRule = new ProvideSystemProperty(AzureRepositoryLock.LEASE_DURATION_PROP, "15")
+ .and(AzureRepositoryLock.RENEWAL_INTERVAL_PROP, "3")
+ .and(AzureRepositoryLock.TIME_TO_WAIT_BEFORE_WRITE_BLOCK_PROP, "9");
+
@Test
public void testRecovery() throws StorageException, URISyntaxException, IOException {
- SegmentArchiveManager manager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(false, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
+ SegmentArchiveManager manager = azurePersistence.createArchiveManager(false, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
SegmentArchiveWriter writer = manager.create("data00000a.tar");
List<UUID> uuids = new ArrayList<>();
@@ -118,7 +133,7 @@
@Test
public void testBackupWithRecoveredEntries() throws StorageException, URISyntaxException, IOException {
- SegmentArchiveManager manager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(false, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
+ SegmentArchiveManager manager = azurePersistence.createArchiveManager(false, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
SegmentArchiveWriter writer = manager.create("data00000a.tar");
List<UUID> uuids = new ArrayList<>();
@@ -257,7 +272,7 @@
@Test
public void testExists() throws IOException, URISyntaxException {
- SegmentArchiveManager manager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(false, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
+ SegmentArchiveManager manager = azurePersistence.createArchiveManager(false, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
SegmentArchiveWriter writer = manager.create("data00000a.tar");
List<UUID> uuids = new ArrayList<>();
@@ -276,7 +291,7 @@
@Test
public void testArchiveExistsAfterFlush() throws URISyntaxException, IOException {
- SegmentArchiveManager manager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(false, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
+ SegmentArchiveManager manager = azurePersistence.createArchiveManager(false, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
SegmentArchiveWriter writer = manager.create("data00000a.tar");
Assert.assertFalse(manager.exists("data00000a.tar"));
@@ -287,10 +302,7 @@
}
@Test(expected = FileNotFoundException.class)
- public void testSegmentDeletedAfterCreatingReader() throws IOException, URISyntaxException, StorageException, InvalidFileStoreVersionException {
-
- AzurePersistence azurePersistence = new AzurePersistence(container.getDirectoryReference("oak"));
-
+ public void testSegmentDeletedAfterCreatingReader() throws IOException, URISyntaxException, StorageException {
SegmentArchiveManager manager = azurePersistence.createArchiveManager(false, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
SegmentArchiveWriter writer = manager.create("data00000a.tar");
@@ -465,6 +477,80 @@
}
}
+ @Test
+ public void testWriteAfterLosingRepoLock() throws Exception {
+ // Lease renewal is made to time out until the lease expires; the first store's write must then
+ // not reach the repository, and a second read-write store must be able to open it.
+ CloudBlobDirectory oakDirectory = container.getDirectoryReference("oak");
+ AzurePersistence rwPersistence = new AzurePersistence(oakDirectory);
+ CloudBlockBlob blob = container.getBlockBlobReference("oak/repo.lock");
+ CloudBlockBlob blobMocked = Mockito.spy(blob);
+
+ Mockito
+ .doCallRealMethod()
+ .when(blobMocked).renewLease(Mockito.any(), Mockito.any(), Mockito.any());
+
+ AzurePersistence mockedRwPersistence = Mockito.spy(rwPersistence);
+ WriteAccessController writeAccessController = new WriteAccessController();
+ AzureRepositoryLock azureRepositoryLock = new AzureRepositoryLock(blobMocked, () -> {}, writeAccessController);
+ AzureArchiveManager azureArchiveManager = new AzureArchiveManager(oakDirectory, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), writeAccessController);
+
+ Mockito
+ .doAnswer(invocation -> azureRepositoryLock.lock())
+ .when(mockedRwPersistence).lockRepository();
+
+ Mockito
+ .doReturn(azureArchiveManager)
+ .when(mockedRwPersistence).createArchiveManager(Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.any());
+ Mockito
+ .doReturn(new AzureJournalFile(oakDirectory, "journal.log", writeAccessController))
+ .when(mockedRwPersistence).getJournalFile();
+
+ FileStore rwFileStore = FileStoreBuilder.fileStoreBuilder(folder.newFolder()).withCustomPersistence(mockedRwPersistence).build();
+ SegmentNodeStore segmentNodeStore = SegmentNodeStoreBuilders.builder(rwFileStore).build();
+ NodeBuilder builder = segmentNodeStore.getRoot().builder();
+
+ // simulate operation timeout when trying to renew lease
+ Mockito.reset(blobMocked);
+
+ StorageException storageException =
+ new StorageException(StorageErrorCodeStrings.OPERATION_TIMED_OUT, "operation timeout", new TimeoutException());
+
+ Mockito.doThrow(storageException).when(blobMocked).renewLease(Mockito.any(), Mockito.any(), Mockito.any());
+
+ // wait till lease expires
+ Thread.sleep(17000);
+
+ // try updating repository; a failure inside the worker thread cannot fail the test directly, so it is recorded here
+ Throwable[] writeFailure = new Throwable[1];
+ Thread thread = new Thread(() -> {
+ try {
+ builder.setProperty("foo", "bar");
+ segmentNodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ rwFileStore.flush();
+ } catch (Exception e) {
+ writeFailure[0] = e;
+ }
+ });
+ thread.setDaemon(true); // the write presumably stays blocked while writing is disabled; a daemon thread cannot keep the JVM alive
+ thread.start();
+
+ thread.join(2000); // waits the full 2 s unless the worker has already finished (successfully or with an exception)
+ assertNull("No Exception expected, but got: " + writeFailure[0], writeFailure[0]);
+
+ // It should be possible to start another RW file store.
+ FileStore rwFileStore2 = FileStoreBuilder.fileStoreBuilder(folder.newFolder()).withCustomPersistence(new AzurePersistence(oakDirectory)).build();
+ SegmentNodeStore segmentNodeStore2 = SegmentNodeStoreBuilders.builder(rwFileStore2).build();
+ NodeBuilder builder2 = segmentNodeStore2.getRoot().builder();
+
+ //repository hasn't been updated
+ assertNull(builder2.getProperty("foo"));
+
+ rwFileStore2.close();
+
+ Mockito.doCallRealMethod().when(blobMocked).renewLease(Mockito.any(), Mockito.any(), Mockito.any());
+ }
+
private PersistentCache createPersistenceCache() {
return new AbstractPersistentCache() {
@Override
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java
index 1443325..f5da9f1 100644
--- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java
@@ -23,6 +23,7 @@
import java.util.stream.IntStream;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage.AzuriteDockerRule;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader;
import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;
import org.jetbrains.annotations.NotNull;
@@ -54,7 +55,9 @@
@Before
public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
container = azurite.getContainer("oak-test");
- journal = new AzureJournalFile(container.getDirectoryReference("journal"), "journal.log", 50);
+ WriteAccessController writeAccessController = new WriteAccessController();
+ writeAccessController.enableWriting(); // truncate() and batchWriteLines() in AzureJournalFile call checkWritingAllowed() first
+ journal = new AzureJournalFile(container.getDirectoryReference("journal"), "journal.log", writeAccessController, 50);
}
@Test
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureReadSegmentTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureReadSegmentTest.java
index f98e2da..63a2da2 100644
--- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureReadSegmentTest.java
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureReadSegmentTest.java
@@ -94,7 +94,7 @@
@Override
public SegmentArchiveManager createArchiveManager(boolean mmap, boolean offHeapAccess, IOMonitor ioMonitor,
FileStoreMonitor fileStoreMonitor, RemoteStoreMonitor remoteStoreMonitor) {
- return new AzureArchiveManager(segmentstoreDirectory, ioMonitor, fileStoreMonitor) {
+ return new AzureArchiveManager(segmentstoreDirectory, ioMonitor, fileStoreMonitor, writeAccessController) {
@Override
public SegmentArchiveReader open(String archiveName) throws IOException {
CloudBlobDirectory archiveDirectory = getDirectory(archiveName);
@@ -110,7 +110,7 @@
@Override
public SegmentArchiveWriter create(String archiveName) throws IOException {
CloudBlobDirectory archiveDirectory = getDirectory(archiveName);
- return new AzureSegmentArchiveWriter(archiveDirectory, ioMonitor, fileStoreMonitor) {
+ return new AzureSegmentArchiveWriter(archiveDirectory, ioMonitor, fileStoreMonitor, writeAccessController) {
@Override
public Buffer readSegment(long msb, long lsb) throws IOException {
throw new RepositoryNotReachableException(
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLockTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLockTest.java
index 4a5676b..03c878c 100644
--- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLockTest.java
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLockTest.java
@@ -24,27 +24,31 @@
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage.AzuriteDockerRule;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
import org.junit.Before;
import org.junit.ClassRule;
-import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.contrib.java.lang.system.ProvideSystemProperty;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URISyntaxException;
-import java.rmi.server.ExportException;
import java.security.InvalidKeyException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
public class AzureRepositoryLockTest {
private static final Logger log = LoggerFactory.getLogger(AzureRepositoryLockTest.class);
+ public static final String LEASE_DURATION = "15";
+ public static final String RENEWAL_INTERVAL = "3";
+ public static final String TIME_TO_WAIT_BEFORE_BLOCK = "9";
@ClassRule
public static AzuriteDockerRule azurite = new AzuriteDockerRule();
@@ -56,12 +60,17 @@
container = azurite.getContainer("oak-test");
}
+ @Rule
+ public final ProvideSystemProperty systemPropertyRule = new ProvideSystemProperty(AzureRepositoryLock.LEASE_DURATION_PROP, LEASE_DURATION)
+ .and(AzureRepositoryLock.RENEWAL_INTERVAL_PROP, RENEWAL_INTERVAL)
+ .and(AzureRepositoryLock.TIME_TO_WAIT_BEFORE_WRITE_BLOCK_PROP, TIME_TO_WAIT_BEFORE_BLOCK);
+
@Test
public void testFailingLock() throws URISyntaxException, IOException, StorageException {
CloudBlockBlob blob = container.getBlockBlobReference("oak/repo.lock");
- new AzureRepositoryLock(blob, () -> {}, 0).lock();
+ new AzureRepositoryLock(blob, () -> {}, new WriteAccessController()).lock();
try {
- new AzureRepositoryLock(blob, () -> {}, 0).lock();
+ new AzureRepositoryLock(blob, () -> {}, new WriteAccessController()).lock();
fail("The second lock should fail.");
} catch (IOException e) {
// it's fine
@@ -74,7 +83,7 @@
Semaphore s = new Semaphore(0);
new Thread(() -> {
try {
- RepositoryLock lock = new AzureRepositoryLock(blob, () -> {}, 0).lock();
+ RepositoryLock lock = new AzureRepositoryLock(blob, () -> {}, new WriteAccessController()).lock();
s.release();
Thread.sleep(1000);
lock.unlock();
@@ -84,7 +93,7 @@
}).start();
s.acquire();
- new AzureRepositoryLock(blob, () -> {}, 10).lock();
+ new AzureRepositoryLock(blob, () -> {}, new WriteAccessController(), 10).lock();
}
@Test
@@ -99,21 +108,64 @@
Mockito.doThrow(storageException)
.doThrow(storageException)
.doCallRealMethod()
- .when(blobMocked).renewLease(Mockito.any());
+ .when(blobMocked).renewLease(Mockito.any(), Mockito.any(), Mockito.any());
- new AzureRepositoryLock(blobMocked, () -> {}, 0).lock();
+ new AzureRepositoryLock(blobMocked, () -> {}, new WriteAccessController()).lock();
// wait till lease expires
- Thread.sleep(70000);
+ Thread.sleep(16000);
// reset the mock to default behaviour
- Mockito.doCallRealMethod().when(blobMocked).renewLease(Mockito.any());
+ Mockito.doCallRealMethod().when(blobMocked).renewLease(Mockito.any(), Mockito.any(), Mockito.any());
try {
- new AzureRepositoryLock(blobMocked, () -> {}, 0).lock();
+ new AzureRepositoryLock(blobMocked, () -> {}, new WriteAccessController()).lock();
fail("The second lock should fail.");
} catch (IOException e) {
// it's fine
}
}
+
+    /** Writes must stay allowed until lease renewals have kept failing for TIME_TO_WAIT_BEFORE_BLOCK seconds. */
+    @Test
+    public void testWritesBlockedOnlyAfterFewUnsuccessfulAttempts() throws Exception {
+        CloudBlockBlob blob = container.getBlockBlobReference("oak/repo.lock");
+        CloudBlockBlob blobMocked = Mockito.spy(blob);
+
+        // let the first lease renewal go through, then fail every following one with a timeout
+        StorageException storageException =
+                new StorageException(StorageErrorCodeStrings.OPERATION_TIMED_OUT, "operation timeout", new TimeoutException());
+        Mockito
+                .doCallRealMethod()
+                .doThrow(storageException)
+                .when(blobMocked).renewLease(Mockito.any(), Mockito.any(), Mockito.any());
+
+        WriteAccessController writeAccessController = new WriteAccessController();
+        new AzureRepositoryLock(blobMocked, () -> {}, writeAccessController).lock();
+
+        // probe thread: keeps running while writes are allowed, parks in WAITING once they are blocked
+        Thread thread = new Thread(() -> {
+            try {
+                while (!Thread.currentThread().isInterrupted()) {
+                    writeAccessController.checkWritingAllowed();
+                }
+            } catch (RuntimeException ignored) {
+                // thrown by checkWritingAllowed() when interrupted while parked; expected during cleanup
+            }
+        });
+        thread.setDaemon(true); // must never keep the test JVM alive
+        thread.start();
+
+        try {
+            Thread.sleep(3000);
+            assertFalse("after 3 seconds thread should not be in a waiting state", thread.getState().equals(Thread.State.WAITING));
+            Thread.sleep(3000);
+            assertFalse("after 6 seconds thread should not be in a waiting state", thread.getState().equals(Thread.State.WAITING));
+            Thread.sleep(5000);
+            assertTrue("after more than 9 seconds thread should be in a waiting state", thread.getState().equals(Thread.State.WAITING));
+        } finally {
+            thread.interrupt(); // stop the probe even when an assertion fails
+            Mockito.doCallRealMethod().when(blobMocked).renewLease(Mockito.any(), Mockito.any(), Mockito.any());
+        }
+    }
}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriterTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriterTest.java
index 4b02c00..ff45bf6 100644
--- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriterTest.java
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriterTest.java
@@ -20,6 +20,7 @@
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitorAdapter;
@@ -167,7 +168,11 @@
@NotNull
private SegmentArchiveWriter createSegmentArchiveWriter() throws URISyntaxException, IOException {
- SegmentArchiveManager manager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(false, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
+ WriteAccessController writeAccessController = new WriteAccessController();
+ writeAccessController.enableWriting(); // a new controller starts with writing disallowed
+ AzurePersistence azurePersistence = new AzurePersistence(container.getDirectoryReference("oak"));
+ azurePersistence.setWriteAccessController(writeAccessController);
+ SegmentArchiveManager manager = azurePersistence.createArchiveManager(false, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
SegmentArchiveWriter writer = manager.create("data00000a.tar");
return writer;
}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java
index b26cb7d..009ec43 100644
--- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java
@@ -20,6 +20,7 @@
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage.AzuriteDockerRule;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
import org.apache.jackrabbit.oak.segment.file.tar.TarFileTest;
@@ -45,7 +46,11 @@
public void setUp() throws IOException {
try {
container = azurite.getContainer("oak-test");
- archiveManager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(true, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
+ AzurePersistence azurePersistence = new AzurePersistence(container.getDirectoryReference("oak"));
+ WriteAccessController writeAccessController = new WriteAccessController();
+ writeAccessController.enableWriting();
+ azurePersistence.setWriteAccessController(writeAccessController);
+ archiveManager = azurePersistence.createArchiveManager(true, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
} catch (StorageException | InvalidKeyException | URISyntaxException e) {
throw new IOException(e);
}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java
index 065f4f7..3be1367 100644
--- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java
@@ -19,6 +19,7 @@
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage.AzuriteDockerRule;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
import org.apache.jackrabbit.oak.segment.file.tar.TarFiles;
@@ -38,6 +39,10 @@
@Override
public void setUp() throws Exception {
container = azurite.getContainer("oak-test");
+ AzurePersistence azurePersistence = new AzurePersistence(container.getDirectoryReference("oak"));
+ WriteAccessController writeAccessController = new WriteAccessController();
+ writeAccessController.enableWriting();
+ azurePersistence.setWriteAccessController(writeAccessController);
tarFiles = TarFiles.builder()
.withDirectory(folder.newFolder())
.withTarRecovery((id, data, recovery) -> {
@@ -47,7 +52,7 @@
.withFileStoreMonitor(new FileStoreMonitorAdapter())
.withRemoteStoreMonitor(new RemoteStoreMonitorAdapter())
.withMaxFileSize(MAX_FILE_SIZE)
- .withPersistence(new AzurePersistence(container.getDirectoryReference("oak")))
+ .withPersistence(azurePersistence)
.build();
}
}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java
index 1ca8b20..c27e3a7 100644
--- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java
@@ -19,6 +19,7 @@
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage.AzuriteDockerRule;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
import org.apache.jackrabbit.oak.segment.file.tar.TarWriterTest;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
@@ -44,16 +45,21 @@
@NotNull
@Override
protected SegmentArchiveManager getSegmentArchiveManager() throws Exception {
- return new AzureArchiveManager(container.getDirectoryReference("oak"), new IOMonitorAdapter(), monitor);
+ WriteAccessController controller = new WriteAccessController();
+ controller.enableWriting(); // a new controller starts with writing disallowed
+ SegmentArchiveManager manager = new AzureArchiveManager(container.getDirectoryReference("oak"), new IOMonitorAdapter(), monitor, controller);
+ return manager;
}
@NotNull
@Override
protected SegmentArchiveManager getFailingSegmentArchiveManager() throws Exception {
- return new AzureArchiveManager(container.getDirectoryReference("oak"), new IOMonitorAdapter(), monitor) {
+ final WriteAccessController writeAccessController = new WriteAccessController();
+ writeAccessController.enableWriting();
+ return new AzureArchiveManager(container.getDirectoryReference("oak"), new IOMonitorAdapter(), monitor, writeAccessController) {
@Override
public SegmentArchiveWriter create(String archiveName) throws IOException {
- return new AzureSegmentArchiveWriter(getDirectory(archiveName), ioMonitor, monitor) {
+ return new AzureSegmentArchiveWriter(getDirectory(archiveName), ioMonitor, monitor, writeAccessController) {
@Override
public void writeGraph(@NotNull byte[] data) throws IOException {
throw new IOException("test");
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java
index 544d168..dcd5915 100644
--- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java
@@ -24,6 +24,7 @@
import org.apache.jackrabbit.oak.segment.file.JournalReader;
import org.apache.jackrabbit.oak.segment.file.JournalReaderTest;
import org.apache.jackrabbit.oak.segment.azure.AzureJournalFile;
+import org.apache.jackrabbit.oak.segment.remote.WriteAccessController;
import org.junit.Before;
import org.junit.ClassRule;
@@ -48,7 +49,7 @@
CloudAppendBlob blob = container.getAppendBlobReference("journal/journal.log.001");
blob.createOrReplace();
blob.appendText(s);
- return new JournalReader(new AzureJournalFile(container.getDirectoryReference("journal"), "journal.log"));
+ return new JournalReader(new AzureJournalFile(container.getDirectoryReference("journal"), "journal.log", new WriteAccessController()));
} catch (StorageException | URISyntaxException e) {
throw new IOException(e);
}
diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveWriter.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveWriter.java
index a681a49..a0b158e 100644
--- a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveWriter.java
+++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveWriter.java
@@ -46,6 +46,8 @@
protected volatile boolean created = false;
+ protected WriteAccessController writeAccessController = null;
+
public AbstractRemoteSegmentArchiveWriter(IOMonitor ioMonitor, FileStoreMonitor monitor) {
this.ioMonitor = ioMonitor;
this.monitor = monitor;
diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/WriteAccessController.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/WriteAccessController.java
new file mode 100644
index 0000000..deb8f7e
--- /dev/null
+++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/WriteAccessController.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.remote;
+
+/**
+ * Gate that makes callers of {@link #checkWritingAllowed()} wait while writing
+ * is disallowed. A new controller starts with writing disallowed, so
+ * {@link #enableWriting()} has to be called before any caller gets through.
+ */
+public class WriteAccessController {
+    private final Object lock = new Object();
+
+    // volatile: read without holding the monitor on the fast path of checkWritingAllowed()
+    private volatile boolean isWritingAllowed = false;
+
+    /**
+     * Disallows writing; subsequent {@link #checkWritingAllowed()} calls block.
+     */
+    public void disableWriting() {
+        this.isWritingAllowed = false;
+    }
+
+    /**
+     * Allows writing and wakes up every thread blocked in
+     * {@link #checkWritingAllowed()}.
+     */
+    public void enableWriting() {
+        synchronized (lock) {
+            // Flip the flag while holding the monitor: a waiter tests the flag
+            // under the same monitor, so it cannot slip into wait() after the
+            // notification has already been sent.
+            this.isWritingAllowed = true;
+            lock.notifyAll();
+        }
+    }
+
+    /**
+     * Blocks the calling thread until writing is allowed.
+     *
+     * @throws RuntimeException if the thread is interrupted while waiting; the
+     *         interrupt status is restored before throwing
+     */
+    public void checkWritingAllowed() {
+        if (isWritingAllowed) {
+            return; // common case, no locking needed
+        }
+        synchronized (lock) {
+            while (!isWritingAllowed) {
+                try {
+                    lock.wait();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException("Interrupted while waiting for writing to be allowed", e);
+                }
+            }
+        }
+    }
+}
diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/package-info.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/package-info.java
index ce6b89b..2e20db2 100644
--- a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/package-info.java
+++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/package-info.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
@Internal(since = "1.0.0")
-@Version("1.1.0")
+@Version("1.2.0")
package org.apache.jackrabbit.oak.segment.remote;
import org.apache.jackrabbit.oak.commons.annotations.Internal;
diff --git a/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/WriteAccessControllerTest.java b/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/WriteAccessControllerTest.java
new file mode 100644
index 0000000..4302d81
--- /dev/null
+++ b/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/WriteAccessControllerTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.remote;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests the blocking behaviour of {@link WriteAccessController}. */
+public class WriteAccessControllerTest {
+
+    /**
+     * Threads calling {@code checkWritingAllowed()} wait while writing is
+     * disabled and terminate once it is enabled.
+     */
+    @Test
+    public void testThreadBlocking() throws InterruptedException {
+        WriteAccessController controller = new WriteAccessController();
+
+        Thread t1 = new Thread(controller::checkWritingAllowed);
+        Thread t2 = new Thread(controller::checkWritingAllowed);
+        // daemon threads: a failed assertion must not leave blocked threads
+        // that keep the JVM alive
+        t1.setDaemon(true);
+        t2.setDaemon(true);
+
+        controller.disableWriting();
+
+        t1.start();
+        t2.start();
+
+        Thread.sleep(200);
+
+        assertThreadWaiting(t1.getState());
+        assertThreadWaiting(t2.getState());
+
+        controller.enableWriting();
+
+        // join with a timeout rather than sleeping a fixed 200 ms: returns as
+        // soon as the threads finish and tolerates a slow machine
+        t1.join(5000);
+        t2.join(5000);
+
+        assertFalse(t1.isAlive());
+        assertFalse(t2.isAlive());
+    }
+
+    /**
+     * Fails unless {@code state} is WAITING or TIMED_WAITING. Uses a JUnit
+     * assertion because the {@code assert} keyword is skipped when the JVM
+     * runs without {@code -ea}.
+     */
+    private static void assertThreadWaiting(Thread.State state) {
+        assertTrue("unexpected thread state: " + state,
+                state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING);
+    }
+}